Skip to content

Commit 7c4879a

Browse files
jqin61hpal
authored and committed
Construction of filenames for partitioned writes (apache#453)
* PartitionKey class and tests
* Fix linting; add decimal input transform test
* Fix bool-to-path lower case; fix timestamptz tests; other PR comments
* Clean up
* Add UUID partition type
* Clean up; rename ambiguous function name
1 parent bc5c412 commit 7c4879a

File tree

5 files changed

+925
-52
lines changed

5 files changed

+925
-52
lines changed

pyiceberg/partitioning.py

Lines changed: 98 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,21 @@
1616
# under the License.
1717
from __future__ import annotations
1818

19+
import uuid
1920
from abc import ABC, abstractmethod
21+
from dataclasses import dataclass
22+
from datetime import date, datetime
2023
from functools import cached_property, singledispatch
21-
from typing import Any, Dict, Generic, List, Optional, Tuple, TypeVar
24+
from typing import (
25+
Any,
26+
Dict,
27+
Generic,
28+
List,
29+
Optional,
30+
Tuple,
31+
TypeVar,
32+
)
33+
from urllib.parse import quote
2234

2335
from pydantic import (
2436
BeforeValidator,
@@ -41,8 +53,18 @@
4153
YearTransform,
4254
parse_transform,
4355
)
44-
from pyiceberg.typedef import IcebergBaseModel
45-
from pyiceberg.types import NestedField, StructType
56+
from pyiceberg.typedef import IcebergBaseModel, Record
57+
from pyiceberg.types import (
58+
DateType,
59+
IcebergType,
60+
NestedField,
61+
PrimitiveType,
62+
StructType,
63+
TimestampType,
64+
TimestamptzType,
65+
UUIDType,
66+
)
67+
from pyiceberg.utils.datetime import date_to_days, datetime_to_micros
4668

4769
INITIAL_PARTITION_SPEC_ID = 0
4870
PARTITION_FIELD_ID_START: int = 1000
@@ -199,6 +221,23 @@ def partition_type(self, schema: Schema) -> StructType:
199221
nested_fields.append(NestedField(field.field_id, field.name, result_type, required=False))
200222
return StructType(*nested_fields)
201223

224+
def partition_to_path(self, data: Record, schema: Schema) -> str:
    """Render a partition Record as a Hive-style path, e.g. ``a=1/b=2``.

    Each value is formatted via the partition field's transform and then
    percent-encoded so it is safe to embed in a file path.
    """
    partition_field_types = self.partition_type(schema).fields

    segments = []
    for pos, value in enumerate(data.record_fields()):
        partition_field = self.fields[pos]
        human_readable = partition_field.transform.to_human_string(
            partition_field_types[pos].field_type, value=value
        )
        # quote with safe='' percent-encodes every reserved char, so a value
        # containing '/' or '=' cannot corrupt the path structure.
        segments.append(f"{partition_field.name}={quote(human_readable, safe='')}")

    return "/".join(segments)
240+
202241

203242
UNPARTITIONED_PARTITION_SPEC = PartitionSpec(spec_id=0)
204243

@@ -326,3 +365,59 @@ def _visit_partition_field(schema: Schema, field: PartitionField, visitor: Parti
326365
return visitor.unknown(field.field_id, source_name, field.source_id, repr(transform))
327366
else:
328367
raise ValueError(f"Unknown transform {transform}")
368+
369+
370+
@dataclass(frozen=True)
class PartitionFieldValue:
    """A raw (not-yet-transformed) value paired with its partition field."""

    # The spec field this value belongs to (carries source_id, transform, name).
    field: PartitionField
    # Raw source-column value; transformed later by PartitionKey.partition.
    value: Any
374+
375+
376+
@dataclass(frozen=True)
class PartitionKey:
    """The partition key of a single write: raw field values plus spec and schema.

    ``partition`` lazily converts each raw value to Iceberg's internal
    representation and applies the spec's transform; ``to_path`` renders the
    result as a Hive-style partition path.
    """

    raw_partition_field_values: List[PartitionFieldValue]
    partition_spec: PartitionSpec
    schema: Schema

    @cached_property
    def partition(self) -> Record:
        """The key as a Record of transformed, internally-represented values."""
        transformed = {}
        for raw in self.raw_partition_field_values:
            source_id = raw.field.source_id
            matching_fields = self.partition_spec.source_id_to_fields_map[source_id]
            # A source column feeding more than one (or zero) spec fields is
            # ambiguous for key construction.
            if len(matching_fields) != 1:
                raise ValueError("partition_fields must contain exactly one field.")
            spec_field = matching_fields[0]
            source_type = self.schema.find_field(name_or_id=source_id).field_type
            internal_value = _to_partition_representation(source_type, raw.value)
            transformed[spec_field.name] = spec_field.transform.transform(source_type)(internal_value)
        return Record(**transformed)

    def to_path(self) -> str:
        """Render this key as a Hive-style partition path."""
        return self.partition_spec.partition_to_path(self.partition, self.schema)
398+
399+
400+
@singledispatch
401+
def _to_partition_representation(type: IcebergType, value: Any) -> Any:
402+
return TypeError(f"Unsupported partition field type: {type}")
403+
404+
405+
@_to_partition_representation.register(TimestampType)
@_to_partition_representation.register(TimestamptzType)
def _(type: IcebergType, value: Optional[datetime]) -> Optional[int]:
    """Represent a datetime as microseconds since the epoch; None passes through."""
    if value is None:
        return None
    return datetime_to_micros(value)
409+
410+
411+
@_to_partition_representation.register(DateType)
def _(type: IcebergType, value: Optional[date]) -> Optional[int]:
    """Represent a date as days since the epoch; None passes through."""
    if value is None:
        return None
    return date_to_days(value)
414+
415+
416+
@_to_partition_representation.register(UUIDType)
def _(type: IcebergType, value: Optional[uuid.UUID]) -> Optional[str]:
    """Represent a UUID by its canonical string form; None passes through."""
    if value is None:
        return None
    return str(value)
419+
420+
421+
@_to_partition_representation.register(PrimitiveType)
def _(type: IcebergType, value: Optional[Any]) -> Optional[Any]:
    # Fallback for all other primitive types: the raw value is passed through
    # unchanged (no conversion needed for the internal representation).
    return value

pyiceberg/transforms.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -655,6 +655,11 @@ def _(value: int, _type: IcebergType) -> str:
655655
return _int_to_human_string(_type, value)
656656

657657

658+
@_human_string.register(bool)
def _(value: bool, _type: IcebergType) -> str:
    """Render a boolean as lowercase 'true'/'false'."""
    return "true" if value else "false"
661+
662+
658663
@singledispatch
659664
def _int_to_human_string(_type: IcebergType, value: int) -> str:
660665
return str(value)

tests/conftest.py

Lines changed: 50 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,9 +46,10 @@
4646
import boto3
4747
import pytest
4848
from moto import mock_aws
49+
from pyspark.sql import SparkSession
4950

5051
from pyiceberg import schema
51-
from pyiceberg.catalog import Catalog
52+
from pyiceberg.catalog import Catalog, load_catalog
5253
from pyiceberg.catalog.noop import NoopCatalog
5354
from pyiceberg.expressions import BoundReference
5455
from pyiceberg.io import (
@@ -1925,3 +1926,51 @@ def table_v2(example_table_metadata_v2: Dict[str, Any]) -> Table:
19251926
@pytest.fixture
19261927
def bound_reference_str() -> BoundReference[str]:
19271928
return BoundReference(field=NestedField(1, "field", StringType(), required=False), accessor=Accessor(position=0, inner=None))
1929+
1930+
1931+
@pytest.fixture(scope="session")
def session_catalog() -> Catalog:
    """Session-scoped REST catalog pointing at the local integration stack.

    Assumes a REST catalog on :8181 and S3-compatible storage on :9000 —
    presumably the project's docker-compose environment; verify locally.
    """
    properties = {
        "type": "rest",
        "uri": "http://localhost:8181",
        "s3.endpoint": "http://localhost:9000",
        "s3.access-key-id": "admin",
        "s3.secret-access-key": "password",
    }
    return load_catalog("local", **properties)
1943+
1944+
1945+
@pytest.fixture(scope="session")
def spark() -> SparkSession:
    """Session-scoped SparkSession wired to the local REST catalog.

    Downloads the Iceberg Spark runtime jars via --packages and configures an
    'integration' catalog backed by the same REST/S3 endpoints the tests use.
    """
    import importlib.metadata
    import os

    # Pick the iceberg-spark runtime artifact matching the installed pyspark
    # major.minor version.
    spark_version = ".".join(importlib.metadata.version("pyspark").split(".")[:2])
    scala_version = "2.12"
    iceberg_version = "1.4.3"

    os.environ["PYSPARK_SUBMIT_ARGS"] = (
        f"--packages org.apache.iceberg:iceberg-spark-runtime-{spark_version}_{scala_version}:{iceberg_version},"
        f"org.apache.iceberg:iceberg-aws-bundle:{iceberg_version} pyspark-shell"
    )
    os.environ["AWS_REGION"] = "us-east-1"
    os.environ["AWS_ACCESS_KEY_ID"] = "admin"
    os.environ["AWS_SECRET_ACCESS_KEY"] = "password"

    catalog_configs = {
        "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
        "spark.sql.catalog.integration": "org.apache.iceberg.spark.SparkCatalog",
        "spark.sql.catalog.integration.catalog-impl": "org.apache.iceberg.rest.RESTCatalog",
        "spark.sql.catalog.integration.uri": "http://localhost:8181",
        "spark.sql.catalog.integration.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
        "spark.sql.catalog.integration.warehouse": "s3://warehouse/wh/",
        "spark.sql.catalog.integration.s3.endpoint": "http://localhost:9000",
        "spark.sql.catalog.integration.s3.path-style-access": "true",
        "spark.sql.defaultCatalog": "integration",
    }
    builder = SparkSession.builder.appName("PyIceberg integration test")
    for key, value in catalog_configs.items():
        builder = builder.config(key, value)
    return builder.getOrCreate()

0 commit comments

Comments
 (0)