From 1b29d0699d5dae947caceb86ea11b1703594adfb Mon Sep 17 00:00:00 2001
From: Fokko
Date: Mon, 20 Jan 2025 12:51:31 +0100
Subject: [PATCH 1/8] PyArrow: Avoid buffer-overflow by avoiding a sort

This was already being discussed back here:
https://github.com/apache/iceberg-python/issues/208#issuecomment-1889891973

This PR changes the approach from doing a sort and then a single pass over the
table to determining the unique partition tuples and then filtering on them
one by one.

Fixes https://github.com/apache/iceberg-python/issues/1491

The sort caused buffers to be joined in a way that overflows in Arrow. I think
this is an issue on the Arrow side, and it should automatically break the data
up into smaller buffers. The `combine_chunks` method does this correctly.

Now:

```
Run 0 took: 0.42877754200890195
Run 1 took: 0.2507691659993725
Run 2 took: 0.24833179199777078
Run 3 took: 0.24401691700040828
Run 4 took: 0.2419595829996979
Average runtime of 0.28 seconds
```

Before:

```
Run 0 took: 1.0768639159941813
Run 1 took: 0.8784021250030492
Run 2 took: 0.8486490420036716
Run 3 took: 0.8614017910003895
Run 4 took: 0.8497851670108503
Average runtime of 0.9 seconds
```

So it comes with a nice speedup as well :)
---
 pyiceberg/io/pyarrow.py                         | 108 +++++++-----------
 pyiceberg/partitioning.py                       |  10 +-
 tests/benchmark/test_benchmark.py               |  72 ++++++++++++
 .../test_writes/test_partitioned_writes.py      |  23 ++++
 4 files changed, 145 insertions(+), 68 deletions(-)
 create mode 100644 tests/benchmark/test_benchmark.py

diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py
index d288e4f2f1..e4d9896096 100644
--- a/pyiceberg/io/pyarrow.py
+++ b/pyiceberg/io/pyarrow.py
@@ -27,8 +27,10 @@
 import concurrent.futures
 import fnmatch
+import functools
 import itertools
 import logging
+import operator
 import os
 import re
 import uuid
@@ -2542,36 +2544,6 @@ class _TablePartition:
     arrow_table_partition: pa.Table
 
 
-def _get_table_partitions(
-    arrow_table: pa.Table,
-    partition_spec: PartitionSpec,
-    schema: Schema,
-    slice_instructions: list[dict[str, Any]],
-) -> list[_TablePartition]:
-    sorted_slice_instructions = sorted(slice_instructions, key=lambda x: x["offset"])
-
-    partition_fields = partition_spec.fields
-
-    offsets = [inst["offset"] for inst in sorted_slice_instructions]
-    projected_and_filtered = {
-        partition_field.source_id: arrow_table[schema.find_field(name_or_id=partition_field.source_id).name]
-        .take(offsets)
-        .to_pylist()
-        for partition_field in partition_fields
-    }
-
-    table_partitions = []
-    for idx, inst in enumerate(sorted_slice_instructions):
-        partition_slice = arrow_table.slice(**inst)
-        fieldvalues = [
-            PartitionFieldValue(partition_field, projected_and_filtered[partition_field.source_id][idx])
-            for partition_field in partition_fields
-        ]
-        partition_key = PartitionKey(raw_partition_field_values=fieldvalues, partition_spec=partition_spec, schema=schema)
-        table_partitions.append(_TablePartition(partition_key=partition_key, arrow_table_partition=partition_slice))
-    return table_partitions
-
-
 def _determine_partitions(spec: PartitionSpec, schema: Schema, arrow_table: pa.Table) -> List[_TablePartition]:
     """Based on the iceberg table partition spec, slice the arrow table into partitions with their keys.
 
@@ -2594,42 +2566,46 @@ def _determine_partitions(spec: PartitionSpec, schema: Schema, arrow_table: pa.T
     We then retrieve the partition keys by offsets.
     And slice the arrow table by offsets and lengths of each partition.
""" - partition_columns: List[Tuple[PartitionField, NestedField]] = [ - (partition_field, schema.find_field(partition_field.source_id)) for partition_field in spec.fields - ] - partition_values_table = pa.table( - { - str(partition.field_id): partition.transform.pyarrow_transform(field.field_type)(arrow_table[field.name]) - for partition, field in partition_columns - } - ) + # Assign unique names to columns where the partition transform has been applied + # to avoid conflicts + partition_fields = [f"_partition_{field.name}" for field in spec.fields] + + for partition, name in zip(spec.fields, partition_fields): + source_field = schema.find_field(partition.source_id) + arrow_table = arrow_table.append_column( + name, partition.transform.pyarrow_transform(source_field.field_type)(arrow_table[source_field.name]) + ) + + unique_partition_fields = arrow_table.select(partition_fields).group_by(partition_fields).aggregate([]) + + table_partitions = [] + # TODO: As a next step, we could also play around with yielding instead of materializing the full list + for unique_partition in unique_partition_fields.to_pylist(): + partition_key = PartitionKey( + raw_partition_field_values=[ + PartitionFieldValue(field=field, value=unique_partition[name]) + for field, name in zip(spec.fields, partition_fields) + ], + partition_spec=spec, + schema=schema, + ) + filtered_table = arrow_table.filter( + functools.reduce( + operator.and_, + [ + pc.field(partition_field_name) == unique_partition[partition_field_name] + if unique_partition[partition_field_name] is not None + else pc.field(partition_field_name).is_null() + for field, partition_field_name in zip(spec.fields, partition_fields) + ], + ) + ) + filtered_table = filtered_table.drop_columns(partition_fields) - # Sort by partitions - sort_indices = pa.compute.sort_indices( - partition_values_table, - sort_keys=[(col, "ascending") for col in partition_values_table.column_names], - null_placement="at_end", - ).to_pylist() - arrow_table = arrow_table.take(sort_indices) - - # Get slice_instructions to group by partitions - partition_values_table = partition_values_table.take(sort_indices) - reversed_indices = pa.compute.sort_indices( - partition_values_table, - sort_keys=[(col, "descending") for col in partition_values_table.column_names], - null_placement="at_start", - ).to_pylist() - slice_instructions: List[Dict[str, Any]] = [] - last = len(reversed_indices) - reversed_indices_size = len(reversed_indices) - ptr = 0 - while ptr < reversed_indices_size: - group_size = last - reversed_indices[ptr] - offset = reversed_indices[ptr] - slice_instructions.append({"offset": offset, "length": group_size}) - last = reversed_indices[ptr] - ptr = ptr + group_size - - table_partitions: List[_TablePartition] = _get_table_partitions(arrow_table, spec, schema, slice_instructions) + # The combine_chunks seems to be counter-intuitive to do, but it actually returns + # fresh buffers that don't interfere with each other when it is written out to file + table_partitions.append( + _TablePartition(partition_key=partition_key, arrow_table_partition=filtered_table.combine_chunks()) + ) return table_partitions diff --git a/pyiceberg/partitioning.py b/pyiceberg/partitioning.py index 1813772217..b3ab763bbd 100644 --- a/pyiceberg/partitioning.py +++ b/pyiceberg/partitioning.py @@ -29,6 +29,7 @@ Optional, Tuple, TypeVar, + Union, ) from urllib.parse import quote_plus @@ -425,8 +426,13 @@ def _to_partition_representation(type: IcebergType, value: Any) -> Any: 
 @_to_partition_representation.register(TimestampType)
 @_to_partition_representation.register(TimestamptzType)
-def _(type: IcebergType, value: Optional[datetime]) -> Optional[int]:
-    return datetime_to_micros(value) if value is not None else None
+def _(type: IcebergType, value: Optional[Union[datetime, int]]) -> Optional[int]:
+    if value is None:
+        return None
+    elif isinstance(value, int):
+        return value
+    else:
+        return datetime_to_micros(value)
 
 
 @_to_partition_representation.register(DateType)
diff --git a/tests/benchmark/test_benchmark.py b/tests/benchmark/test_benchmark.py
new file mode 100644
index 0000000000..7bb34ef7c1
--- /dev/null
+++ b/tests/benchmark/test_benchmark.py
@@ -0,0 +1,72 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import statistics
+import timeit
+import urllib.request
+
+import pyarrow as pa
+import pyarrow.parquet as pq
+import pytest
+
+from pyiceberg.transforms import DayTransform
+
+
+@pytest.fixture(scope="session")
+def taxi_dataset(tmp_path_factory: pytest.TempPathFactory) -> pa.Table:
+    """Downloads the Taxi dataset to disk and reads it back"""
+    taxi_dataset = "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2022-01.parquet"
+    taxi_dataset_dest = tmp_path_factory.mktemp("taxi_dataset") / "yellow_tripdata_2022-01.parquet"
+    urllib.request.urlretrieve(taxi_dataset, taxi_dataset_dest)
+
+    return pq.read_table(taxi_dataset_dest)
+
+
+@pytest.mark.benchmark
+def test_partitioned_write(tmp_path_factory: pytest.TempPathFactory, taxi_dataset: pa.Table) -> None:
+    """Tests writing to a partitioned table with something that would be close to a production-like situation"""
+    from pyiceberg.catalog.sql import SqlCatalog
+
+    warehouse_path = str(tmp_path_factory.mktemp("warehouse"))
+    catalog = SqlCatalog(
+        "default",
+        uri=f"sqlite:///{warehouse_path}/pyiceberg_catalog.db",
+        warehouse=f"file://{warehouse_path}",
+    )
+
+    catalog.create_namespace("default")
+
+    tbl = catalog.create_table("default.taxi_partitioned", schema=taxi_dataset.schema)
+
+    with tbl.update_spec() as spec:
+        spec.add_field("tpep_pickup_datetime", DayTransform())
+
+    # Profiling can sometimes be handy as well
+    # with cProfile.Profile() as pr:
+    #     tbl.append(taxi_dataset)
+    #
+    #     pr.print_stats(sort=True)
+
+    runs = []
+    for run in range(5):
+        start_time = timeit.default_timer()
+        tbl.append(taxi_dataset)
+        elapsed = timeit.default_timer() - start_time
+
+        print(f"Run {run} took: {elapsed}")
+        runs.append(elapsed)
+
+    print(f"Average runtime of {round(statistics.mean(runs), 2)} seconds")
diff --git a/tests/integration/test_writes/test_partitioned_writes.py b/tests/integration/test_writes/test_partitioned_writes.py
index 1e6ea1b797..807a504afd 100644
--- a/tests/integration/test_writes/test_partitioned_writes.py
+++ b/tests/integration/test_writes/test_partitioned_writes.py
@@ -17,6 +17,7 @@ # pylint:disable=redefined-outer-name +import random from datetime import date from typing import Any, Set @@ -1126,3 +1127,25 @@ def test_append_multiple_partitions( """ ) assert files_df.count() == 6 + + +@pytest.mark.integration +def test_pyarrow_overflow(session_catalog: Catalog) -> None: + """Test what happens when the offset is beyond 32 bits""" + identifier = "default.arrow_table_overflow" + try: + session_catalog.drop_table(identifier=identifier) + except NoSuchTableError: + pass + + x = pa.array([random.randint(0, 999) for _ in range(30_000)]) + ta = pa.chunked_array([x] * 10_000) + y = ["fixed_string"] * 30_000 + tb = pa.chunked_array([y] * 10_000) + # Create pa.table + arrow_table = pa.table({"a": ta, "b": tb}) + + table = session_catalog.create_table(identifier, arrow_table.schema) + with table.update_spec() as update_spec: + update_spec.add_field("b", IdentityTransform(), "pb") + table.append(arrow_table) From cafd39dd6cd9389d138fdae58341cc8a47dee501 Mon Sep 17 00:00:00 2001 From: Fokko Date: Mon, 20 Jan 2025 20:00:43 +0100 Subject: [PATCH 2/8] Thanks Kevin! --- pyiceberg/io/pyarrow.py | 16 ++++------------ pyiceberg/partitioning.py | 13 +++++-------- pyproject.toml | 1 + 3 files changed, 10 insertions(+), 20 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index e4d9896096..172d21c375 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -2545,7 +2545,7 @@ class _TablePartition: def _determine_partitions(spec: PartitionSpec, schema: Schema, arrow_table: pa.Table) -> List[_TablePartition]: - """Based on the iceberg table partition spec, slice the arrow table into partitions with their keys. + """Based on the iceberg table partition spec, filter the arrow table into partitions with their keys. Example: Input: @@ -2554,17 +2554,9 @@ def _determine_partitions(spec: PartitionSpec, schema: Schema, arrow_table: pa.T 'n_legs': [2, 2, 2, 4, 4, 4, 4, 5, 100], 'animal': ["Flamingo", "Parrot", "Parrot", "Dog", "Horse", "Horse", "Horse","Brittle stars", "Centipede"]}. The algorithm: - Firstly we group the rows into partitions by sorting with sort order [('n_legs', 'descending'), ('year', 'descending')] - and null_placement of "at_end". - This gives the same table as raw input. - Then we sort_indices using reverse order of [('n_legs', 'descending'), ('year', 'descending')] - and null_placement : "at_start". - This gives: - [8, 7, 4, 5, 6, 3, 1, 2, 0] - Based on this we get partition groups of indices: - [{'offset': 8, 'length': 1}, {'offset': 7, 'length': 1}, {'offset': 4, 'length': 3}, {'offset': 3, 'length': 1}, {'offset': 1, 'length': 2}, {'offset': 0, 'length': 1}] - We then retrieve the partition keys by offsets. - And slice the arrow table by offsets and lengths of each partition. + - We determine the set of unique partition keys + - Then we produce a set of partitions by filtering on each of the combinations + - We combine the chunks to create a copy to avoid GIL congestion on the original table """ # Assign unique names to columns where the partition transform has been applied # to avoid conflicts diff --git a/pyiceberg/partitioning.py b/pyiceberg/partitioning.py index b3ab763bbd..6ca725b12c 100644 --- a/pyiceberg/partitioning.py +++ b/pyiceberg/partitioning.py @@ -414,7 +414,9 @@ def partition_record_value(partition_field: PartitionField, value: Any, schema: the final partition record value. 
""" iceberg_type = schema.find_field(name_or_id=partition_field.source_id).field_type - iceberg_typed_value = _to_partition_representation(iceberg_type, value) + if not isinstance(value, int): + # When adding files, it can be that we still need to convert from logical types to physical types + iceberg_typed_value = _to_partition_representation(iceberg_type, value) transformed_value = partition_field.transform.transform(iceberg_type)(iceberg_typed_value) return transformed_value @@ -426,13 +428,8 @@ def _to_partition_representation(type: IcebergType, value: Any) -> Any: @_to_partition_representation.register(TimestampType) @_to_partition_representation.register(TimestamptzType) -def _(type: IcebergType, value: Optional[Union[datetime, int]]) -> Optional[int]: - if value is None: - return None - elif isinstance(value, int): - return value - else: - return datetime_to_micros(value) +def _(type: IcebergType, value: Optional[datetime]) -> Optional[int]: + return datetime_to_micros(value) if value is not None else None @_to_partition_representation.register(DateType) diff --git a/pyproject.toml b/pyproject.toml index dcdb5e7156..c71818e7ff 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1220,6 +1220,7 @@ markers = [ "adls: marks a test as requiring access to adls compliant storage (use with --adls.account-name, --adls.account-key, and --adls.endpoint args)", "integration: marks integration tests against Apache Spark", "gcs: marks a test as requiring access to gcs compliant storage (use with --gs.token, --gs.project, and --gs.endpoint)", + "benchmark: collection of tests to validate read/write performance before and after a change" ] # Turns a warning into an error From 7442c4181b11cc3de2698c462a765dc0c1e8c7e5 Mon Sep 17 00:00:00 2001 From: Fokko Date: Tue, 21 Jan 2025 16:49:14 +0100 Subject: [PATCH 3/8] lint --- pyiceberg/partitioning.py | 5 ++--- tests/integration/test_writes/test_partitioned_writes.py | 3 +-- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/pyiceberg/partitioning.py b/pyiceberg/partitioning.py index 6ca725b12c..574cd118dc 100644 --- a/pyiceberg/partitioning.py +++ b/pyiceberg/partitioning.py @@ -29,7 +29,6 @@ Optional, Tuple, TypeVar, - Union, ) from urllib.parse import quote_plus @@ -416,8 +415,8 @@ def partition_record_value(partition_field: PartitionField, value: Any, schema: iceberg_type = schema.find_field(name_or_id=partition_field.source_id).field_type if not isinstance(value, int): # When adding files, it can be that we still need to convert from logical types to physical types - iceberg_typed_value = _to_partition_representation(iceberg_type, value) - transformed_value = partition_field.transform.transform(iceberg_type)(iceberg_typed_value) + value = _to_partition_representation(iceberg_type, value) + transformed_value = partition_field.transform.transform(iceberg_type)(value) return transformed_value diff --git a/tests/integration/test_writes/test_partitioned_writes.py b/tests/integration/test_writes/test_partitioned_writes.py index 807a504afd..e1fafa2531 100644 --- a/tests/integration/test_writes/test_partitioned_writes.py +++ b/tests/integration/test_writes/test_partitioned_writes.py @@ -17,7 +17,6 @@ # pylint:disable=redefined-outer-name -import random from datetime import date from typing import Any, Set @@ -1138,7 +1137,7 @@ def test_pyarrow_overflow(session_catalog: Catalog) -> None: except NoSuchTableError: pass - x = pa.array([random.randint(0, 999) for _ in range(30_000)]) + x = pa.array([1925 for _ in range(30_000)]) ta = 
pa.chunked_array([x] * 10_000) y = ["fixed_string"] * 30_000 tb = pa.chunked_array([y] * 10_000) From 3c590cbabb7899bd589331cb794199c2169e2909 Mon Sep 17 00:00:00 2001 From: Fokko Date: Tue, 21 Jan 2025 21:04:49 +0100 Subject: [PATCH 4/8] Remove int column --- tests/integration/test_writes/test_partitioned_writes.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/tests/integration/test_writes/test_partitioned_writes.py b/tests/integration/test_writes/test_partitioned_writes.py index e1fafa2531..3f51475ee1 100644 --- a/tests/integration/test_writes/test_partitioned_writes.py +++ b/tests/integration/test_writes/test_partitioned_writes.py @@ -1137,12 +1137,10 @@ def test_pyarrow_overflow(session_catalog: Catalog) -> None: except NoSuchTableError: pass - x = pa.array([1925 for _ in range(30_000)]) - ta = pa.chunked_array([x] * 10_000) - y = ["fixed_string"] * 30_000 - tb = pa.chunked_array([y] * 10_000) + arr = ["fixed_string"] * 30_000 + strings = pa.chunked_array([arr] * 10_000) # Create pa.table - arrow_table = pa.table({"a": ta, "b": tb}) + arrow_table = pa.table({"a": strings}) table = session_catalog.create_table(identifier, arrow_table.schema) with table.update_spec() as update_spec: From 93a6522bd48b307aae603de5e214605a30c1f0aa Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Tue, 21 Jan 2025 13:54:48 -0500 Subject: [PATCH 5/8] Update NOTICE copyright to 2025 (#1557) --- NOTICE | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/NOTICE b/NOTICE index adcae2d516..948656aadd 100644 --- a/NOTICE +++ b/NOTICE @@ -1,6 +1,6 @@ Apache Iceberg -Copyright 2017-2024 The Apache Software Foundation +Copyright 2017-2025 The Apache Software Foundation This product includes software developed at The Apache Software Foundation (http://www.apache.org/). From 23dd64e724122930e680ab33029fabe9bc8303e7 Mon Sep 17 00:00:00 2001 From: Fokko Date: Wed, 22 Jan 2025 16:29:54 +0100 Subject: [PATCH 6/8] WIP --- pyiceberg/partitioning.py | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/pyiceberg/partitioning.py b/pyiceberg/partitioning.py index 574cd118dc..e070685b58 100644 --- a/pyiceberg/partitioning.py +++ b/pyiceberg/partitioning.py @@ -29,6 +29,7 @@ Optional, Tuple, TypeVar, + Union, ) from urllib.parse import quote_plus @@ -413,9 +414,7 @@ def partition_record_value(partition_field: PartitionField, value: Any, schema: the final partition record value. 
""" iceberg_type = schema.find_field(name_or_id=partition_field.source_id).field_type - if not isinstance(value, int): - # When adding files, it can be that we still need to convert from logical types to physical types - value = _to_partition_representation(iceberg_type, value) + value = _to_partition_representation(iceberg_type, value) transformed_value = partition_field.transform.transform(iceberg_type)(value) return transformed_value @@ -427,8 +426,15 @@ def _to_partition_representation(type: IcebergType, value: Any) -> Any: @_to_partition_representation.register(TimestampType) @_to_partition_representation.register(TimestamptzType) -def _(type: IcebergType, value: Optional[datetime]) -> Optional[int]: - return datetime_to_micros(value) if value is not None else None +def _(type: IcebergType, value: Optional[Union[int, datetime]]) -> Optional[int]: + if value is None: + return None + elif isinstance(value, int): + return value + elif isinstance(value, datetime): + return datetime_to_micros(value) + else: + raise ValueError(f"Unknown type: {value}") @_to_partition_representation.register(DateType) From ad338268518118d79d6af5dbe87a13433cc6ad55 Mon Sep 17 00:00:00 2001 From: Fokko Date: Wed, 22 Jan 2025 22:48:40 +0100 Subject: [PATCH 7/8] WIP --- pyiceberg/io/pyarrow.py | 7 +- pyiceberg/partitioning.py | 27 +- pyiceberg/table/__init__.py | 6 +- tests/integration/test_partitioning_key.py | 1299 ++++++++--------- .../test_writes/test_partitioned_writes.py | 20 - tests/table/test_locations.py | 2 +- 6 files changed, 677 insertions(+), 684 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 4fae6cb86c..391562e67b 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -2176,7 +2176,10 @@ def _partition_value(self, partition_field: PartitionField, schema: Schema) -> A raise ValueError( f"Cannot infer partition value from parquet metadata as there are more than one partition values for Partition Field: {partition_field.name}. 
{lower_value=}, {upper_value=}" ) - return lower_value + + source_field = schema.find_field(partition_field.source_id) + transform = partition_field.transform.transform(source_field.field_type) + return transform(lower_value) def partition(self, partition_spec: PartitionSpec, schema: Schema) -> Record: return Record(**{field.name: self._partition_value(field, schema) for field in partition_spec.fields}) @@ -2590,7 +2593,7 @@ def _determine_partitions(spec: PartitionSpec, schema: Schema, arrow_table: pa.T # TODO: As a next step, we could also play around with yielding instead of materializing the full list for unique_partition in unique_partition_fields.to_pylist(): partition_key = PartitionKey( - raw_partition_field_values=[ + field_values=[ PartitionFieldValue(field=field, value=unique_partition[name]) for field, name in zip(spec.fields, partition_fields) ], diff --git a/pyiceberg/partitioning.py b/pyiceberg/partitioning.py index 6bba782f36..01606a3414 100644 --- a/pyiceberg/partitioning.py +++ b/pyiceberg/partitioning.py @@ -394,14 +394,14 @@ class PartitionFieldValue: @dataclass(frozen=True) class PartitionKey: - raw_partition_field_values: List[PartitionFieldValue] + field_values: List[PartitionFieldValue] partition_spec: PartitionSpec schema: Schema @cached_property def partition(self) -> Record: # partition key transformed with iceberg internal representation as input iceberg_typed_key_values = {} - for raw_partition_field_value in self.raw_partition_field_values: + for raw_partition_field_value in self.field_values: partition_fields = self.partition_spec.source_id_to_fields_map[raw_partition_field_value.field.source_id] if len(partition_fields) != 1: raise ValueError(f"Cannot have redundant partitions: {partition_fields}") @@ -428,13 +428,19 @@ def partition_record_value(partition_field: PartitionField, value: Any, schema: the final partition record value. """ iceberg_type = schema.find_field(name_or_id=partition_field.source_id).field_type - value = _to_partition_representation(iceberg_type, value) - transformed_value = partition_field.transform.transform(iceberg_type)(value) - return transformed_value + return _to_partition_representation(iceberg_type, value) @singledispatch def _to_partition_representation(type: IcebergType, value: Any) -> Any: + """Strip the logical type into the physical type. + + It can be that the value is already transformed into its physical type, + in this case it will return the original value. Keep in mind that the + bucket transform always will return an int, but an identity transform + can return date that still needs to be transformed into an int (days + since epoch). 
+ """ return TypeError(f"Unsupported partition field type: {type}") @@ -452,8 +458,15 @@ def _(type: IcebergType, value: Optional[Union[int, datetime]]) -> Optional[int] @_to_partition_representation.register(DateType) -def _(type: IcebergType, value: Optional[date]) -> Optional[int]: - return date_to_days(value) if value is not None else None +def _(type: IcebergType, value: Optional[Union[int, date]]) -> Optional[int]: + if value is None: + return None + elif isinstance(value, int): + return value + elif isinstance(value, date): + return date_to_days(value) + else: + raise ValueError(f"Unknown type: {value}") @_to_partition_representation.register(TimeType) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 057c02f260..5e13ab85cf 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -453,8 +453,10 @@ def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT) with self._append_snapshot_producer(snapshot_properties) as append_files: # skip writing data files if the dataframe is empty if df.shape[0] > 0: - data_files = _dataframe_to_data_files( - table_metadata=self.table_metadata, write_uuid=append_files.commit_uuid, df=df, io=self._table.io + data_files = list( + _dataframe_to_data_files( + table_metadata=self.table_metadata, write_uuid=append_files.commit_uuid, df=df, io=self._table.io + ) ) for data_file in data_files: append_files.append_data_file(data_file) diff --git a/tests/integration/test_partitioning_key.py b/tests/integration/test_partitioning_key.py index 3955259d33..04d6f6d25e 100644 --- a/tests/integration/test_partitioning_key.py +++ b/tests/integration/test_partitioning_key.py @@ -15,9 +15,7 @@ # specific language governing permissions and limitations # under the License. 
# pylint:disable=redefined-outer-name -import uuid -from datetime import date, datetime, timedelta, timezone -from decimal import Decimal +from datetime import datetime from typing import Any, List import pytest @@ -28,13 +26,7 @@ from pyiceberg.partitioning import PartitionField, PartitionFieldValue, PartitionKey, PartitionSpec from pyiceberg.schema import Schema, make_compatible_name from pyiceberg.transforms import ( - BucketTransform, - DayTransform, - HourTransform, - IdentityTransform, MonthTransform, - TruncateTransform, - YearTransform, ) from pyiceberg.typedef import Record from pyiceberg.types import ( @@ -80,291 +72,291 @@ @pytest.mark.parametrize( "partition_fields, partition_values, expected_partition_record, expected_hive_partition_path_slice, spark_create_table_sql_for_justification, spark_data_insert_sql_for_justification", [ - # # Identity Transform - ( - [PartitionField(source_id=1, field_id=1001, transform=IdentityTransform(), name="boolean_field")], - [False], - Record(boolean_field=False), - "boolean_field=false", - f"""CREATE TABLE {identifier} ( - boolean_field boolean, - string_field string - ) - USING iceberg - PARTITIONED BY ( - identity(boolean_field) -- Partitioning by 'boolean_field' - ) - """, - f"""INSERT INTO {identifier} - VALUES - (false, 'Boolean field set to false'); - """, - ), - ( - [PartitionField(source_id=2, field_id=1001, transform=IdentityTransform(), name="string_field")], - ["sample_string"], - Record(string_field="sample_string"), - "string_field=sample_string", - f"""CREATE TABLE {identifier} ( - string_field string, - another_string_field string - ) - USING iceberg - PARTITIONED BY ( - identity(string_field) - ) - """, - f"""INSERT INTO {identifier} - VALUES - ('sample_string', 'Another string value') - """, - ), - ( - [PartitionField(source_id=4, field_id=1001, transform=IdentityTransform(), name="int_field")], - [42], - Record(int_field=42), - "int_field=42", - f"""CREATE TABLE {identifier} ( - int_field int, - string_field string - ) - USING iceberg - PARTITIONED BY ( - identity(int_field) - ) - """, - f"""INSERT INTO {identifier} - VALUES - (42, 'Associated string value for int 42') - """, - ), - ( - [PartitionField(source_id=5, field_id=1001, transform=IdentityTransform(), name="long_field")], - [1234567890123456789], - Record(long_field=1234567890123456789), - "long_field=1234567890123456789", - f"""CREATE TABLE {identifier} ( - long_field bigint, - string_field string - ) - USING iceberg - PARTITIONED BY ( - identity(long_field) - ) - """, - f"""INSERT INTO {identifier} - VALUES - (1234567890123456789, 'Associated string value for long 1234567890123456789') - """, - ), - ( - [PartitionField(source_id=6, field_id=1001, transform=IdentityTransform(), name="float_field")], - [3.14], - Record(float_field=3.14), - "float_field=3.14", - # spark writes differently as pyiceberg, Record[float_field=3.140000104904175], path:float_field=3.14 (Record has difference) - # so justification (compare expected value with spark behavior) would fail. 
- None, - None, - # f"""CREATE TABLE {identifier} ( - # float_field float, - # string_field string - # ) - # USING iceberg - # PARTITIONED BY ( - # identity(float_field) - # ) - # """, - # f"""INSERT INTO {identifier} - # VALUES - # (3.14, 'Associated string value for float 3.14') - # """ - ), - ( - [PartitionField(source_id=7, field_id=1001, transform=IdentityTransform(), name="double_field")], - [6.282], - Record(double_field=6.282), - "double_field=6.282", - # spark writes differently as pyiceberg, Record[double_field=6.2820000648498535] path:double_field=6.282 (Record has difference) - # so justification (compare expected value with spark behavior) would fail. - None, - None, - # f"""CREATE TABLE {identifier} ( - # double_field double, - # string_field string - # ) - # USING iceberg - # PARTITIONED BY ( - # identity(double_field) - # ) - # """, - # f"""INSERT INTO {identifier} - # VALUES - # (6.282, 'Associated string value for double 6.282') - # """ - ), - ( - [PartitionField(source_id=8, field_id=1001, transform=IdentityTransform(), name="timestamp_field")], - [datetime(2023, 1, 1, 12, 0, 1, 999)], - Record(timestamp_field=1672574401000999), - "timestamp_field=2023-01-01T12%3A00%3A01.000999", - f"""CREATE TABLE {identifier} ( - timestamp_field timestamp_ntz, - string_field string - ) - USING iceberg - PARTITIONED BY ( - identity(timestamp_field) - ) - """, - f"""INSERT INTO {identifier} - VALUES - (CAST('2023-01-01 12:00:01.000999' AS TIMESTAMP_NTZ), 'Associated string value for timestamp 2023-01-01T12:00:00') - """, - ), - ( - [PartitionField(source_id=8, field_id=1001, transform=IdentityTransform(), name="timestamp_field")], - [datetime(2023, 1, 1, 12, 0, 1)], - Record(timestamp_field=1672574401000000), - "timestamp_field=2023-01-01T12%3A00%3A01", - f"""CREATE TABLE {identifier} ( - timestamp_field timestamp_ntz, - string_field string - ) - USING iceberg - PARTITIONED BY ( - identity(timestamp_field) - ) - """, - f"""INSERT INTO {identifier} - VALUES - (CAST('2023-01-01 12:00:01' AS TIMESTAMP_NTZ), 'Associated string value for timestamp 2023-01-01T12:00:00') - """, - ), - ( - [PartitionField(source_id=8, field_id=1001, transform=IdentityTransform(), name="timestamp_field")], - [datetime(2023, 1, 1, 12, 0, 0)], - Record(timestamp_field=1672574400000000), - "timestamp_field=2023-01-01T12%3A00%3A00", - # Spark writes differently as pyiceberg, so justification (compare expected value with spark behavior) would fail - # AssertionError: assert 'timestamp_field=2023-01-01T12%3A00%3A00' in 's3://warehouse/default/test_table/data/timestamp_field=2023-01-01T12%3A00/00000-5-f9dca69a-9fb7-4830-9ef6-62d3d7afc09e-00001.parquet' - # TLDR: CAST('2023-01-01 12:00:00' AS TIMESTAMP_NTZ) becomes 2023-01-01T12:00 in the hive partition path when spark writes it (without the seconds). 
- None, - None, - # f"""CREATE TABLE {identifier} ( - # timestamp_field timestamp_ntz, - # string_field string - # ) - # USING iceberg - # PARTITIONED BY ( - # identity(timestamp_field) - # ) - # """, - # f"""INSERT INTO {identifier} - # VALUES - # (CAST('2023-01-01 12:00:00' AS TIMESTAMP_NTZ), 'Associated string value for timestamp 2023-01-01T12:00:00') - # """ - ), - ( - [PartitionField(source_id=9, field_id=1001, transform=IdentityTransform(), name="timestamptz_field")], - [datetime(2023, 1, 1, 12, 0, 1, 999, tzinfo=timezone(timedelta(hours=3)))], - Record(timestamptz_field=1672563601000999), - "timestamptz_field=2023-01-01T09%3A00%3A01.000999%2B00%3A00", - # Spark writes differently as pyiceberg, so justification (compare expected value with spark behavior) would fail - # AssertionError: assert 'timestamptz_field=2023-01-01T09%3A00%3A01.000999%2B00%3A00' in 's3://warehouse/default/test_table/data/timestamptz_field=2023-01-01T09%3A00%3A01.000999Z/00000-5-b710fc4d-66b6-47f1-b8ae-6208f8aaa2d4-00001.parquet' - # TLDR: CAST('2023-01-01 12:00:01.000999+03:00' AS TIMESTAMP) becomes 2023-01-01T09:00:01.000999Z in the hive partition path when spark writes it (while iceberg: timestamptz_field=2023-01-01T09:00:01.000999+00:00). - None, - None, - # f"""CREATE TABLE {identifier} ( - # timestamptz_field timestamp, - # string_field string - # ) - # USING iceberg - # PARTITIONED BY ( - # identity(timestamptz_field) - # ) - # """, - # f"""INSERT INTO {identifier} - # VALUES - # (CAST('2023-01-01 12:00:01.000999+03:00' AS TIMESTAMP), 'Associated string value for timestamp 2023-01-01 12:00:01.000999+03:00') - # """ - ), - ( - [PartitionField(source_id=10, field_id=1001, transform=IdentityTransform(), name="date_field")], - [date(2023, 1, 1)], - Record(date_field=19358), - "date_field=2023-01-01", - f"""CREATE TABLE {identifier} ( - date_field date, - string_field string - ) - USING iceberg - PARTITIONED BY ( - identity(date_field) - ) - """, - f"""INSERT INTO {identifier} - VALUES - (CAST('2023-01-01' AS DATE), 'Associated string value for date 2023-01-01') - """, - ), - ( - [PartitionField(source_id=14, field_id=1001, transform=IdentityTransform(), name="uuid_field")], - [uuid.UUID("f47ac10b-58cc-4372-a567-0e02b2c3d479")], - Record(uuid_field="f47ac10b-58cc-4372-a567-0e02b2c3d479"), - "uuid_field=f47ac10b-58cc-4372-a567-0e02b2c3d479", - f"""CREATE TABLE {identifier} ( - uuid_field string, - string_field string - ) - USING iceberg - PARTITIONED BY ( - identity(uuid_field) - ) - """, - f"""INSERT INTO {identifier} - VALUES - ('f47ac10b-58cc-4372-a567-0e02b2c3d479', 'Associated string value for UUID f47ac10b-58cc-4372-a567-0e02b2c3d479') - """, - ), - ( - [PartitionField(source_id=11, field_id=1001, transform=IdentityTransform(), name="binary_field")], - [b"example"], - Record(binary_field=b"example"), - "binary_field=ZXhhbXBsZQ%3D%3D", - f"""CREATE TABLE {identifier} ( - binary_field binary, - string_field string - ) - USING iceberg - PARTITIONED BY ( - identity(binary_field) - ) - """, - f"""INSERT INTO {identifier} - VALUES - (CAST('example' AS BINARY), 'Associated string value for binary `example`') - """, - ), - ( - [PartitionField(source_id=13, field_id=1001, transform=IdentityTransform(), name="decimal_field")], - [Decimal("123.45")], - Record(decimal_field=Decimal("123.45")), - "decimal_field=123.45", - f"""CREATE TABLE {identifier} ( - decimal_field decimal(5,2), - string_field string - ) - USING iceberg - PARTITIONED BY ( - identity(decimal_field) - ) - """, - f"""INSERT INTO {identifier} - 
VALUES - (123.45, 'Associated string value for decimal 123.45') - """, - ), + # # # Identity Transform + # ( + # [PartitionField(source_id=1, field_id=1001, transform=IdentityTransform(), name="boolean_field")], + # [False], + # Record(boolean_field=False), + # "boolean_field=false", + # f"""CREATE TABLE {identifier} ( + # boolean_field boolean, + # string_field string + # ) + # USING iceberg + # PARTITIONED BY ( + # identity(boolean_field) -- Partitioning by 'boolean_field' + # ) + # """, + # f"""INSERT INTO {identifier} + # VALUES + # (false, 'Boolean field set to false'); + # """, + # ), + # ( + # [PartitionField(source_id=2, field_id=1001, transform=IdentityTransform(), name="string_field")], + # ["sample_string"], + # Record(string_field="sample_string"), + # "string_field=sample_string", + # f"""CREATE TABLE {identifier} ( + # string_field string, + # another_string_field string + # ) + # USING iceberg + # PARTITIONED BY ( + # identity(string_field) + # ) + # """, + # f"""INSERT INTO {identifier} + # VALUES + # ('sample_string', 'Another string value') + # """, + # ), + # ( + # [PartitionField(source_id=4, field_id=1001, transform=IdentityTransform(), name="int_field")], + # [42], + # Record(int_field=42), + # "int_field=42", + # f"""CREATE TABLE {identifier} ( + # int_field int, + # string_field string + # ) + # USING iceberg + # PARTITIONED BY ( + # identity(int_field) + # ) + # """, + # f"""INSERT INTO {identifier} + # VALUES + # (42, 'Associated string value for int 42') + # """, + # ), + # ( + # [PartitionField(source_id=5, field_id=1001, transform=IdentityTransform(), name="long_field")], + # [1234567890123456789], + # Record(long_field=1234567890123456789), + # "long_field=1234567890123456789", + # f"""CREATE TABLE {identifier} ( + # long_field bigint, + # string_field string + # ) + # USING iceberg + # PARTITIONED BY ( + # identity(long_field) + # ) + # """, + # f"""INSERT INTO {identifier} + # VALUES + # (1234567890123456789, 'Associated string value for long 1234567890123456789') + # """, + # ), + # ( + # [PartitionField(source_id=6, field_id=1001, transform=IdentityTransform(), name="float_field")], + # [3.14], + # Record(float_field=3.14), + # "float_field=3.14", + # # spark writes differently as pyiceberg, Record[float_field=3.140000104904175], path:float_field=3.14 (Record has difference) + # # so justification (compare expected value with spark behavior) would fail. + # None, + # None, + # # f"""CREATE TABLE {identifier} ( + # # float_field float, + # # string_field string + # # ) + # # USING iceberg + # # PARTITIONED BY ( + # # identity(float_field) + # # ) + # # """, + # # f"""INSERT INTO {identifier} + # # VALUES + # # (3.14, 'Associated string value for float 3.14') + # # """ + # ), + # ( + # [PartitionField(source_id=7, field_id=1001, transform=IdentityTransform(), name="double_field")], + # [6.282], + # Record(double_field=6.282), + # "double_field=6.282", + # # spark writes differently as pyiceberg, Record[double_field=6.2820000648498535] path:double_field=6.282 (Record has difference) + # # so justification (compare expected value with spark behavior) would fail. 
+ # None, + # None, + # # f"""CREATE TABLE {identifier} ( + # # double_field double, + # # string_field string + # # ) + # # USING iceberg + # # PARTITIONED BY ( + # # identity(double_field) + # # ) + # # """, + # # f"""INSERT INTO {identifier} + # # VALUES + # # (6.282, 'Associated string value for double 6.282') + # # """ + # ), + # ( + # [PartitionField(source_id=8, field_id=1001, transform=IdentityTransform(), name="timestamp_field")], + # [datetime(2023, 1, 1, 12, 0, 1, 999)], + # Record(timestamp_field=1672574401000999), + # "timestamp_field=2023-01-01T12%3A00%3A01.000999", + # f"""CREATE TABLE {identifier} ( + # timestamp_field timestamp_ntz, + # string_field string + # ) + # USING iceberg + # PARTITIONED BY ( + # identity(timestamp_field) + # ) + # """, + # f"""INSERT INTO {identifier} + # VALUES + # (CAST('2023-01-01 12:00:01.000999' AS TIMESTAMP_NTZ), 'Associated string value for timestamp 2023-01-01T12:00:00') + # """, + # ), + # ( + # [PartitionField(source_id=8, field_id=1001, transform=IdentityTransform(), name="timestamp_field")], + # [datetime(2023, 1, 1, 12, 0, 1)], + # Record(timestamp_field=1672574401000000), + # "timestamp_field=2023-01-01T12%3A00%3A01", + # f"""CREATE TABLE {identifier} ( + # timestamp_field timestamp_ntz, + # string_field string + # ) + # USING iceberg + # PARTITIONED BY ( + # identity(timestamp_field) + # ) + # """, + # f"""INSERT INTO {identifier} + # VALUES + # (CAST('2023-01-01 12:00:01' AS TIMESTAMP_NTZ), 'Associated string value for timestamp 2023-01-01T12:00:00') + # """, + # ), + # ( + # [PartitionField(source_id=8, field_id=1001, transform=IdentityTransform(), name="timestamp_field")], + # [datetime(2023, 1, 1, 12, 0, 0)], + # Record(timestamp_field=1672574400000000), + # "timestamp_field=2023-01-01T12%3A00%3A00", + # # Spark writes differently as pyiceberg, so justification (compare expected value with spark behavior) would fail + # # AssertionError: assert 'timestamp_field=2023-01-01T12%3A00%3A00' in 's3://warehouse/default/test_table/data/timestamp_field=2023-01-01T12%3A00/00000-5-f9dca69a-9fb7-4830-9ef6-62d3d7afc09e-00001.parquet' + # # TLDR: CAST('2023-01-01 12:00:00' AS TIMESTAMP_NTZ) becomes 2023-01-01T12:00 in the hive partition path when spark writes it (without the seconds). 
+ # None, + # None, + # # f"""CREATE TABLE {identifier} ( + # # timestamp_field timestamp_ntz, + # # string_field string + # # ) + # # USING iceberg + # # PARTITIONED BY ( + # # identity(timestamp_field) + # # ) + # # """, + # # f"""INSERT INTO {identifier} + # # VALUES + # # (CAST('2023-01-01 12:00:00' AS TIMESTAMP_NTZ), 'Associated string value for timestamp 2023-01-01T12:00:00') + # # """ + # ), + # ( + # [PartitionField(source_id=9, field_id=1001, transform=IdentityTransform(), name="timestamptz_field")], + # [datetime(2023, 1, 1, 12, 0, 1, 999, tzinfo=timezone(timedelta(hours=3)))], + # Record(timestamptz_field=1672563601000999), + # "timestamptz_field=2023-01-01T09%3A00%3A01.000999%2B00%3A00", + # # Spark writes differently as pyiceberg, so justification (compare expected value with spark behavior) would fail + # # AssertionError: assert 'timestamptz_field=2023-01-01T09%3A00%3A01.000999%2B00%3A00' in 's3://warehouse/default/test_table/data/timestamptz_field=2023-01-01T09%3A00%3A01.000999Z/00000-5-b710fc4d-66b6-47f1-b8ae-6208f8aaa2d4-00001.parquet' + # # TLDR: CAST('2023-01-01 12:00:01.000999+03:00' AS TIMESTAMP) becomes 2023-01-01T09:00:01.000999Z in the hive partition path when spark writes it (while iceberg: timestamptz_field=2023-01-01T09:00:01.000999+00:00). + # None, + # None, + # # f"""CREATE TABLE {identifier} ( + # # timestamptz_field timestamp, + # # string_field string + # # ) + # # USING iceberg + # # PARTITIONED BY ( + # # identity(timestamptz_field) + # # ) + # # """, + # # f"""INSERT INTO {identifier} + # # VALUES + # # (CAST('2023-01-01 12:00:01.000999+03:00' AS TIMESTAMP), 'Associated string value for timestamp 2023-01-01 12:00:01.000999+03:00') + # # """ + # ), + # ( + # [PartitionField(source_id=10, field_id=1001, transform=IdentityTransform(), name="date_field")], + # [date(2023, 1, 1)], + # Record(date_field=19358), + # "date_field=2023-01-01", + # f"""CREATE TABLE {identifier} ( + # date_field date, + # string_field string + # ) + # USING iceberg + # PARTITIONED BY ( + # identity(date_field) + # ) + # """, + # f"""INSERT INTO {identifier} + # VALUES + # (CAST('2023-01-01' AS DATE), 'Associated string value for date 2023-01-01') + # """, + # ), + # ( + # [PartitionField(source_id=14, field_id=1001, transform=IdentityTransform(), name="uuid_field")], + # [uuid.UUID("f47ac10b-58cc-4372-a567-0e02b2c3d479")], + # Record(uuid_field="f47ac10b-58cc-4372-a567-0e02b2c3d479"), + # "uuid_field=f47ac10b-58cc-4372-a567-0e02b2c3d479", + # f"""CREATE TABLE {identifier} ( + # uuid_field string, + # string_field string + # ) + # USING iceberg + # PARTITIONED BY ( + # identity(uuid_field) + # ) + # """, + # f"""INSERT INTO {identifier} + # VALUES + # ('f47ac10b-58cc-4372-a567-0e02b2c3d479', 'Associated string value for UUID f47ac10b-58cc-4372-a567-0e02b2c3d479') + # """, + # ), + # ( + # [PartitionField(source_id=11, field_id=1001, transform=IdentityTransform(), name="binary_field")], + # [b"example"], + # Record(binary_field=b"example"), + # "binary_field=ZXhhbXBsZQ%3D%3D", + # f"""CREATE TABLE {identifier} ( + # binary_field binary, + # string_field string + # ) + # USING iceberg + # PARTITIONED BY ( + # identity(binary_field) + # ) + # """, + # f"""INSERT INTO {identifier} + # VALUES + # (CAST('example' AS BINARY), 'Associated string value for binary `example`') + # """, + # ), + # ( + # [PartitionField(source_id=13, field_id=1001, transform=IdentityTransform(), name="decimal_field")], + # [Decimal("123.45")], + # Record(decimal_field=Decimal("123.45")), + # 
"decimal_field=123.45", + # f"""CREATE TABLE {identifier} ( + # decimal_field decimal(5,2), + # string_field string + # ) + # USING iceberg + # PARTITIONED BY ( + # identity(decimal_field) + # ) + # """, + # f"""INSERT INTO {identifier} + # VALUES + # (123.45, 'Associated string value for decimal 123.45') + # """, + # ), # # Year Month Day Hour Transform # Month Transform ( @@ -386,362 +378,362 @@ (CAST('2023-01-01 11:55:59.999999' AS TIMESTAMP_NTZ), 'Event at 2023-01-01 11:55:59.999999'); """, ), - ( - [PartitionField(source_id=9, field_id=1001, transform=MonthTransform(), name="timestamptz_field_month")], - [datetime(2023, 1, 1, 12, 0, 1, 999, tzinfo=timezone(timedelta(hours=3)))], - Record(timestamptz_field_month=((2023 - 1970) * 12 + 1 - 1)), - "timestamptz_field_month=2023-01", - f"""CREATE TABLE {identifier} ( - timestamptz_field timestamp, - string_field string - ) - USING iceberg - PARTITIONED BY ( - month(timestamptz_field) - ) - """, - f"""INSERT INTO {identifier} - VALUES - (CAST('2023-01-01 12:00:01.000999+03:00' AS TIMESTAMP), 'Event at 2023-01-01 12:00:01.000999+03:00'); - """, - ), - ( - [PartitionField(source_id=10, field_id=1001, transform=MonthTransform(), name="date_field_month")], - [date(2023, 1, 1)], - Record(date_field_month=((2023 - 1970) * 12)), - "date_field_month=2023-01", - f"""CREATE TABLE {identifier} ( - date_field date, - string_field string - ) - USING iceberg - PARTITIONED BY ( - month(date_field) -- Partitioning by month from 'date_field' - ) - """, - f"""INSERT INTO {identifier} - VALUES - (CAST('2023-01-01' AS DATE), 'Event on 2023-01-01'); - """, - ), - # Year Transform - ( - [PartitionField(source_id=8, field_id=1001, transform=YearTransform(), name="timestamp_field_year")], - [datetime(2023, 1, 1, 11, 55, 59, 999999)], - Record(timestamp_field_year=(2023 - 1970)), - "timestamp_field_year=2023", - f"""CREATE TABLE {identifier} ( - timestamp_field timestamp, - string_field string - ) - USING iceberg - PARTITIONED BY ( - year(timestamp_field) -- Partitioning by year from 'timestamp_field' - ) - """, - f"""INSERT INTO {identifier} - VALUES - (CAST('2023-01-01 11:55:59.999999' AS TIMESTAMP), 'Event at 2023-01-01 11:55:59.999999'); - """, - ), - ( - [PartitionField(source_id=9, field_id=1001, transform=YearTransform(), name="timestamptz_field_year")], - [datetime(2023, 1, 1, 12, 0, 1, 999, tzinfo=timezone(timedelta(hours=3)))], - Record(timestamptz_field_year=53), - "timestamptz_field_year=2023", - f"""CREATE TABLE {identifier} ( - timestamptz_field timestamp, - string_field string - ) - USING iceberg - PARTITIONED BY ( - year(timestamptz_field) - ) - """, - f"""INSERT INTO {identifier} - VALUES - (CAST('2023-01-01 12:00:01.000999+03:00' AS TIMESTAMP), 'Event at 2023-01-01 12:00:01.000999+03:00'); - """, - ), - ( - [PartitionField(source_id=10, field_id=1001, transform=YearTransform(), name="date_field_year")], - [date(2023, 1, 1)], - Record(date_field_year=(2023 - 1970)), - "date_field_year=2023", - f"""CREATE TABLE {identifier} ( - date_field date, - string_field string - ) - USING iceberg - PARTITIONED BY ( - year(date_field) -- Partitioning by year from 'date_field' - ) - """, - f"""INSERT INTO {identifier} - VALUES - (CAST('2023-01-01' AS DATE), 'Event on 2023-01-01'); - """, - ), - # # Day Transform - ( - [PartitionField(source_id=8, field_id=1001, transform=DayTransform(), name="timestamp_field_day")], - [datetime(2023, 1, 1, 11, 55, 59, 999999)], - Record(timestamp_field_day=19358), - "timestamp_field_day=2023-01-01", - f"""CREATE TABLE 
{identifier} ( - timestamp_field timestamp, - string_field string - ) - USING iceberg - PARTITIONED BY ( - day(timestamp_field) -- Partitioning by day from 'timestamp_field' - ) - """, - f"""INSERT INTO {identifier} - VALUES - (CAST('2023-01-01' AS DATE), 'Event on 2023-01-01'); - """, - ), - ( - [PartitionField(source_id=9, field_id=1001, transform=DayTransform(), name="timestamptz_field_day")], - [datetime(2023, 1, 1, 12, 0, 1, 999, tzinfo=timezone(timedelta(hours=3)))], - Record(timestamptz_field_day=19358), - "timestamptz_field_day=2023-01-01", - f"""CREATE TABLE {identifier} ( - timestamptz_field timestamp, - string_field string - ) - USING iceberg - PARTITIONED BY ( - day(timestamptz_field) - ) - """, - f"""INSERT INTO {identifier} - VALUES - (CAST('2023-01-01 12:00:01.000999+03:00' AS TIMESTAMP), 'Event at 2023-01-01 12:00:01.000999+03:00'); - """, - ), - ( - [PartitionField(source_id=10, field_id=1001, transform=DayTransform(), name="date_field_day")], - [date(2023, 1, 1)], - Record(date_field_day=19358), - "date_field_day=2023-01-01", - f"""CREATE TABLE {identifier} ( - date_field date, - string_field string - ) - USING iceberg - PARTITIONED BY ( - day(date_field) -- Partitioning by day from 'date_field' - ) - """, - f"""INSERT INTO {identifier} - VALUES - (CAST('2023-01-01' AS DATE), 'Event on 2023-01-01'); - """, - ), - # Hour Transform - ( - [PartitionField(source_id=8, field_id=1001, transform=HourTransform(), name="timestamp_field_hour")], - [datetime(2023, 1, 1, 11, 55, 59, 999999)], - Record(timestamp_field_hour=464603), - "timestamp_field_hour=2023-01-01-11", - f"""CREATE TABLE {identifier} ( - timestamp_field timestamp, - string_field string - ) - USING iceberg - PARTITIONED BY ( - hour(timestamp_field) -- Partitioning by hour from 'timestamp_field' - ) - """, - f"""INSERT INTO {identifier} - VALUES - (CAST('2023-01-01 11:55:59.999999' AS TIMESTAMP), 'Event within the 11th hour of 2023-01-01'); - """, - ), - ( - [PartitionField(source_id=9, field_id=1001, transform=HourTransform(), name="timestamptz_field_hour")], - [datetime(2023, 1, 1, 12, 0, 1, 999, tzinfo=timezone(timedelta(hours=3)))], - Record(timestamptz_field_hour=464601), - "timestamptz_field_hour=2023-01-01-09", - f"""CREATE TABLE {identifier} ( - timestamptz_field timestamp, - string_field string - ) - USING iceberg - PARTITIONED BY ( - hour(timestamptz_field) - ) - """, - f"""INSERT INTO {identifier} - VALUES - (CAST('2023-01-01 12:00:01.000999+03:00' AS TIMESTAMP), 'Event at 2023-01-01 12:00:01.000999+03:00'); - """, - ), - # Truncate Transform - ( - [PartitionField(source_id=4, field_id=1001, transform=TruncateTransform(10), name="int_field_trunc")], - [12345], - Record(int_field_trunc=12340), - "int_field_trunc=12340", - f"""CREATE TABLE {identifier} ( - int_field int, - string_field string - ) - USING iceberg - PARTITIONED BY ( - truncate(int_field, 10) -- Truncating 'int_field' integer column to a width of 10 - ) - """, - f"""INSERT INTO {identifier} - VALUES - (12345, 'Sample data for int'); - """, - ), - ( - [PartitionField(source_id=5, field_id=1001, transform=TruncateTransform(2), name="bigint_field_trunc")], - [2**32 + 1], - Record(bigint_field_trunc=2**32), # 4294967296 - "bigint_field_trunc=4294967296", - f"""CREATE TABLE {identifier} ( - bigint_field bigint, - string_field string - ) - USING iceberg - PARTITIONED BY ( - truncate(bigint_field, 2) -- Truncating 'bigint_field' long column to a width of 2 - ) - """, - f"""INSERT INTO {identifier} - VALUES - (4294967297, 'Sample data for long'); - """, 
- ), - ( - [PartitionField(source_id=2, field_id=1001, transform=TruncateTransform(3), name="string_field_trunc")], - ["abcdefg"], - Record(string_field_trunc="abc"), - "string_field_trunc=abc", - f"""CREATE TABLE {identifier} ( - string_field string, - another_string_field string - ) - USING iceberg - PARTITIONED BY ( - truncate(string_field, 3) -- Truncating 'string_field' string column to a length of 3 characters - ) - """, - f"""INSERT INTO {identifier} - VALUES - ('abcdefg', 'Another sample for string'); - """, - ), - ( - [PartitionField(source_id=13, field_id=1001, transform=TruncateTransform(width=5), name="decimal_field_trunc")], - [Decimal("678.93")], - Record(decimal_field_trunc=Decimal("678.90")), - "decimal_field_trunc=678.90", # Assuming truncation width of 1 leads to truncating to 670 - f"""CREATE TABLE {identifier} ( - decimal_field decimal(5,2), - string_field string - ) - USING iceberg - PARTITIONED BY ( - truncate(decimal_field, 2) - ) - """, - f"""INSERT INTO {identifier} - VALUES - (678.90, 'Associated string value for decimal 678.90') - """, - ), - ( - [PartitionField(source_id=11, field_id=1001, transform=TruncateTransform(10), name="binary_field_trunc")], - [b"HELLOICEBERG"], - Record(binary_field_trunc=b"HELLOICEBE"), - "binary_field_trunc=SEVMTE9JQ0VCRQ%3D%3D", - f"""CREATE TABLE {identifier} ( - binary_field binary, - string_field string - ) - USING iceberg - PARTITIONED BY ( - truncate(binary_field, 10) -- Truncating 'binary_field' binary column to a length of 10 bytes - ) - """, - f"""INSERT INTO {identifier} - VALUES - (binary('HELLOICEBERG'), 'Sample data for binary'); - """, - ), - # Bucket Transform - ( - [PartitionField(source_id=4, field_id=1001, transform=BucketTransform(2), name="int_field_bucket")], - [10], - Record(int_field_bucket=0), - "int_field_bucket=0", - f"""CREATE TABLE {identifier} ( - int_field int, - string_field string - ) - USING iceberg - PARTITIONED BY ( - bucket(2, int_field) -- Distributing 'int_field' across 2 buckets - ) - """, - f"""INSERT INTO {identifier} - VALUES - (10, 'Integer with value 10'); - """, - ), - # Test multiple field combinations could generate the Partition record and hive partition path correctly - ( - [ - PartitionField(source_id=8, field_id=1001, transform=YearTransform(), name="timestamp_field_year"), - PartitionField(source_id=10, field_id=1002, transform=DayTransform(), name="date_field_day"), - ], - [ - datetime(2023, 1, 1, 11, 55, 59, 999999), - date(2023, 1, 1), - ], - Record(timestamp_field_year=53, date_field_day=19358), - "timestamp_field_year=2023/date_field_day=2023-01-01", - f"""CREATE TABLE {identifier} ( - timestamp_field timestamp, - date_field date, - string_field string - ) - USING iceberg - PARTITIONED BY ( - year(timestamp_field), - day(date_field) - ) - """, - f"""INSERT INTO {identifier} - VALUES - (CAST('2023-01-01 11:55:59.999999' AS TIMESTAMP), CAST('2023-01-01' AS DATE), 'some data'); - """, - ), - # Test that special characters are URL-encoded - ( - [PartitionField(source_id=15, field_id=1001, transform=IdentityTransform(), name="special#string+field")], - ["special string"], - Record(**{"special#string+field": "special string"}), # type: ignore - "special%23string%2Bfield=special+string", - f"""CREATE TABLE {identifier} ( - `special#string+field` string - ) - USING iceberg - PARTITIONED BY ( - identity(`special#string+field`) - ) - """, - f"""INSERT INTO {identifier} - VALUES - ('special string') - """, - ), + # ( + # [PartitionField(source_id=9, field_id=1001, 
transform=MonthTransform(), name="timestamptz_field_month")], + # [datetime(2023, 1, 1, 12, 0, 1, 999, tzinfo=timezone(timedelta(hours=3)))], + # Record(timestamptz_field_month=((2023 - 1970) * 12 + 1 - 1)), + # "timestamptz_field_month=2023-01", + # f"""CREATE TABLE {identifier} ( + # timestamptz_field timestamp, + # string_field string + # ) + # USING iceberg + # PARTITIONED BY ( + # month(timestamptz_field) + # ) + # """, + # f"""INSERT INTO {identifier} + # VALUES + # (CAST('2023-01-01 12:00:01.000999+03:00' AS TIMESTAMP), 'Event at 2023-01-01 12:00:01.000999+03:00'); + # """, + # ), + # ( + # [PartitionField(source_id=10, field_id=1001, transform=MonthTransform(), name="date_field_month")], + # [date(2023, 1, 1)], + # Record(date_field_month=((2023 - 1970) * 12)), + # "date_field_month=2023-01", + # f"""CREATE TABLE {identifier} ( + # date_field date, + # string_field string + # ) + # USING iceberg + # PARTITIONED BY ( + # month(date_field) -- Partitioning by month from 'date_field' + # ) + # """, + # f"""INSERT INTO {identifier} + # VALUES + # (CAST('2023-01-01' AS DATE), 'Event on 2023-01-01'); + # """, + # ), + # # Year Transform + # ( + # [PartitionField(source_id=8, field_id=1001, transform=YearTransform(), name="timestamp_field_year")], + # [datetime(2023, 1, 1, 11, 55, 59, 999999)], + # Record(timestamp_field_year=(2023 - 1970)), + # "timestamp_field_year=2023", + # f"""CREATE TABLE {identifier} ( + # timestamp_field timestamp, + # string_field string + # ) + # USING iceberg + # PARTITIONED BY ( + # year(timestamp_field) -- Partitioning by year from 'timestamp_field' + # ) + # """, + # f"""INSERT INTO {identifier} + # VALUES + # (CAST('2023-01-01 11:55:59.999999' AS TIMESTAMP), 'Event at 2023-01-01 11:55:59.999999'); + # """, + # ), + # ( + # [PartitionField(source_id=9, field_id=1001, transform=YearTransform(), name="timestamptz_field_year")], + # [datetime(2023, 1, 1, 12, 0, 1, 999, tzinfo=timezone(timedelta(hours=3)))], + # Record(timestamptz_field_year=53), + # "timestamptz_field_year=2023", + # f"""CREATE TABLE {identifier} ( + # timestamptz_field timestamp, + # string_field string + # ) + # USING iceberg + # PARTITIONED BY ( + # year(timestamptz_field) + # ) + # """, + # f"""INSERT INTO {identifier} + # VALUES + # (CAST('2023-01-01 12:00:01.000999+03:00' AS TIMESTAMP), 'Event at 2023-01-01 12:00:01.000999+03:00'); + # """, + # ), + # ( + # [PartitionField(source_id=10, field_id=1001, transform=YearTransform(), name="date_field_year")], + # [date(2023, 1, 1)], + # Record(date_field_year=(2023 - 1970)), + # "date_field_year=2023", + # f"""CREATE TABLE {identifier} ( + # date_field date, + # string_field string + # ) + # USING iceberg + # PARTITIONED BY ( + # year(date_field) -- Partitioning by year from 'date_field' + # ) + # """, + # f"""INSERT INTO {identifier} + # VALUES + # (CAST('2023-01-01' AS DATE), 'Event on 2023-01-01'); + # """, + # ), + # # # Day Transform + # ( + # [PartitionField(source_id=8, field_id=1001, transform=DayTransform(), name="timestamp_field_day")], + # [datetime(2023, 1, 1, 11, 55, 59, 999999)], + # Record(timestamp_field_day=19358), + # "timestamp_field_day=2023-01-01", + # f"""CREATE TABLE {identifier} ( + # timestamp_field timestamp, + # string_field string + # ) + # USING iceberg + # PARTITIONED BY ( + # day(timestamp_field) -- Partitioning by day from 'timestamp_field' + # ) + # """, + # f"""INSERT INTO {identifier} + # VALUES + # (CAST('2023-01-01' AS DATE), 'Event on 2023-01-01'); + # """, + # ), + # ( + # [PartitionField(source_id=9, 
field_id=1001, transform=DayTransform(), name="timestamptz_field_day")], + # [datetime(2023, 1, 1, 12, 0, 1, 999, tzinfo=timezone(timedelta(hours=3)))], + # Record(timestamptz_field_day=19358), + # "timestamptz_field_day=2023-01-01", + # f"""CREATE TABLE {identifier} ( + # timestamptz_field timestamp, + # string_field string + # ) + # USING iceberg + # PARTITIONED BY ( + # day(timestamptz_field) + # ) + # """, + # f"""INSERT INTO {identifier} + # VALUES + # (CAST('2023-01-01 12:00:01.000999+03:00' AS TIMESTAMP), 'Event at 2023-01-01 12:00:01.000999+03:00'); + # """, + # ), + # ( + # [PartitionField(source_id=10, field_id=1001, transform=DayTransform(), name="date_field_day")], + # [date(2023, 1, 1)], + # Record(date_field_day=19358), + # "date_field_day=2023-01-01", + # f"""CREATE TABLE {identifier} ( + # date_field date, + # string_field string + # ) + # USING iceberg + # PARTITIONED BY ( + # day(date_field) -- Partitioning by day from 'date_field' + # ) + # """, + # f"""INSERT INTO {identifier} + # VALUES + # (CAST('2023-01-01' AS DATE), 'Event on 2023-01-01'); + # """, + # ), + # # Hour Transform + # ( + # [PartitionField(source_id=8, field_id=1001, transform=HourTransform(), name="timestamp_field_hour")], + # [datetime(2023, 1, 1, 11, 55, 59, 999999)], + # Record(timestamp_field_hour=464603), + # "timestamp_field_hour=2023-01-01-11", + # f"""CREATE TABLE {identifier} ( + # timestamp_field timestamp, + # string_field string + # ) + # USING iceberg + # PARTITIONED BY ( + # hour(timestamp_field) -- Partitioning by hour from 'timestamp_field' + # ) + # """, + # f"""INSERT INTO {identifier} + # VALUES + # (CAST('2023-01-01 11:55:59.999999' AS TIMESTAMP), 'Event within the 11th hour of 2023-01-01'); + # """, + # ), + # ( + # [PartitionField(source_id=9, field_id=1001, transform=HourTransform(), name="timestamptz_field_hour")], + # [datetime(2023, 1, 1, 12, 0, 1, 999, tzinfo=timezone(timedelta(hours=3)))], + # Record(timestamptz_field_hour=464601), + # "timestamptz_field_hour=2023-01-01-09", + # f"""CREATE TABLE {identifier} ( + # timestamptz_field timestamp, + # string_field string + # ) + # USING iceberg + # PARTITIONED BY ( + # hour(timestamptz_field) + # ) + # """, + # f"""INSERT INTO {identifier} + # VALUES + # (CAST('2023-01-01 12:00:01.000999+03:00' AS TIMESTAMP), 'Event at 2023-01-01 12:00:01.000999+03:00'); + # """, + # ), + # # Truncate Transform + # ( + # [PartitionField(source_id=4, field_id=1001, transform=TruncateTransform(10), name="int_field_trunc")], + # [12345], + # Record(int_field_trunc=12340), + # "int_field_trunc=12340", + # f"""CREATE TABLE {identifier} ( + # int_field int, + # string_field string + # ) + # USING iceberg + # PARTITIONED BY ( + # truncate(int_field, 10) -- Truncating 'int_field' integer column to a width of 10 + # ) + # """, + # f"""INSERT INTO {identifier} + # VALUES + # (12345, 'Sample data for int'); + # """, + # ), + # ( + # [PartitionField(source_id=5, field_id=1001, transform=TruncateTransform(2), name="bigint_field_trunc")], + # [2**32 + 1], + # Record(bigint_field_trunc=2**32), # 4294967296 + # "bigint_field_trunc=4294967296", + # f"""CREATE TABLE {identifier} ( + # bigint_field bigint, + # string_field string + # ) + # USING iceberg + # PARTITIONED BY ( + # truncate(bigint_field, 2) -- Truncating 'bigint_field' long column to a width of 2 + # ) + # """, + # f"""INSERT INTO {identifier} + # VALUES + # (4294967297, 'Sample data for long'); + # """, + # ), + # ( + # [PartitionField(source_id=2, field_id=1001, transform=TruncateTransform(3), 
name="string_field_trunc")], + # ["abcdefg"], + # Record(string_field_trunc="abc"), + # "string_field_trunc=abc", + # f"""CREATE TABLE {identifier} ( + # string_field string, + # another_string_field string + # ) + # USING iceberg + # PARTITIONED BY ( + # truncate(string_field, 3) -- Truncating 'string_field' string column to a length of 3 characters + # ) + # """, + # f"""INSERT INTO {identifier} + # VALUES + # ('abcdefg', 'Another sample for string'); + # """, + # ), + # ( + # [PartitionField(source_id=13, field_id=1001, transform=TruncateTransform(width=5), name="decimal_field_trunc")], + # [Decimal("678.93")], + # Record(decimal_field_trunc=Decimal("678.90")), + # "decimal_field_trunc=678.90", # Assuming truncation width of 1 leads to truncating to 670 + # f"""CREATE TABLE {identifier} ( + # decimal_field decimal(5,2), + # string_field string + # ) + # USING iceberg + # PARTITIONED BY ( + # truncate(decimal_field, 2) + # ) + # """, + # f"""INSERT INTO {identifier} + # VALUES + # (678.90, 'Associated string value for decimal 678.90') + # """, + # ), + # ( + # [PartitionField(source_id=11, field_id=1001, transform=TruncateTransform(10), name="binary_field_trunc")], + # [b"HELLOICEBERG"], + # Record(binary_field_trunc=b"HELLOICEBE"), + # "binary_field_trunc=SEVMTE9JQ0VCRQ%3D%3D", + # f"""CREATE TABLE {identifier} ( + # binary_field binary, + # string_field string + # ) + # USING iceberg + # PARTITIONED BY ( + # truncate(binary_field, 10) -- Truncating 'binary_field' binary column to a length of 10 bytes + # ) + # """, + # f"""INSERT INTO {identifier} + # VALUES + # (binary('HELLOICEBERG'), 'Sample data for binary'); + # """, + # ), + # # Bucket Transform + # ( + # [PartitionField(source_id=4, field_id=1001, transform=BucketTransform(2), name="int_field_bucket")], + # [10], + # Record(int_field_bucket=0), + # "int_field_bucket=0", + # f"""CREATE TABLE {identifier} ( + # int_field int, + # string_field string + # ) + # USING iceberg + # PARTITIONED BY ( + # bucket(2, int_field) -- Distributing 'int_field' across 2 buckets + # ) + # """, + # f"""INSERT INTO {identifier} + # VALUES + # (10, 'Integer with value 10'); + # """, + # ), + # # Test multiple field combinations could generate the Partition record and hive partition path correctly + # ( + # [ + # PartitionField(source_id=8, field_id=1001, transform=YearTransform(), name="timestamp_field_year"), + # PartitionField(source_id=10, field_id=1002, transform=DayTransform(), name="date_field_day"), + # ], + # [ + # datetime(2023, 1, 1, 11, 55, 59, 999999), + # date(2023, 1, 1), + # ], + # Record(timestamp_field_year=53, date_field_day=19358), + # "timestamp_field_year=2023/date_field_day=2023-01-01", + # f"""CREATE TABLE {identifier} ( + # timestamp_field timestamp, + # date_field date, + # string_field string + # ) + # USING iceberg + # PARTITIONED BY ( + # year(timestamp_field), + # day(date_field) + # ) + # """, + # f"""INSERT INTO {identifier} + # VALUES + # (CAST('2023-01-01 11:55:59.999999' AS TIMESTAMP), CAST('2023-01-01' AS DATE), 'some data'); + # """, + # ), + # # Test that special characters are URL-encoded + # ( + # [PartitionField(source_id=15, field_id=1001, transform=IdentityTransform(), name="special#string+field")], + # ["special string"], + # Record(**{"special#string+field": "special string"}), # type: ignore + # "special%23string%2Bfield=special+string", + # f"""CREATE TABLE {identifier} ( + # `special#string+field` string + # ) + # USING iceberg + # PARTITIONED BY ( + # identity(`special#string+field`) + # ) + # """, + # 
f"""INSERT INTO {identifier} + # VALUES + # ('special string') + # """, + # ), ], ) @pytest.mark.integration @@ -755,11 +747,14 @@ def test_partition_key( spark_create_table_sql_for_justification: str, spark_data_insert_sql_for_justification: str, ) -> None: - partition_field_values = [PartitionFieldValue(field, value) for field, value in zip(partition_fields, partition_values)] + field_values = [ + PartitionFieldValue(field, field.transform.transform(TABLE_SCHEMA.find_field(field.source_id).field_type)(value)) + for field, value in zip(partition_fields, partition_values) + ] spec = PartitionSpec(*partition_fields) key = PartitionKey( - raw_partition_field_values=partition_field_values, + field_values=field_values, partition_spec=spec, schema=TABLE_SCHEMA, ) diff --git a/tests/integration/test_writes/test_partitioned_writes.py b/tests/integration/test_writes/test_partitioned_writes.py index 3f51475ee1..1e6ea1b797 100644 --- a/tests/integration/test_writes/test_partitioned_writes.py +++ b/tests/integration/test_writes/test_partitioned_writes.py @@ -1126,23 +1126,3 @@ def test_append_multiple_partitions( """ ) assert files_df.count() == 6 - - -@pytest.mark.integration -def test_pyarrow_overflow(session_catalog: Catalog) -> None: - """Test what happens when the offset is beyond 32 bits""" - identifier = "default.arrow_table_overflow" - try: - session_catalog.drop_table(identifier=identifier) - except NoSuchTableError: - pass - - arr = ["fixed_string"] * 30_000 - strings = pa.chunked_array([arr] * 10_000) - # Create pa.table - arrow_table = pa.table({"a": strings}) - - table = session_catalog.create_table(identifier, arrow_table.schema) - with table.update_spec() as update_spec: - update_spec.add_field("b", IdentityTransform(), "pb") - table.append(arrow_table) diff --git a/tests/table/test_locations.py b/tests/table/test_locations.py index 67911b6271..9234dd07a8 100644 --- a/tests/table/test_locations.py +++ b/tests/table/test_locations.py @@ -27,7 +27,7 @@ PARTITION_FIELD = PartitionField(source_id=1, field_id=1002, transform=IdentityTransform(), name="string_field") PARTITION_KEY = PartitionKey( - raw_partition_field_values=[PartitionFieldValue(PARTITION_FIELD, "example_string")], + field_values=[PartitionFieldValue(PARTITION_FIELD, "example_string")], partition_spec=PartitionSpec(PARTITION_FIELD), schema=Schema(NestedField(field_id=1, name="string_field", field_type=StringType(), required=False)), ) From 993c38217fb1f3db807006cf5e0d599a29ab7b9b Mon Sep 17 00:00:00 2001 From: Fokko Date: Wed, 22 Jan 2025 23:00:40 +0100 Subject: [PATCH 8/8] Refactor `{year,month,day,hour}` transform --- pyiceberg/transforms.py | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/pyiceberg/transforms.py b/pyiceberg/transforms.py index 22dcdfe88a..64ea1e5fbc 100644 --- a/pyiceberg/transforms.py +++ b/pyiceberg/transforms.py @@ -419,11 +419,17 @@ def transform(self, source: IcebergType) -> Callable[[Optional[S]], Optional[int if isinstance(source, DateType): def year_func(v: Any) -> int: + if isinstance(v, py_datetime.date): + v = datetime.date_to_days(v) + return datetime.days_to_years(v) elif isinstance(source, (TimestampType, TimestamptzType)): def year_func(v: Any) -> int: + if isinstance(v, py_datetime.datetime): + v = datetime.datetime_to_micros(v) + return datetime.micros_to_years(v) else: @@ -476,11 +482,17 @@ def transform(self, source: IcebergType) -> Callable[[Optional[S]], Optional[int if isinstance(source, DateType): def month_func(v: Any) -> int: + if isinstance(v, 
py_datetime.date): + v = datetime.date_to_days(v) + return datetime.days_to_months(v) elif isinstance(source, (TimestampType, TimestamptzType)): def month_func(v: Any) -> int: + if isinstance(v, py_datetime.datetime): + v = datetime.datetime_to_micros(v) + return datetime.micros_to_months(v) else: @@ -539,11 +551,17 @@ def transform(self, source: IcebergType) -> Callable[[Optional[S]], Optional[int if isinstance(source, DateType): def day_func(v: Any) -> int: + if isinstance(v, py_datetime.date): + v = datetime.date_to_days(v) + return v elif isinstance(source, (TimestampType, TimestamptzType)): def day_func(v: Any) -> int: + if isinstance(v, py_datetime.datetime): + v = datetime.datetime_to_micros(v) + return datetime.micros_to_days(v) else: @@ -604,6 +622,9 @@ def transform(self, source: IcebergType) -> Callable[[Optional[S]], Optional[int if isinstance(source, (TimestampType, TimestamptzType)): def hour_func(v: Any) -> int: + if isinstance(v, py_datetime.datetime): + v = datetime.datetime_to_micros(v) + return datetime.micros_to_hours(v) else:
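
As a minimal sketch of what the `{year,month,day,hour}` refactor above enables (an illustration, assuming the standard `pyiceberg.transforms` / `pyiceberg.types` imports; the expected values are taken from the test parametrization earlier in this patch, 2023 mapping to 53 years after 1970 and 2023-01-01 to day ordinal 19358): the transform callables now accept Python `date`/`datetime` objects in addition to the raw day/microsecond integers they handled before.

```python
# Sketch: after this refactor the transform callables also accept Python
# date/datetime objects, converting them to days/micros internally before
# computing the partition value. Expected values mirror the test data above.
from datetime import date, datetime

from pyiceberg.transforms import DayTransform, YearTransform
from pyiceberg.types import DateType, TimestampType

year = YearTransform().transform(DateType())
assert year(date(2023, 1, 1)) == 2023 - 1970  # 53, same result as passing days since epoch

day = DayTransform().transform(TimestampType())
assert day(datetime(2023, 1, 1, 11, 55, 59, 999999)) == 19358  # 2023-01-01 as day ordinal
```

This is also why `test_partition_key` above now applies the transform itself before building the `PartitionFieldValue`s passed to the new `field_values` argument of `PartitionKey`.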