diff --git a/mkdocs/docs/api.md b/mkdocs/docs/api.md index dadaac93b4..8833b3f0f0 100644 --- a/mkdocs/docs/api.md +++ b/mkdocs/docs/api.md @@ -362,6 +362,15 @@ for buf in tbl.scan().to_arrow_batch_reader(batch_size=1000): print(f"Buffer contains {len(buf)} rows") ``` +By default, each file's batches are materialized in memory before being yielded (`order=ScanOrder.TASK`). For large files that may exceed available memory, use `order=ScanOrder.ARRIVAL` to yield batches as they are produced without materializing entire files: + +```python +from pyiceberg.table import ScanOrder + +for buf in tbl.scan().to_arrow_batch_reader(order=ScanOrder.ARRIVAL, batch_size=1000): + print(f"Buffer contains {len(buf)} rows") +``` + To avoid any type inconsistencies during writing, you can convert the Iceberg table schema to Arrow: ```python @@ -1635,6 +1644,17 @@ table.scan( ).to_arrow_batch_reader(batch_size=1000) ``` +Use `order=ScanOrder.ARRIVAL` to avoid materializing entire files in memory. This yields batches as they are produced by PyArrow, one file at a time: + +```python +from pyiceberg.table import ScanOrder + +table.scan( + row_filter=GreaterThanOrEqual("trip_distance", 10.0), + selected_fields=("VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime"), +).to_arrow_batch_reader(order=ScanOrder.ARRIVAL) +``` + ### Pandas diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index e7c0da5262..e8de6a956e 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -141,7 +141,7 @@ visit, visit_with_partner, ) -from pyiceberg.table import DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE, TableProperties +from pyiceberg.table import DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE, ScanOrder, TableProperties from pyiceberg.table.locations import load_location_provider from pyiceberg.table.metadata import TableMetadata from pyiceberg.table.name_mapping import NameMapping, apply_name_mapping @@ -1761,7 +1761,12 @@ def to_table(self, tasks: Iterable[FileScanTask]) -> pa.Table: return result - def to_record_batches(self, tasks: Iterable[FileScanTask], batch_size: int | None = None) -> Iterator[pa.RecordBatch]: + def to_record_batches( + self, + tasks: Iterable[FileScanTask], + batch_size: int | None = None, + order: ScanOrder = ScanOrder.TASK, + ) -> Iterator[pa.RecordBatch]: """Scan the Iceberg table and return an Iterator[pa.RecordBatch]. Returns an Iterator of pa.RecordBatch with data from the Iceberg table @@ -1770,6 +1775,13 @@ def to_record_batches(self, tasks: Iterable[FileScanTask], batch_size: int | Non Args: tasks: FileScanTasks representing the data files and delete files to read from. + batch_size: The number of rows per batch. If None, PyArrow's default is used. + order: Controls the order in which record batches are returned. + ScanOrder.TASK (default) returns batches in task order, with each task + fully materialized before proceeding to the next. Allows parallel file + reads via executor. ScanOrder.ARRIVAL yields batches as they are + produced, processing tasks sequentially without materializing entire + files into memory. Returns: An Iterator of PyArrow RecordBatches. @@ -1777,10 +1789,22 @@ def to_record_batches(self, tasks: Iterable[FileScanTask], batch_size: int | Non Raises: ResolveError: When a required field cannot be found in the file - ValueError: When a field type in the file cannot be projected to the schema type + ValueError: When a field type in the file cannot be projected to the schema type, + or when an invalid order value is provided. """ + if not isinstance(order, ScanOrder): + raise ValueError(f"Invalid order: {order!r}. Must be a ScanOrder enum value (ScanOrder.TASK or ScanOrder.ARRIVAL).") + deletes_per_file = _read_all_delete_files(self._io, tasks) + if order == ScanOrder.ARRIVAL: + # Arrival order: process all tasks sequentially, yielding batches as produced. + # _record_batches_from_scan_tasks_and_deletes handles the limit internally + # when called with all tasks, so no outer limit check is needed. + yield from self._record_batches_from_scan_tasks_and_deletes(tasks, deletes_per_file, batch_size) + return + + # Task order: existing behavior with executor.map + list() total_row_count = 0 executor = ExecutorFactory.get_or_create() diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 164eba5d32..a4b9cba5f8 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -23,6 +23,7 @@ from abc import ABC, abstractmethod from collections.abc import Callable, Iterable, Iterator from dataclasses import dataclass +from enum import Enum from functools import cached_property from itertools import chain from types import TracebackType @@ -154,6 +155,20 @@ DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE = "downcast-ns-timestamp-to-us-on-write" +class ScanOrder(str, Enum): + """Order in which record batches are returned from a scan. + + Attributes: + TASK: Batches are returned in task order, with each task fully materialized + before proceeding to the next. Allows parallel file reads via executor. + ARRIVAL: Batches are yielded as they are produced, processing tasks + sequentially without materializing entire files into memory. + """ + + TASK = "task" + ARRIVAL = "arrival" + + @dataclass() class UpsertResult: """Summary the upsert operation.""" @@ -2155,7 +2170,7 @@ def to_arrow(self) -> pa.Table: self.table_metadata, self.io, self.projection(), self.row_filter, self.case_sensitive, self.limit ).to_table(self.plan_files()) - def to_arrow_batch_reader(self, batch_size: int | None = None) -> pa.RecordBatchReader: + def to_arrow_batch_reader(self, batch_size: int | None = None, order: ScanOrder = ScanOrder.TASK) -> pa.RecordBatchReader: """Return an Arrow RecordBatchReader from this DataScan. For large results, using a RecordBatchReader requires less memory than @@ -2164,6 +2179,10 @@ def to_arrow_batch_reader(self, batch_size: int | None = None) -> pa.RecordBatch Args: batch_size: The number of rows per batch. If None, PyArrow's default is used. + order: Controls the order in which record batches are returned. + ScanOrder.TASK (default) returns batches in task order with parallel + file reads. ScanOrder.ARRIVAL yields batches as they are produced, + processing tasks sequentially. Returns: pa.RecordBatchReader: Arrow RecordBatchReader from the Iceberg table's DataScan @@ -2176,7 +2195,7 @@ def to_arrow_batch_reader(self, batch_size: int | None = None) -> pa.RecordBatch target_schema = schema_to_pyarrow(self.projection()) batches = ArrowScan( self.table_metadata, self.io, self.projection(), self.row_filter, self.case_sensitive, self.limit - ).to_record_batches(self.plan_files(), batch_size=batch_size) + ).to_record_batches(self.plan_files(), batch_size=batch_size, order=order) return pa.RecordBatchReader.from_batches( target_schema, diff --git a/tests/io/test_pyarrow.py b/tests/io/test_pyarrow.py index fb03f785c6..fa5f180512 100644 --- a/tests/io/test_pyarrow.py +++ b/tests/io/test_pyarrow.py @@ -86,7 +86,7 @@ from pyiceberg.manifest import DataFile, DataFileContent, FileFormat from pyiceberg.partitioning import PartitionField, PartitionSpec from pyiceberg.schema import Schema, make_compatible_name, visit -from pyiceberg.table import FileScanTask, TableProperties +from pyiceberg.table import FileScanTask, ScanOrder, TableProperties from pyiceberg.table.metadata import TableMetadataV2 from pyiceberg.table.name_mapping import create_mapping_from_schema from pyiceberg.transforms import HourTransform, IdentityTransform @@ -3106,6 +3106,176 @@ def test_task_to_record_batches_default_batch_size(tmpdir: str) -> None: assert len(batches[0]) == num_rows +def _create_scan_and_tasks( + tmpdir: str, + num_files: int = 1, + rows_per_file: int = 100, + limit: int | None = None, + delete_rows_per_file: list[list[int]] | None = None, +) -> tuple[ArrowScan, list[FileScanTask]]: + """Helper to create an ArrowScan and FileScanTasks for testing. + + Args: + delete_rows_per_file: If provided, a list of lists of row positions to delete + per file. Length must match num_files. Each inner list contains 0-based + row positions within that file to mark as positionally deleted. + """ + table_schema = Schema(NestedField(1, "col", LongType(), required=True)) + pa_schema = pa.schema([pa.field("col", pa.int64(), nullable=False, metadata={PYARROW_PARQUET_FIELD_ID_KEY: "1"})]) + tasks = [] + for i in range(num_files): + start = i * rows_per_file + arrow_table = pa.table({"col": pa.array(range(start, start + rows_per_file))}, schema=pa_schema) + data_file = _write_table_to_data_file(f"{tmpdir}/file_{i}.parquet", pa_schema, arrow_table) + data_file.spec_id = 0 + + delete_files = set() + if delete_rows_per_file and delete_rows_per_file[i]: + delete_table = pa.table( + { + "file_path": [data_file.file_path] * len(delete_rows_per_file[i]), + "pos": delete_rows_per_file[i], + } + ) + delete_path = f"{tmpdir}/deletes_{i}.parquet" + pq.write_table(delete_table, delete_path) + delete_files.add( + DataFile.from_args( + content=DataFileContent.POSITION_DELETES, + file_path=delete_path, + file_format=FileFormat.PARQUET, + partition={}, + record_count=len(delete_rows_per_file[i]), + file_size_in_bytes=22, + ) + ) + + tasks.append(FileScanTask(data_file=data_file, delete_files=delete_files)) + + scan = ArrowScan( + table_metadata=TableMetadataV2( + location="file://a/b/", + last_column_id=1, + format_version=2, + schemas=[table_schema], + partition_specs=[PartitionSpec()], + ), + io=PyArrowFileIO(), + projected_schema=table_schema, + row_filter=AlwaysTrue(), + case_sensitive=True, + limit=limit, + ) + return scan, tasks + + +def test_task_order_produces_same_results(tmpdir: str) -> None: + """Test that order=ScanOrder.TASK produces the same results as the default behavior.""" + scan, tasks = _create_scan_and_tasks(tmpdir, num_files=3, rows_per_file=100) + + batches_default = list(scan.to_record_batches(tasks, order=ScanOrder.TASK)) + # Re-create tasks since iterators are consumed + _, tasks2 = _create_scan_and_tasks(tmpdir, num_files=3, rows_per_file=100) + batches_task_order = list(scan.to_record_batches(tasks2, order=ScanOrder.TASK)) + + total_default = sum(len(b) for b in batches_default) + total_task_order = sum(len(b) for b in batches_task_order) + assert total_default == 300 + assert total_task_order == 300 + + +def test_arrival_order_yields_all_batches(tmpdir: str) -> None: + """Test that order=ScanOrder.ARRIVAL yields all batches correctly.""" + scan, tasks = _create_scan_and_tasks(tmpdir, num_files=3, rows_per_file=100) + + batches = list(scan.to_record_batches(tasks, order=ScanOrder.ARRIVAL)) + + total_rows = sum(len(b) for b in batches) + assert total_rows == 300 + # Verify all values are present + all_values = sorted([v for b in batches for v in b.column("col").to_pylist()]) + assert all_values == list(range(300)) + + +def test_arrival_order_with_limit(tmpdir: str) -> None: + """Test that order=ScanOrder.ARRIVAL respects the row limit.""" + scan, tasks = _create_scan_and_tasks(tmpdir, num_files=3, rows_per_file=100, limit=150) + + batches = list(scan.to_record_batches(tasks, order=ScanOrder.ARRIVAL)) + + total_rows = sum(len(b) for b in batches) + assert total_rows == 150 + + +def test_arrival_order_file_ordering_preserved(tmpdir: str) -> None: + """Test that file ordering is preserved in arrival order mode.""" + scan, tasks = _create_scan_and_tasks(tmpdir, num_files=3, rows_per_file=100) + + batches = list(scan.to_record_batches(tasks, order=ScanOrder.ARRIVAL)) + all_values = [v for b in batches for v in b.column("col").to_pylist()] + + # Values should be in file order: 0-99 from file 0, 100-199 from file 1, 200-299 from file 2 + assert all_values == list(range(300)) + + +def test_arrival_order_with_positional_deletes(tmpdir: str) -> None: + """Test that order=ScanOrder.ARRIVAL correctly applies positional deletes.""" + # 3 files, 10 rows each; delete rows 0,5 from file 0, row 3 from file 1, nothing from file 2 + scan, tasks = _create_scan_and_tasks( + tmpdir, + num_files=3, + rows_per_file=10, + delete_rows_per_file=[[0, 5], [3], []], + ) + + batches = list(scan.to_record_batches(tasks, order=ScanOrder.ARRIVAL)) + + total_rows = sum(len(b) for b in batches) + assert total_rows == 27 # 30 - 3 deletes + all_values = sorted([v for b in batches for v in b.column("col").to_pylist()]) + # File 0: 0-9, delete rows 0,5 → values 1,2,3,4,6,7,8,9 + # File 1: 10-19, delete row 3 → values 10,11,12,14,15,16,17,18,19 + # File 2: 20-29, no deletes → values 20-29 + expected = [1, 2, 3, 4, 6, 7, 8, 9] + [10, 11, 12, 14, 15, 16, 17, 18, 19] + list(range(20, 30)) + assert all_values == sorted(expected) + + +def test_arrival_order_with_positional_deletes_and_limit(tmpdir: str) -> None: + """Test that order=ScanOrder.ARRIVAL with positional deletes respects the row limit.""" + # 3 files, 10 rows each; delete row 0 from each file + scan, tasks = _create_scan_and_tasks( + tmpdir, + num_files=3, + rows_per_file=10, + limit=15, + delete_rows_per_file=[[0], [0], [0]], + ) + + batches = list(scan.to_record_batches(tasks, order=ScanOrder.ARRIVAL)) + + total_rows = sum(len(b) for b in batches) + assert total_rows == 15 + + +def test_task_order_with_positional_deletes(tmpdir: str) -> None: + """Test that the default task order mode correctly applies positional deletes.""" + # 3 files, 10 rows each; delete rows from each file + scan, tasks = _create_scan_and_tasks( + tmpdir, + num_files=3, + rows_per_file=10, + delete_rows_per_file=[[0, 5], [3], []], + ) + + batches = list(scan.to_record_batches(tasks, order=ScanOrder.TASK)) + + total_rows = sum(len(b) for b in batches) + assert total_rows == 27 # 30 - 3 deletes + all_values = sorted([v for b in batches for v in b.column("col").to_pylist()]) + expected = [1, 2, 3, 4, 6, 7, 8, 9] + [10, 11, 12, 14, 15, 16, 17, 18, 19] + list(range(20, 30)) + assert all_values == sorted(expected) + + def test_parse_location_defaults() -> None: """Test that parse_location uses defaults."""