From d5f3fdce962990ae3a19cf2649bccc8e07ebe388 Mon Sep 17 00:00:00 2001 From: vtk9 Date: Mon, 24 Feb 2025 15:20:42 -0700 Subject: [PATCH 1/7] make add_files parallelized --- pyiceberg/io/pyarrow.py | 59 ++++++++++++++--------------- pyiceberg/table/__init__.py | 18 ++++++++- tests/integration/test_add_files.py | 29 +++++++++++++- 3 files changed, 73 insertions(+), 33 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index f7e3c7c082..5a9dbaa9f2 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -2464,38 +2464,37 @@ def _check_pyarrow_schema_compatible( _check_schema_compatible(requested_schema, provided_schema) -def parquet_files_to_data_files(io: FileIO, table_metadata: TableMetadata, file_paths: Iterator[str]) -> Iterator[DataFile]: - for file_path in file_paths: - input_file = io.new_input(file_path) - with input_file.open() as input_stream: - parquet_metadata = pq.read_metadata(input_stream) - - if visit_pyarrow(parquet_metadata.schema.to_arrow_schema(), _HasIds()): - raise NotImplementedError( - f"Cannot add file {file_path} because it has field IDs. `add_files` only supports addition of files without field_ids" - ) - schema = table_metadata.schema() - _check_pyarrow_schema_compatible(schema, parquet_metadata.schema.to_arrow_schema()) - - statistics = data_file_statistics_from_parquet_metadata( - parquet_metadata=parquet_metadata, - stats_columns=compute_statistics_plan(schema, table_metadata.properties), - parquet_column_mapping=parquet_path_to_id_mapping(schema), - ) - data_file = DataFile( - content=DataFileContent.DATA, - file_path=file_path, - file_format=FileFormat.PARQUET, - partition=statistics.partition(table_metadata.spec(), table_metadata.schema()), - file_size_in_bytes=len(input_file), - sort_order_id=None, - spec_id=table_metadata.default_spec_id, - equality_ids=None, - key_metadata=None, - **statistics.to_serialized_dict(), +def parquet_file_to_data_file(io: FileIO, table_metadata: TableMetadata, file_path: str) -> DataFile: + input_file = io.new_input(file_path) + with input_file.open() as input_stream: + parquet_metadata = pq.read_metadata(input_stream) + + if visit_pyarrow(parquet_metadata.schema.to_arrow_schema(), _HasIds()): + raise NotImplementedError( + f"Cannot add file {file_path} because it has field IDs. 
`add_files` only supports addition of files without field_ids" ) + schema = table_metadata.schema() + _check_pyarrow_schema_compatible(schema, parquet_metadata.schema.to_arrow_schema()) + + statistics = data_file_statistics_from_parquet_metadata( + parquet_metadata=parquet_metadata, + stats_columns=compute_statistics_plan(schema, table_metadata.properties), + parquet_column_mapping=parquet_path_to_id_mapping(schema), + ) + data_file = DataFile( + content=DataFileContent.DATA, + file_path=file_path, + file_format=FileFormat.PARQUET, + partition=statistics.partition(table_metadata.spec(), table_metadata.schema()), + file_size_in_bytes=len(input_file), + sort_order_id=None, + spec_id=table_metadata.default_spec_id, + equality_ids=None, + key_metadata=None, + **statistics.to_serialized_dict(), + ) - yield data_file + return data_file ICEBERG_UNCOMPRESSED_CODEC = "uncompressed" diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index e625b848b2..76cd8decc8 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -1885,12 +1885,26 @@ class AddFileTask: partition_field_value: Record +# NEW def _parquet_files_to_data_files(table_metadata: TableMetadata, file_paths: List[str], io: FileIO) -> Iterable[DataFile]: """Convert a list files into DataFiles. Returns: An iterable that supplies DataFiles that describe the parquet files. """ - from pyiceberg.io.pyarrow import parquet_files_to_data_files + from pyiceberg.io.pyarrow import parquet_file_to_data_file + + executor = ExecutorFactory.get_or_create() + futures = [ + executor.submit( + parquet_file_to_data_file, + io, + table_metadata, + file_path + ) + for file_path in file_paths + ] + + return [f.result() for f in futures if f.result()] + - yield from parquet_files_to_data_files(io=io, table_metadata=table_metadata, file_paths=iter(file_paths)) diff --git a/tests/integration/test_add_files.py b/tests/integration/test_add_files.py index 8713615218..79aa3d886a 100644 --- a/tests/integration/test_add_files.py +++ b/tests/integration/test_add_files.py @@ -229,6 +229,33 @@ def test_add_files_to_unpartitioned_table_raises_has_field_ids( tbl.add_files(file_paths=file_paths) +@pytest.mark.integration +def test_add_files_parallelized(spark: SparkSession, session_catalog: Catalog, format_version: int) -> None: + identifier = f"default.unpartitioned_table_schema_updates_v{format_version}" + tbl = _create_table(session_catalog, identifier, format_version) + + file_paths = [f"s3://warehouse/default/add_files_parallel/v{format_version}/test-{i}.parquet" for i in range(10)] + # write parquet files + for file_path in file_paths: + fo = tbl.io.new_output(file_path) + with fo.create(overwrite=True) as fos: + with pq.ParquetWriter(fos, schema=ARROW_SCHEMA) as writer: + writer.write_table(ARROW_TABLE) + + # add the parquet files as data files + tbl.add_files(file_paths=file_paths[0:5]) + tbl.add_files(file_paths=file_paths[5:]) + + rows = spark.sql( + f""" + SELECT added_data_files_count, existing_data_files_count, deleted_data_files_count + FROM {identifier}.all_manifests + """ + ).collect() + + print(rows) + + @pytest.mark.integration def test_add_files_to_unpartitioned_table_with_schema_updates( spark: SparkSession, session_catalog: Catalog, format_version: int @@ -265,7 +292,7 @@ def test_add_files_to_unpartitioned_table_with_schema_updates( tbl.add_files(file_paths=[file_path]) rows = spark.sql( f""" - SELECT added_data_files_count, existing_data_files_count, deleted_data_files_count + SELECT * FROM 
{identifier}.all_manifests """ ).collect() From b5bfda5a8396ff56a42d46d5b22a41d33217e3e9 Mon Sep 17 00:00:00 2001 From: vtk9 Date: Mon, 24 Feb 2025 15:34:01 -0700 Subject: [PATCH 2/7] change number of files uploaded --- tests/integration/test_add_files.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/integration/test_add_files.py b/tests/integration/test_add_files.py index 79aa3d886a..b75bc0304c 100644 --- a/tests/integration/test_add_files.py +++ b/tests/integration/test_add_files.py @@ -243,8 +243,8 @@ def test_add_files_parallelized(spark: SparkSession, session_catalog: Catalog, f writer.write_table(ARROW_TABLE) # add the parquet files as data files - tbl.add_files(file_paths=file_paths[0:5]) - tbl.add_files(file_paths=file_paths[5:]) + tbl.add_files(file_paths=file_paths[0:2]) + tbl.add_files(file_paths=file_paths[2:]) rows = spark.sql( f""" From efddf112c35a26b24261c166fc927aeb411f821e Mon Sep 17 00:00:00 2001 From: vtk9 Date: Mon, 24 Feb 2025 17:33:17 -0700 Subject: [PATCH 3/7] prep for PR submit --- tests/integration/test_add_files.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/tests/integration/test_add_files.py b/tests/integration/test_add_files.py index b75bc0304c..9e19432043 100644 --- a/tests/integration/test_add_files.py +++ b/tests/integration/test_add_files.py @@ -253,7 +253,9 @@ def test_add_files_parallelized(spark: SparkSession, session_catalog: Catalog, f """ ).collect() - print(rows) + assert [row.added_data_files_count for row in rows] == [2, 8, 2] + assert [row.existing_data_files_count for row in rows] == [0, 0, 0] + assert [row.deleted_data_files_count for row in rows] == [0, 0, 0] @pytest.mark.integration @@ -292,7 +294,7 @@ def test_add_files_to_unpartitioned_table_with_schema_updates( tbl.add_files(file_paths=[file_path]) rows = spark.sql( f""" - SELECT * + SELECT added_data_files_count, existing_data_files_count, deleted_data_files_count FROM {identifier}.all_manifests """ ).collect() From 72e1f4b2a67a5fb29f0398a34f7d6810fbc8521d Mon Sep 17 00:00:00 2001 From: vtk9 Date: Mon, 24 Feb 2025 17:36:26 -0700 Subject: [PATCH 4/7] remove comment --- pyiceberg/table/__init__.py | 1 - 1 file changed, 1 deletion(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 76cd8decc8..83d5903e3e 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -1885,7 +1885,6 @@ class AddFileTask: partition_field_value: Record -# NEW def _parquet_files_to_data_files(table_metadata: TableMetadata, file_paths: List[str], io: FileIO) -> Iterable[DataFile]: """Convert a list files into DataFiles. 
From be04e9c2ce0f151f91f81b752c85fc569401f6e1 Mon Sep 17 00:00:00 2001 From: vtk9 Date: Wed, 26 Feb 2025 17:59:19 -0700 Subject: [PATCH 5/7] address PR comments - add a better test which checks multiple threads used during execution - re-add `parquet_files_to_data_files` back and let it use `parquet_file_to_data_file` - move `schema = table_metadata.schema()` outside of function it is being used in --- pyiceberg/io/pyarrow.py | 9 ++++- pyiceberg/table/__init__.py | 13 ++----- tests/integration/test_add_files.py | 55 ++++++++++++++++++----------- 3 files changed, 44 insertions(+), 33 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 5a9dbaa9f2..79c28949ef 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -2464,7 +2464,14 @@ def _check_pyarrow_schema_compatible( _check_schema_compatible(requested_schema, provided_schema) -def parquet_file_to_data_file(io: FileIO, table_metadata: TableMetadata, file_path: str) -> DataFile: +def parquet_files_to_data_files(io: FileIO, table_metadata: TableMetadata, file_paths: Iterator[str]) -> Iterator[DataFile]: + for file_path in file_paths: + schema = table_metadata.schema() + data_file = parquet_file_to_data_file(io=io, table_metadata=table_metadata, file_path=file_path, schema=schema) + yield data_file + + +def parquet_file_to_data_file(io: FileIO, table_metadata: TableMetadata, file_path: str, schema: Schema) -> DataFile: input_file = io.new_input(file_path) with input_file.open() as input_stream: parquet_metadata = pq.read_metadata(input_stream) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 83d5903e3e..b88f2d7dd9 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -1893,17 +1893,8 @@ def _parquet_files_to_data_files(table_metadata: TableMetadata, file_paths: List """ from pyiceberg.io.pyarrow import parquet_file_to_data_file + schema = table_metadata.schema() executor = ExecutorFactory.get_or_create() - futures = [ - executor.submit( - parquet_file_to_data_file, - io, - table_metadata, - file_path - ) - for file_path in file_paths - ] + futures = [executor.submit(parquet_file_to_data_file, io, table_metadata, file_path, schema) for file_path in file_paths] return [f.result() for f in futures if f.result()] - - diff --git a/tests/integration/test_add_files.py b/tests/integration/test_add_files.py index 9e19432043..9b57181b43 100644 --- a/tests/integration/test_add_files.py +++ b/tests/integration/test_add_files.py @@ -18,8 +18,10 @@ import os import re +import threading from datetime import date from typing import Iterator +from unittest import mock import pyarrow as pa import pyarrow.parquet as pq @@ -31,9 +33,11 @@ from pyiceberg.exceptions import NoSuchTableError from pyiceberg.io import FileIO from pyiceberg.io.pyarrow import UnsupportedPyArrowTypeException, _pyarrow_schema_ensure_large_types +from pyiceberg.manifest import DataFile from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionField, PartitionSpec from pyiceberg.schema import Schema from pyiceberg.table import Table +from pyiceberg.table.metadata import TableMetadata from pyiceberg.transforms import BucketTransform, IdentityTransform, MonthTransform from pyiceberg.types import ( BooleanType, @@ -231,31 +235,40 @@ def test_add_files_to_unpartitioned_table_raises_has_field_ids( @pytest.mark.integration def test_add_files_parallelized(spark: SparkSession, session_catalog: Catalog, format_version: int) -> None: - identifier = 
f"default.unpartitioned_table_schema_updates_v{format_version}" - tbl = _create_table(session_catalog, identifier, format_version) + from pyiceberg.io.pyarrow import parquet_file_to_data_file - file_paths = [f"s3://warehouse/default/add_files_parallel/v{format_version}/test-{i}.parquet" for i in range(10)] - # write parquet files - for file_path in file_paths: - fo = tbl.io.new_output(file_path) - with fo.create(overwrite=True) as fos: - with pq.ParquetWriter(fos, schema=ARROW_SCHEMA) as writer: - writer.write_table(ARROW_TABLE) + real_parquet_file_to_data_file = parquet_file_to_data_file - # add the parquet files as data files - tbl.add_files(file_paths=file_paths[0:2]) - tbl.add_files(file_paths=file_paths[2:]) + lock = threading.Lock() + unique_threads_seen = set() - rows = spark.sql( - f""" - SELECT added_data_files_count, existing_data_files_count, deleted_data_files_count - FROM {identifier}.all_manifests - """ - ).collect() + # patch the function _parquet_file_to_data_file to we can track how many unique thread IDs + # it was executed from + with mock.patch("pyiceberg.io.pyarrow.parquet_file_to_data_file") as patch_func: - assert [row.added_data_files_count for row in rows] == [2, 8, 2] - assert [row.existing_data_files_count for row in rows] == [0, 0, 0] - assert [row.deleted_data_files_count for row in rows] == [0, 0, 0] + def mock_parquet_file_to_data_file(io: FileIO, table_metadata: TableMetadata, file_path: str, schema: Schema) -> DataFile: + lock.acquire() + thread_id = threading.get_ident() # the current thread ID + unique_threads_seen.add(thread_id) + lock.release() + return real_parquet_file_to_data_file(io=io, table_metadata=table_metadata, file_path=file_path, schema=schema) + + patch_func.side_effect = mock_parquet_file_to_data_file + + identifier = f"default.unpartitioned_table_schema_updates_v{format_version}" + tbl = _create_table(session_catalog, identifier, format_version) + + file_paths = [f"s3://warehouse/default/add_files_parallel/v{format_version}/test-{i}.parquet" for i in range(10)] + # write parquet files + for file_path in file_paths: + fo = tbl.io.new_output(file_path) + with fo.create(overwrite=True) as fos: + with pq.ParquetWriter(fos, schema=ARROW_SCHEMA) as writer: + writer.write_table(ARROW_TABLE) + + tbl.add_files(file_paths=file_paths) + + assert len(unique_threads_seen) == 10 @pytest.mark.integration From ce57944f7fd0dd46873484b15dc059392dec6a75 Mon Sep 17 00:00:00 2001 From: vtk9 Date: Wed, 26 Feb 2025 18:16:55 -0700 Subject: [PATCH 6/7] forgot to remove `schema = table_metadata.schema()` --- pyiceberg/io/pyarrow.py | 1 - 1 file changed, 1 deletion(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 79c28949ef..60aea9db26 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -2480,7 +2480,6 @@ def parquet_file_to_data_file(io: FileIO, table_metadata: TableMetadata, file_pa raise NotImplementedError( f"Cannot add file {file_path} because it has field IDs. 
`add_files` only supports addition of files without field_ids" ) - schema = table_metadata.schema() _check_pyarrow_schema_compatible(schema, parquet_metadata.schema.to_arrow_schema()) statistics = data_file_statistics_from_parquet_metadata( From 03208ca7ab392e531c1c4b6aba340c99c132d3f2 Mon Sep 17 00:00:00 2001 From: vtk9 Date: Thu, 27 Feb 2025 09:34:09 -0700 Subject: [PATCH 7/7] call `parquet_metadata.schema.to_arrow_schema()` once fix integration to be more robust --- pyiceberg/io/pyarrow.py | 12 +++++++----- pyiceberg/table/__init__.py | 3 +-- tests/integration/test_add_files.py | 19 +++++++++++++++---- 3 files changed, 23 insertions(+), 11 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 60aea9db26..bf16ec5ec3 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -2466,21 +2466,23 @@ def _check_pyarrow_schema_compatible( def parquet_files_to_data_files(io: FileIO, table_metadata: TableMetadata, file_paths: Iterator[str]) -> Iterator[DataFile]: for file_path in file_paths: - schema = table_metadata.schema() - data_file = parquet_file_to_data_file(io=io, table_metadata=table_metadata, file_path=file_path, schema=schema) + data_file = parquet_file_to_data_file(io=io, table_metadata=table_metadata, file_path=file_path) yield data_file -def parquet_file_to_data_file(io: FileIO, table_metadata: TableMetadata, file_path: str, schema: Schema) -> DataFile: +def parquet_file_to_data_file(io: FileIO, table_metadata: TableMetadata, file_path: str) -> DataFile: input_file = io.new_input(file_path) with input_file.open() as input_stream: parquet_metadata = pq.read_metadata(input_stream) - if visit_pyarrow(parquet_metadata.schema.to_arrow_schema(), _HasIds()): + arrow_schema = parquet_metadata.schema.to_arrow_schema() + if visit_pyarrow(arrow_schema, _HasIds()): raise NotImplementedError( f"Cannot add file {file_path} because it has field IDs. `add_files` only supports addition of files without field_ids" ) - _check_pyarrow_schema_compatible(schema, parquet_metadata.schema.to_arrow_schema()) + + schema = table_metadata.schema() + _check_pyarrow_schema_compatible(schema, arrow_schema) statistics = data_file_statistics_from_parquet_metadata( parquet_metadata=parquet_metadata, diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index b88f2d7dd9..45620bce0d 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -1893,8 +1893,7 @@ def _parquet_files_to_data_files(table_metadata: TableMetadata, file_paths: List """ from pyiceberg.io.pyarrow import parquet_file_to_data_file - schema = table_metadata.schema() executor = ExecutorFactory.get_or_create() - futures = [executor.submit(parquet_file_to_data_file, io, table_metadata, file_path, schema) for file_path in file_paths] + futures = [executor.submit(parquet_file_to_data_file, io, table_metadata, file_path) for file_path in file_paths] return [f.result() for f in futures if f.result()] diff --git a/tests/integration/test_add_files.py b/tests/integration/test_add_files.py index 9b57181b43..bfbc8db668 100644 --- a/tests/integration/test_add_files.py +++ b/tests/integration/test_add_files.py @@ -16,6 +16,7 @@ # under the License. 
# pylint:disable=redefined-outer-name +import multiprocessing import os import re import threading @@ -241,24 +242,27 @@ def test_add_files_parallelized(spark: SparkSession, session_catalog: Catalog, f lock = threading.Lock() unique_threads_seen = set() + cpu_count = multiprocessing.cpu_count() # patch the function _parquet_file_to_data_file to we can track how many unique thread IDs # it was executed from with mock.patch("pyiceberg.io.pyarrow.parquet_file_to_data_file") as patch_func: - def mock_parquet_file_to_data_file(io: FileIO, table_metadata: TableMetadata, file_path: str, schema: Schema) -> DataFile: + def mock_parquet_file_to_data_file(io: FileIO, table_metadata: TableMetadata, file_path: str) -> DataFile: lock.acquire() thread_id = threading.get_ident() # the current thread ID unique_threads_seen.add(thread_id) lock.release() - return real_parquet_file_to_data_file(io=io, table_metadata=table_metadata, file_path=file_path, schema=schema) + return real_parquet_file_to_data_file(io=io, table_metadata=table_metadata, file_path=file_path) patch_func.side_effect = mock_parquet_file_to_data_file identifier = f"default.unpartitioned_table_schema_updates_v{format_version}" tbl = _create_table(session_catalog, identifier, format_version) - file_paths = [f"s3://warehouse/default/add_files_parallel/v{format_version}/test-{i}.parquet" for i in range(10)] + file_paths = [ + f"s3://warehouse/default/add_files_parallel/v{format_version}/test-{i}.parquet" for i in range(cpu_count * 2) + ] # write parquet files for file_path in file_paths: fo = tbl.io.new_output(file_path) @@ -268,7 +272,14 @@ def mock_parquet_file_to_data_file(io: FileIO, table_metadata: TableMetadata, fi tbl.add_files(file_paths=file_paths) - assert len(unique_threads_seen) == 10 + # duration creation of threadpool processor, when max_workers is not + # specified, python will add cpu_count + 4 as the number of threads in the + # pool in this case + # https://github.com/python/cpython/blob/e06bebb87e1b33f7251196e1ddb566f528c3fc98/Lib/concurrent/futures/thread.py#L173-L181 + # we check that we have at least seen the number of threads. we don't + # specify the workers in the thread pool and we can't check without + # accessing private attributes of ThreadPoolExecutor + assert len(unique_threads_seen) >= cpu_count @pytest.mark.integration
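
Illustration (not taken from the patches above): the series converges on submitting one parquet_file_to_data_file call per path to the executor returned by ExecutorFactory.get_or_create() and collecting the futures in input order, while the integration test confirms the fan-out by recording the thread ID each call runs on. The sketch below reproduces that pattern with a plain ThreadPoolExecutor and a generic `convert` callable standing in for pyiceberg's internals; treat it as a minimal sketch under those assumptions, not repository code.

# Illustrative sketch only: a generic stand-in for the fan-out used by
# _parquet_files_to_data_files in the patches above, plus the thread-id
# bookkeeping the integration test relies on. `convert` is a hypothetical
# per-file callable, not a pyiceberg API.
import threading
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List, Set, Tuple, TypeVar

T = TypeVar("T")


def convert_in_parallel(file_paths: List[str], convert: Callable[[str], T]) -> Tuple[List[T], Set[int]]:
    """Run `convert` once per path on a thread pool; return results in input
    order along with the set of worker-thread IDs that executed the calls."""
    threads_seen: Set[int] = set()
    lock = threading.Lock()

    def tracked(path: str) -> T:
        with lock:  # guard the shared set, mirroring the Lock used in the test
            threads_seen.add(threading.get_ident())
        return convert(path)

    with ThreadPoolExecutor() as executor:
        # submit() fans the work out; result() blocks per future, so the
        # returned list preserves the order of file_paths, not completion order.
        futures = [executor.submit(tracked, path) for path in file_paths]
        return [f.result() for f in futures], threads_seen


if __name__ == "__main__":
    # Trivial usage: "convert" each fake path to its length.
    results, threads = convert_in_parallel([f"file-{i}.parquet" for i in range(8)], len)
    print(results, f"- observed {len(threads)} worker thread(s)")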