-
Notifications
You must be signed in to change notification settings - Fork 444
Fallback for upsert when arrow cannot compare source rows with target rows #1878
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 15 commits
f16f8b3
06af05a
d8f71b5
6b9ddf4
7131dc0
0719ecf
54537c2
a699602
8e32e9c
a088d6c
e9af368
4e75ce1
79f6181
6f50b48
e9e9485
85424d8
7da6cca
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -30,7 +30,7 @@ | |
| from pyiceberg.table import UpsertResult | ||
| from pyiceberg.table.snapshots import Operation | ||
| from pyiceberg.table.upsert_util import create_match_filter | ||
| from pyiceberg.types import IntegerType, NestedField, StringType | ||
| from pyiceberg.types import IntegerType, NestedField, StringType, StructType | ||
| from tests.catalog.test_base import InMemoryCatalog, Table | ||
|
|
||
|
|
||
|
|
@@ -511,6 +511,96 @@ def test_upsert_without_identifier_fields(catalog: Catalog) -> None: | |
| tbl.upsert(df) | ||
|
|
||
|
|
||
| def test_upsert_struct_field_fails_in_join(catalog: Catalog) -> None: | ||
|
||
| identifier = "default.test_upsert_struct_field_fails" | ||
| _drop_table(catalog, identifier) | ||
|
|
||
| schema = Schema( | ||
| NestedField(1, "id", IntegerType(), required=True), | ||
| NestedField( | ||
| 2, | ||
| "nested_type", | ||
| StructType( | ||
| NestedField(3, "sub1", StringType(), required=True), | ||
| NestedField(4, "sub2", StringType(), required=True), | ||
| ), | ||
| required=False, | ||
| ), | ||
| identifier_field_ids=[1], | ||
| ) | ||
|
|
||
| tbl = catalog.create_table(identifier, schema=schema) | ||
|
|
||
| arrow_schema = pa.schema( | ||
| [ | ||
| pa.field("id", pa.int32(), nullable=False), | ||
| pa.field( | ||
| "nested_type", | ||
| pa.struct( | ||
| [ | ||
| pa.field("sub1", pa.large_string(), nullable=False), | ||
| pa.field("sub2", pa.large_string(), nullable=False), | ||
| ] | ||
| ), | ||
| nullable=True, | ||
| ), | ||
| ] | ||
| ) | ||
|
|
||
| initial_data = pa.Table.from_pylist( | ||
| [ | ||
| { | ||
| "id": 1, | ||
| "nested_type": {"sub1": "bla1", "sub2": "bla"}, | ||
| } | ||
| ], | ||
| schema=arrow_schema, | ||
| ) | ||
| tbl.append(initial_data) | ||
|
|
||
| update_data = pa.Table.from_pylist( | ||
| [ | ||
| { | ||
| "id": 2, | ||
| "nested_type": {"sub1": "bla1", "sub2": "bla"}, | ||
| }, | ||
| { | ||
| "id": 1, | ||
| "nested_type": {"sub1": "bla1", "sub2": "bla2"}, | ||
| }, | ||
| ], | ||
| schema=arrow_schema, | ||
| ) | ||
|
|
||
| res = tbl.upsert(update_data, join_cols=["id"]) | ||
|
|
||
| expected_updated = 1 | ||
| expected_inserted = 1 | ||
|
|
||
| assert_upsert_result(res, expected_updated, expected_inserted) | ||
|
|
||
| update_data = pa.Table.from_pylist( | ||
| [ | ||
| { | ||
| "id": 2, | ||
| "nested_type": {"sub1": "bla1", "sub2": "bla"}, | ||
| }, | ||
| { | ||
| "id": 1, | ||
| "nested_type": {"sub1": "bla1", "sub2": "bla2"}, | ||
| }, | ||
| ], | ||
| schema=arrow_schema, | ||
| ) | ||
|
|
||
| res = tbl.upsert(update_data, join_cols=["id"]) | ||
|
|
||
| expected_updated = 0 | ||
| expected_inserted = 0 | ||
|
|
||
| assert_upsert_result(res, expected_updated, expected_inserted) | ||
|
|
||
Fokko marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| def test_upsert_with_nulls(catalog: Catalog) -> None: | ||
| identifier = "default.test_upsert_with_nulls" | ||
| _drop_table(catalog, identifier) | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this should be an inner join, since we don't support null-keys.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah yes, good catch! We can do inner join and remove the
matching_indices = joined.filter(pc.field(MARKER_COLUMN_NAME))step.