Merged
Changes from 1 commit
25 commits
adfbd3c  Skeletal implementation (Dec 20, 2024)
ea2b456  First attempt at hashing locations (Dec 20, 2024)
ce5f0d5  Relocate to table submodule; code and comment improvements (Dec 20, 2024)
d3e0c0f  Add unit tests (Dec 20, 2024)
00917e9  Remove entropy check (Dec 20, 2024)
c4e6be9  Merge branch 'main' into location-providers (smaheshwar-pltr, Dec 20, 2024)
bc2eab8  Nit: Prefer `self.table_properties` (Dec 20, 2024)
9999cbb  Remove special character testing (Dec 21, 2024)
23ef8f5  Add integration tests for writes (Dec 23, 2024)
e47e18f  Move all `LocationProviders`-related code into locations.py (Jan 9, 2025)
45391de  Nit: tiny for loop refactor (Jan 9, 2025)
065bcbf  Fix typo (Jan 9, 2025)
e5214d4  Object storage as default location provider (Jan 9, 2025)
568af55  Update tests/integration/test_writes/test_partitioned_writes.py (smaheshwar-pltr, Jan 9, 2025)
e77af29  Test entropy in test_object_storage_injects_entropy (Jan 9, 2025)
651aaea  Refactor integration tests to use properties and omit when default once (Jan 9, 2025)
5bfa24b  Use a different table property for custom location provision (Jan 9, 2025)
8cd46fa  write.location-provider.py-impl -> write.py-location-provider.impl (Jan 9, 2025)
3dbb8d0  Merge branch 'main' into location-providers (Jan 10, 2025)
e992c24  Make lint (Jan 10, 2025)
f1e4a31  Move location provider loading into `write_file` for back-compat (Jan 10, 2025)
46dd7ab  Make object storage no longer the default (Jan 10, 2025)
490d08c  Merge branch 'main' into location-providers (Jan 10, 2025)
3555932  Add test case for partitioned paths disabled but with no partition sp… (Jan 10, 2025)
55d6c4f  Moved constants within ObjectStoreLocationProvider (Jan 10, 2025)
Relocate to table submodule; code and comment improvements
Sreesh Maheshwar committed Dec 20, 2024
commit ce5f0d54d22a0267488235e4430eb89854892052
60 changes: 0 additions & 60 deletions pyiceberg/io/__init__.py
@@ -44,10 +44,7 @@
)
from urllib.parse import urlparse

from pyiceberg.partitioning import PartitionKey
from pyiceberg.table import TableProperties
from pyiceberg.typedef import EMPTY_DICT, Properties
from pyiceberg.utils.properties import property_as_bool

logger = logging.getLogger(__name__)

@@ -296,29 +293,6 @@ def delete(self, location: Union[str, InputFile, OutputFile]) -> None:
"""


class LocationProvider(ABC):
"""A base class for location providers, that provide data file locations to write tasks."""

table_location: str
table_properties: Properties

def __init__(self, table_location: str, table_properties: Properties):
self.table_location = table_location
self.table_properties = table_properties

@abstractmethod
def new_data_location(self, data_file_name: str, partition_key: Optional[PartitionKey] = None) -> str:
"""Return a fully-qualified data file location for the given filename.

Args:
data_file_name (str): The name of the data file.
partition_key (Optional[PartitionKey]): The data file's partition key. If None, the data file is not partitioned.

Returns:
str: A fully-qualified location URI for the data file.
"""


LOCATION = "location"
WAREHOUSE = "warehouse"

@@ -370,40 +344,6 @@ def _infer_file_io_from_scheme(path: str, properties: Properties) -> Optional[FileIO]:
return None


def _import_location_provider(location_provider_impl: str, table_location: str, table_properties: Properties) -> Optional[LocationProvider]:
try:
path_parts = location_provider_impl.split(".")
if len(path_parts) < 2:
raise ValueError(f"{TableProperties.WRITE_LOCATION_PROVIDER_IMPL} should be full path (module.CustomLocationProvider), got: {location_provider_impl}")
module_name, class_name = ".".join(path_parts[:-1]), path_parts[-1]
module = importlib.import_module(module_name)
class_ = getattr(module, class_name)
return class_(table_location, table_properties)
except ModuleNotFoundError:
logger.warning("Could not initialize LocationProvider: %s", location_provider_impl)
return None


def load_location_provider(table_location: str, table_properties: Properties) -> LocationProvider:
table_location = table_location.rstrip("/")

if location_provider_impl := table_properties.get(TableProperties.WRITE_LOCATION_PROVIDER_IMPL):
if location_provider := _import_location_provider(location_provider_impl, table_location, table_properties):
logger.info("Loaded LocationProvider: %s", location_provider_impl)
return location_provider
else:
raise ValueError(f"Could not initialize LocationProvider: {location_provider_impl}")

if property_as_bool(table_properties, TableProperties.OBJECT_STORE_ENABLED, TableProperties.OBJECT_STORE_ENABLED_DEFAULT):
from pyiceberg.io.locations import ObjectStoreLocationProvider

return ObjectStoreLocationProvider(table_location, table_properties)
else:
from pyiceberg.io.locations import DefaultLocationProvider

return DefaultLocationProvider(table_location, table_properties)


def load_file_io(properties: Properties = EMPTY_DICT, location: Optional[str] = None) -> FileIO:
# First look for the py-io-impl property to directly load the class
if io_impl := properties.get(PY_IO_IMPL):
17 changes: 10 additions & 7 deletions pyiceberg/io/pyarrow.py
@@ -114,7 +114,7 @@
InputStream,
OutputFile,
OutputStream,
_parse_location, LocationProvider, load_location_provider,
_parse_location,
)
from pyiceberg.manifest import (
DataFile,
@@ -136,6 +136,10 @@
visit,
visit_with_partner,
)
from pyiceberg.table import (
LocationProvider,
load_location_provider,
)
from pyiceberg.table.metadata import TableMetadata
from pyiceberg.table.name_mapping import NameMapping, apply_name_mapping
from pyiceberg.transforms import TruncateTransform
@@ -2415,7 +2419,7 @@ def data_file_statistics_from_parquet_metadata(
)


def write_file(io: FileIO, location_provider: LocationProvider, table_metadata: TableMetadata, tasks: Iterator[WriteTask]) -> Iterator[DataFile]:
def write_file(
io: FileIO, location_provider: LocationProvider, table_metadata: TableMetadata, tasks: Iterator[WriteTask]
) -> Iterator[DataFile]:
from pyiceberg.table import DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE, TableProperties
Contributor:
we might want location_provider: LocationProvider last for backwards compatibility

Contributor Author (@smaheshwar-pltr, Jan 9, 2025):
WDYT about leaving the signature as before and doing load_location_provider at the start of this function (above parquet_writer_kwargs = _get_parquet_writer_kwargs(table_metadata.properties)) instead of in _dataframe_to_data_files?

Contributor:
that would mean we need to run load_location_provider per data file, which can potentially get expensive

Contributor Author:
I don't think so? At the start of the function means not in write_parquet - the location_provider loaded would just be used within that, similar to parquet_writer_kwargs.

Contributor:
ah makes sense, write_parquet is called once per _dataframe_to_data_files

we can do that to preserve backwards compatibility

Contributor Author:
Sounds good! (typo correction: write_file above 😄)
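
Concretely, a rough sketch of the approach agreed above: the signature stays as before and the provider is loaded once at the top of write_file (the body below is abridged and approximate, not the exact final diff):

```python
def write_file(io: FileIO, table_metadata: TableMetadata, tasks: Iterator[WriteTask]) -> Iterator[DataFile]:
    # Loaded once per write_file call, i.e. once per _dataframe_to_data_files,
    # not once per data file, so keeping the old signature costs nothing extra.
    location_provider = load_location_provider(
        table_location=table_metadata.location, table_properties=table_metadata.properties
    )
    parquet_writer_kwargs = _get_parquet_writer_kwargs(table_metadata.properties)
    ...  # write_parquet closes over location_provider, as in the diff above
```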


parquet_writer_kwargs = _get_parquet_writer_kwargs(table_metadata.properties)
@@ -2447,7 +2453,7 @@ def write_parquet(task: WriteTask) -> DataFile:
]
arrow_table = pa.Table.from_batches(batches)
file_path = location_provider.new_data_location(
data_file_name=task.generate_data_file_filename('parquet'),
data_file_name=task.generate_data_file_filename("parquet"),
partition_key=task.partition_key,
)
fo = io.new_output(file_path)
@@ -2625,10 +2631,7 @@ def _dataframe_to_data_files(
property_name=TableProperties.WRITE_TARGET_FILE_SIZE_BYTES,
default=TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT,
)
location_provider = load_location_provider(
table_location=table_metadata.location,
table_properties=table_metadata.properties
)
location_provider = load_location_provider(table_location=table_metadata.location, table_properties=table_metadata.properties)
Contributor Author:
Don't love this. I wanted to do something like this and cache it on at least the Transaction (the only caller of this method), but the problem, I think, is that properties can change on the Transaction, potentially changing the location provider to be used. I suppose we could update that provider on a property change (or maybe on any metadata change), but I'm unsure whether this complexity is even worth it.

Contributor:
That's an interesting edge case. It seems like an anti-pattern to change a table property and write in the same transaction, although it's currently allowed.

Contributor Author (@smaheshwar-pltr, Jan 10, 2025):
3555932 (fyi the Java tests don't have one)

name_mapping = table_metadata.schema().name_mapping
downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
task_schema = pyarrow_to_schema(df.schema, name_mapping=name_mapping, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us)
66 changes: 65 additions & 1 deletion pyiceberg/table/__init__.py
@@ -16,7 +16,9 @@
# under the License.
from __future__ import annotations

import importlib
import itertools
import logging
import uuid
import warnings
from abc import ABC, abstractmethod
@@ -138,7 +140,6 @@
from pyiceberg.utils.concurrent import ExecutorFactory
from pyiceberg.utils.config import Config
from pyiceberg.utils.deprecated import deprecated
from pyiceberg.utils.deprecated import deprecation_message as deprecation_message
from pyiceberg.utils.properties import property_as_bool

if TYPE_CHECKING:
@@ -150,6 +151,8 @@

from pyiceberg.catalog import Catalog

logger = logging.getLogger(__name__)

ALWAYS_TRUE = AlwaysTrue()
DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE = "downcast-ns-timestamp-to-us-on-write"

@@ -1633,6 +1636,67 @@ class AddFileTask:
partition_field_value: Record


class LocationProvider(ABC):
"""A base class for location providers, that provide data file locations for write tasks."""

table_location: str
table_properties: Properties

def __init__(self, table_location: str, table_properties: Properties):
self.table_location = table_location
self.table_properties = table_properties

@abstractmethod
def new_data_location(self, data_file_name: str, partition_key: Optional[PartitionKey] = None) -> str:
"""Return a fully-qualified data file location for the given filename.

Args:
data_file_name (str): The name of the data file.
partition_key (Optional[PartitionKey]): The data file's partition key. If None, the data is not partitioned.

Returns:
str: A fully-qualified location URI for the data file.
"""


def _import_location_provider(
location_provider_impl: str, table_location: str, table_properties: Properties
) -> Optional[LocationProvider]:
try:
path_parts = location_provider_impl.split(".")
if len(path_parts) < 2:
raise ValueError(
f"{TableProperties.WRITE_LOCATION_PROVIDER_IMPL} should be full path (module.CustomLocationProvider), got: {location_provider_impl}"
)
module_name, class_name = ".".join(path_parts[:-1]), path_parts[-1]
module = importlib.import_module(module_name)
class_ = getattr(module, class_name)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, wonder if we should reduce duplication between this and file IO loading.

return class_(table_location, table_properties)
except ModuleNotFoundError:
logger.warning("Could not initialize LocationProvider: %s", location_provider_impl)
return None


def load_location_provider(table_location: str, table_properties: Properties) -> LocationProvider:
table_location = table_location.rstrip("/")

if location_provider_impl := table_properties.get(TableProperties.WRITE_LOCATION_PROVIDER_IMPL):
if location_provider := _import_location_provider(location_provider_impl, table_location, table_properties):
logger.info("Loaded LocationProvider: %s", location_provider_impl)
return location_provider
else:
raise ValueError(f"Could not initialize LocationProvider: {location_provider_impl}")

if property_as_bool(table_properties, TableProperties.OBJECT_STORE_ENABLED, TableProperties.OBJECT_STORE_ENABLED_DEFAULT):
from pyiceberg.table.locations import ObjectStoreLocationProvider

return ObjectStoreLocationProvider(table_location, table_properties)
else:
from pyiceberg.table.locations import DefaultLocationProvider

return DefaultLocationProvider(table_location, table_properties)
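
To make the loading path above concrete, a minimal sketch of a custom provider (the module, class, and prefix scheme are hypothetical; only the LocationProvider interface and the loading behavior come from this diff):

```python
# Hypothetical module my_module.py: the random-prefix scheme and all names
# here are invented for illustration.
import uuid
from typing import Optional

from pyiceberg.partitioning import PartitionKey
from pyiceberg.table import LocationProvider, TableProperties, load_location_provider


class MyCustomLocationProvider(LocationProvider):
    def new_data_location(self, data_file_name: str, partition_key: Optional[PartitionKey] = None) -> str:
        # Spread files across random prefixes under the table's data directory.
        return f"{self.table_location}/data/{uuid.uuid4().hex[:8]}/{data_file_name}"


# load_location_provider imports and instantiates the class from the table property:
provider = load_location_provider(
    table_location="s3://bucket/db/table",
    table_properties={TableProperties.WRITE_LOCATION_PROVIDER_IMPL: "my_module.MyCustomLocationProvider"},
)
print(provider.new_data_location("datafile.parquet"))
```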


def _parquet_files_to_data_files(table_metadata: TableMetadata, file_paths: List[str], io: FileIO) -> Iterable[DataFile]:
"""Convert a list files into DataFiles.

30 changes: 19 additions & 11 deletions pyiceberg/io/locations.py → pyiceberg/table/locations.py
@@ -14,17 +14,18 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from typing import Optional

from pyiceberg.io import LocationProvider
import mmh3

from pyiceberg.partitioning import PartitionKey
from pyiceberg.table import TableProperties
from pyiceberg.table import LocationProvider, TableProperties
from pyiceberg.typedef import Properties
from pyiceberg.utils.properties import property_as_bool


class DefaultLocationProvider(LocationProvider):
Contributor Author (@smaheshwar-pltr, Dec 20, 2024):
The biggest difference vs the Java implementations is that I've not supported write.data.path here. I think it's natural for write.metadata.path to be supported alongside it, so this would be a larger and arguably location-provider-independent change. Can look into it as a follow-up.

Contributor:
thanks! would be great to have write.data.path and write.metadata.path

Contributor:
opened an issue on supporting write.data.path and write.metadata.path: #1492

Contributor:
Sorry guys, didn't notice this thread until now.


def __init__(self, table_location: str, table_properties: Properties):
super().__init__(table_location, table_properties)

@@ -39,12 +40,15 @@ def new_data_location(self, data_file_name: str, partition_key: Optional[PartitionKey] = None) -> str:


class ObjectStoreLocationProvider(LocationProvider):

_include_partition_paths: bool

def __init__(self, table_location: str, table_properties: Properties):
super().__init__(table_location, table_properties)
self._include_partition_paths = property_as_bool(table_properties, TableProperties.WRITE_OBJECT_STORE_PARTITIONED_PATHS, TableProperties.WRITE_OBJECT_STORE_PARTITIONED_PATHS_DEFAULT)
self._include_partition_paths = property_as_bool(
table_properties,
TableProperties.WRITE_OBJECT_STORE_PARTITIONED_PATHS,
TableProperties.WRITE_OBJECT_STORE_PARTITIONED_PATHS_DEFAULT,
)

def new_data_location(self, data_file_name: str, partition_key: Optional[PartitionKey] = None) -> str:
Contributor Author (@smaheshwar-pltr, Dec 20, 2024):
Tried to make this as consistent as possible with its Java counterpart so file locations are consistent too. This means hashing on both the partition key and the data file name below, and using the same hash function.

Seemed reasonable to port over the object storage stuff in this PR, given that the original issue #861 mentions this.

Contributor:
Since Iceberg is mainly focused on object stores, I'm leaning towards making the ObjectStorageLocationProvider the default. Java is a great source of inspiration, but it also holds a lot of historical decisions that are not easy to change, so we should reconsider this in PyIceberg.

Contributor Author:
Thanks for this great suggestion and context! I agree:

  • I made this the default. The MANIFEST_MERGE_ENABLED_DEFAULT property already differs from Java and the docs, which reassures me. I did still add a short comment beside OBJECT_STORE_ENABLED_DEFAULT to indicate that it differs.
  • I renamed DefaultLocationProvider to SimpleLocationProvider because it's no longer the default.

Contributor Author:
^ cc @kevinjqliu, how does this sound to you? I realise the concerns you raised about things silently working differently between Java and PyIceberg seem to contradict the above a little (but I think it's fine).

Contributor Author (@smaheshwar-pltr, Jan 9, 2025):
Also, I've not yet changed WRITE_OBJECT_STORE_PARTITIONED_PATHS_DEFAULT to False (Java/docs have true) even though that's more aligned with object storage - from the docs:

> We have also added a new table property write.object-storage.partitioned-paths that if set to false (default=true), this will omit the partition values from the file path. Iceberg does not need these values in the file path and setting this value to false can further reduce the key size.

I'm very open to being swayed / discussing this. After reading through apache/iceberg#11112 it seems there was a strong case for still supporting partition values in paths, though I haven't been able to flesh it out fully. Perhaps it's backwards compatibility, for folks who inspect storage to see how their files are actually laid out; it does group them together nicely.

I'd be happy to change the default if there's reason for it. The readability of file paths will arguably decrease with these hashes anyway, so the above might be a non-issue.

Contributor:
While I'm in favor of making ObjectStorageLocationProvider the default for PyIceberg, I'd prefer to do so in a follow-up PR.
I like having this PR solely to implement the concept of LocationProvider and the ObjectStorageProvider.

Contributor Author:
> While I'm in favor of making ObjectStorageLocationProvider the default for PyIceberg, I'd prefer to do so in a follow-up PR.
> I like having this PR solely to implement the concept of LocationProvider and the ObjectStorageProvider.

Makes sense! We can have the discussion regarding defaults there. I'd like to keep the SimpleLocationProvider naming change from Default here though and discuss which provider should be the default in the next PR.

Contributor:
SGTM! 🚀
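
For reference, a hedged sketch of opting into this provider per table (the catalog configuration, schema, and table name below are assumed; the two property keys are the ones this diff reads via property_as_bool, assuming they keep their Java string values):

```python
from pyiceberg.catalog import load_catalog
from pyiceberg.schema import Schema
from pyiceberg.types import LongType, NestedField

catalog = load_catalog("default")  # assumed catalog configuration

# Hypothetical table; only the two write.object-storage.* property keys are from this PR.
table = catalog.create_table(
    "db.events",
    schema=Schema(NestedField(1, "id", LongType(), required=True)),
    properties={
        "write.object-storage.enabled": "true",             # select ObjectStoreLocationProvider
        "write.object-storage.partitioned-paths": "false",  # omit partition values from file paths
    },
)
```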

if self._include_partition_paths and partition_key:
@@ -53,22 +57,26 @@ def new_data_location(self, data_file_name: str, partition_key: Optional[PartitionKey] = None) -> str:
prefix = f"{self.table_location}/data"
hashed_path = self._compute_hash(data_file_name)

return f"{prefix}/{hashed_path}/{data_file_name}" if self._include_partition_paths else f"{prefix}/{hashed_path}-{data_file_name}"
return (
f"{prefix}/{hashed_path}/{data_file_name}"
if self._include_partition_paths
else f"{prefix}/{hashed_path}-{data_file_name}"
Contributor Author (@smaheshwar-pltr, Dec 20, 2024):

Interesting that disabling include_partition_paths affects paths of non-partitioned data files. I've matched Java behaviour here but it does feel odd.

Contributor:
This is an interesting case; do we have a test that shows this behavior explicitly? I think it'll be valuable to refer to it at a later time.

)

@staticmethod
def _compute_hash(data_file_name: str) -> str:
import mmh3

# Bitwise AND to combat sign-extension; bitwise OR to preserve leading zeroes that `bin` would otherwise strip.
hash_code = mmh3.hash(data_file_name) & ((1 << HASH_BINARY_STRING_BITS) - 1) | (1 << HASH_BINARY_STRING_BITS)
return ObjectStoreLocationProvider._dirs_from_hash(bin(hash_code)[-HASH_BINARY_STRING_BITS:])

@staticmethod
def _dirs_from_hash(file_hash: str) -> str:
"""Divides hash into directories for optimized orphan removal operation using ENTROPY_DIR_DEPTH and ENTROPY_DIR_LENGTH."""
hash_with_dirs = []
for i in range(0, ENTROPY_DIR_DEPTH * ENTROPY_DIR_LENGTH, ENTROPY_DIR_LENGTH):
hash_with_dirs.append(file_hash[i:i + ENTROPY_DIR_LENGTH])
hash_with_dirs.append(file_hash[i : i + ENTROPY_DIR_LENGTH])

if len(file_hash) > ENTROPY_DIR_DEPTH * ENTROPY_DIR_LENGTH:
hash_with_dirs.append(file_hash[ENTROPY_DIR_DEPTH * ENTROPY_DIR_LENGTH:])
hash_with_dirs.append(file_hash[ENTROPY_DIR_DEPTH * ENTROPY_DIR_LENGTH :])

return '/'.join(hash_with_dirs)
return "/".join(hash_with_dirs)