Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions mkdocs/docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,15 @@ for buf in tbl.scan().to_arrow_batch_reader(batch_size=1000):
print(f"Buffer contains {len(buf)} rows")
```

By default, each file's batches are materialized in memory before being yielded (`order=ScanOrder.TASK`). For large files that may exceed available memory, use `order=ScanOrder.ARRIVAL` to yield batches as they are produced without materializing entire files:

```python
from pyiceberg.table import ScanOrder

for buf in tbl.scan().to_arrow_batch_reader(order=ScanOrder.ARRIVAL, batch_size=1000):
print(f"Buffer contains {len(buf)} rows")
```

To avoid any type inconsistencies during writing, you can convert the Iceberg table schema to Arrow:

```python
Expand Down Expand Up @@ -1635,6 +1644,17 @@ table.scan(
).to_arrow_batch_reader(batch_size=1000)
```

Use `order=ScanOrder.ARRIVAL` to avoid materializing entire files in memory. This yields batches as they are produced by PyArrow, one file at a time:

```python
from pyiceberg.table import ScanOrder

table.scan(
row_filter=GreaterThanOrEqual("trip_distance", 10.0),
selected_fields=("VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime"),
).to_arrow_batch_reader(order=ScanOrder.ARRIVAL)
```

### Pandas

<!-- prettier-ignore-start -->
Expand Down
30 changes: 27 additions & 3 deletions pyiceberg/io/pyarrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@
visit,
visit_with_partner,
)
from pyiceberg.table import DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE, TableProperties
from pyiceberg.table import DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE, ScanOrder, TableProperties
from pyiceberg.table.locations import load_location_provider
from pyiceberg.table.metadata import TableMetadata
from pyiceberg.table.name_mapping import NameMapping, apply_name_mapping
Expand Down Expand Up @@ -1761,7 +1761,12 @@ def to_table(self, tasks: Iterable[FileScanTask]) -> pa.Table:

return result

def to_record_batches(self, tasks: Iterable[FileScanTask], batch_size: int | None = None) -> Iterator[pa.RecordBatch]:
def to_record_batches(
self,
tasks: Iterable[FileScanTask],
batch_size: int | None = None,
order: ScanOrder = ScanOrder.TASK,
) -> Iterator[pa.RecordBatch]:
"""Scan the Iceberg table and return an Iterator[pa.RecordBatch].

Returns an Iterator of pa.RecordBatch with data from the Iceberg table
Expand All @@ -1770,17 +1775,36 @@ def to_record_batches(self, tasks: Iterable[FileScanTask], batch_size: int | Non

Args:
tasks: FileScanTasks representing the data files and delete files to read from.
batch_size: The number of rows per batch. If None, PyArrow's default is used.
order: Controls the order in which record batches are returned.
ScanOrder.TASK (default) returns batches in task order, with each task
fully materialized before proceeding to the next. Allows parallel file
reads via executor. ScanOrder.ARRIVAL yields batches as they are
produced, processing tasks sequentially without materializing entire
files into memory.

Returns:
An Iterator of PyArrow RecordBatches.
Total number of rows will be capped if specified.

Raises:
ResolveError: When a required field cannot be found in the file
ValueError: When a field type in the file cannot be projected to the schema type
ValueError: When a field type in the file cannot be projected to the schema type,
or when an invalid order value is provided.
"""
if not isinstance(order, ScanOrder):
raise ValueError(f"Invalid order: {order!r}. Must be a ScanOrder enum value (ScanOrder.TASK or ScanOrder.ARRIVAL).")

deletes_per_file = _read_all_delete_files(self._io, tasks)

if order == ScanOrder.ARRIVAL:
# Arrival order: process all tasks sequentially, yielding batches as produced.
# _record_batches_from_scan_tasks_and_deletes handles the limit internally
# when called with all tasks, so no outer limit check is needed.
yield from self._record_batches_from_scan_tasks_and_deletes(tasks, deletes_per_file, batch_size)
return

# Task order: existing behavior with executor.map + list()
total_row_count = 0
executor = ExecutorFactory.get_or_create()

Expand Down
23 changes: 21 additions & 2 deletions pyiceberg/table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from abc import ABC, abstractmethod
from collections.abc import Callable, Iterable, Iterator
from dataclasses import dataclass
from enum import Enum
from functools import cached_property
from itertools import chain
from types import TracebackType
Expand Down Expand Up @@ -154,6 +155,20 @@
DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE = "downcast-ns-timestamp-to-us-on-write"


class ScanOrder(str, Enum):
    """Ordering strategy for record batches produced by a table scan.

    Attributes:
        TASK: Batches come back grouped by scan task; each task is fully
            materialized before the next one is emitted, which permits
            parallel file reads through the executor.
        ARRIVAL: Batches are streamed out as soon as they are produced,
            with tasks processed one after another so whole files never
            need to be held in memory at once.
    """

    TASK = "task"
    ARRIVAL = "arrival"


@dataclass()
class UpsertResult:
"""Summary the upsert operation."""
Expand Down Expand Up @@ -2155,7 +2170,7 @@ def to_arrow(self) -> pa.Table:
self.table_metadata, self.io, self.projection(), self.row_filter, self.case_sensitive, self.limit
).to_table(self.plan_files())

def to_arrow_batch_reader(self, batch_size: int | None = None) -> pa.RecordBatchReader:
def to_arrow_batch_reader(self, batch_size: int | None = None, order: ScanOrder = ScanOrder.TASK) -> pa.RecordBatchReader:
"""Return an Arrow RecordBatchReader from this DataScan.

For large results, using a RecordBatchReader requires less memory than
Expand All @@ -2164,6 +2179,10 @@ def to_arrow_batch_reader(self, batch_size: int | None = None) -> pa.RecordBatch

Args:
batch_size: The number of rows per batch. If None, PyArrow's default is used.
order: Controls the order in which record batches are returned.
ScanOrder.TASK (default) returns batches in task order with parallel
file reads. ScanOrder.ARRIVAL yields batches as they are produced,
processing tasks sequentially.

Returns:
pa.RecordBatchReader: Arrow RecordBatchReader from the Iceberg table's DataScan
Expand All @@ -2176,7 +2195,7 @@ def to_arrow_batch_reader(self, batch_size: int | None = None) -> pa.RecordBatch
target_schema = schema_to_pyarrow(self.projection())
batches = ArrowScan(
self.table_metadata, self.io, self.projection(), self.row_filter, self.case_sensitive, self.limit
).to_record_batches(self.plan_files(), batch_size=batch_size)
).to_record_batches(self.plan_files(), batch_size=batch_size, order=order)

return pa.RecordBatchReader.from_batches(
target_schema,
Expand Down
172 changes: 171 additions & 1 deletion tests/io/test_pyarrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@
from pyiceberg.manifest import DataFile, DataFileContent, FileFormat
from pyiceberg.partitioning import PartitionField, PartitionSpec
from pyiceberg.schema import Schema, make_compatible_name, visit
from pyiceberg.table import FileScanTask, TableProperties
from pyiceberg.table import FileScanTask, ScanOrder, TableProperties
from pyiceberg.table.metadata import TableMetadataV2
from pyiceberg.table.name_mapping import create_mapping_from_schema
from pyiceberg.transforms import HourTransform, IdentityTransform
Expand Down Expand Up @@ -3106,6 +3106,176 @@ def test_task_to_record_batches_default_batch_size(tmpdir: str) -> None:
assert len(batches[0]) == num_rows


def _create_scan_and_tasks(
    tmpdir: str,
    num_files: int = 1,
    rows_per_file: int = 100,
    limit: int | None = None,
    delete_rows_per_file: list[list[int]] | None = None,
) -> tuple[ArrowScan, list[FileScanTask]]:
    """Helper to create an ArrowScan and FileScanTasks for testing.

    Writes ``num_files`` parquet files under ``tmpdir``; file ``i`` holds a single
    required long column ``col`` with the contiguous values
    ``[i * rows_per_file, (i + 1) * rows_per_file)``, so tests can check both
    completeness and ordering of scan output.

    Args:
        tmpdir: Directory the data (and optional delete) parquet files are written to.
        num_files: Number of data files / FileScanTasks to create.
        rows_per_file: Number of rows written into each data file.
        limit: Row limit passed through to the ArrowScan; None means unlimited.
        delete_rows_per_file: If provided, a list of lists of row positions to delete
            per file. Length must match num_files. Each inner list contains 0-based
            row positions within that file to mark as positionally deleted.

    Returns:
        A tuple of the configured ArrowScan and the list of FileScanTasks
        (one task per data file, carrying its positional-delete files if any).
    """
    # Iceberg schema and the matching Arrow schema; the parquet field carries
    # the Iceberg field id (1) in its metadata so the scan can resolve it.
    table_schema = Schema(NestedField(1, "col", LongType(), required=True))
    pa_schema = pa.schema([pa.field("col", pa.int64(), nullable=False, metadata={PYARROW_PARQUET_FIELD_ID_KEY: "1"})])
    tasks = []
    for i in range(num_files):
        # File i covers the value range [start, start + rows_per_file).
        start = i * rows_per_file
        arrow_table = pa.table({"col": pa.array(range(start, start + rows_per_file))}, schema=pa_schema)
        data_file = _write_table_to_data_file(f"{tmpdir}/file_{i}.parquet", pa_schema, arrow_table)
        data_file.spec_id = 0

        delete_files = set()
        if delete_rows_per_file and delete_rows_per_file[i]:
            # Positional-delete file: pairs of (data file path, 0-based row position).
            delete_table = pa.table(
                {
                    "file_path": [data_file.file_path] * len(delete_rows_per_file[i]),
                    "pos": delete_rows_per_file[i],
                }
            )
            delete_path = f"{tmpdir}/deletes_{i}.parquet"
            pq.write_table(delete_table, delete_path)
            delete_files.add(
                DataFile.from_args(
                    content=DataFileContent.POSITION_DELETES,
                    file_path=delete_path,
                    file_format=FileFormat.PARQUET,
                    partition={},
                    record_count=len(delete_rows_per_file[i]),
                    # NOTE(review): placeholder size — presumably not read by the
                    # scan path under test; confirm if size-based planning is added.
                    file_size_in_bytes=22,
                )
            )

        tasks.append(FileScanTask(data_file=data_file, delete_files=delete_files))

    # Minimal V2 table metadata with an unpartitioned spec; projection covers
    # the full (single-column) schema and the filter matches every row.
    scan = ArrowScan(
        table_metadata=TableMetadataV2(
            location="file://a/b/",
            last_column_id=1,
            format_version=2,
            schemas=[table_schema],
            partition_specs=[PartitionSpec()],
        ),
        io=PyArrowFileIO(),
        projected_schema=table_schema,
        row_filter=AlwaysTrue(),
        case_sensitive=True,
        limit=limit,
    )
    return scan, tasks


def test_task_order_produces_same_results(tmpdir: str) -> None:
    """Test that order=ScanOrder.TASK produces the same results as the default behavior.

    Bug fix: the original passed ``order=ScanOrder.TASK`` to BOTH scans, so the
    default code path (no ``order`` argument) was never exercised and the test
    could not catch a regression in the default. The first call now omits
    ``order`` entirely; contents are compared as well, not just row counts.
    """
    scan, tasks = _create_scan_and_tasks(tmpdir, num_files=3, rows_per_file=100)

    # Default call: no order argument, exercising the implicit ScanOrder.TASK path.
    batches_default = list(scan.to_record_batches(tasks))

    # Re-create the tasks: the helper rewrites the parquet files, giving the
    # second scan a fresh, independent set of FileScanTasks.
    _, tasks2 = _create_scan_and_tasks(tmpdir, num_files=3, rows_per_file=100)
    batches_task_order = list(scan.to_record_batches(tasks2, order=ScanOrder.TASK))

    # Both modes must return every row; compare actual values, not only counts.
    rows_default = sorted(v for b in batches_default for v in b.column("col").to_pylist())
    rows_task_order = sorted(v for b in batches_task_order for v in b.column("col").to_pylist())
    assert len(rows_default) == 300
    assert rows_default == rows_task_order


def test_arrival_order_yields_all_batches(tmpdir: str) -> None:
    """Test that order=ScanOrder.ARRIVAL yields all batches correctly."""
    scan, tasks = _create_scan_and_tasks(tmpdir, num_files=3, rows_per_file=100)

    collected = []
    for batch in scan.to_record_batches(tasks, order=ScanOrder.ARRIVAL):
        collected.extend(batch.column("col").to_pylist())

    # Every row from all three files must come back exactly once.
    assert len(collected) == 300
    assert sorted(collected) == list(range(300))


def test_arrival_order_with_limit(tmpdir: str) -> None:
    """Test that order=ScanOrder.ARRIVAL respects the row limit."""
    scan, tasks = _create_scan_and_tasks(tmpdir, num_files=3, rows_per_file=100, limit=150)

    row_count = 0
    for batch in scan.to_record_batches(tasks, order=ScanOrder.ARRIVAL):
        row_count += len(batch)

    # The scan was built with limit=150, so only half of the 300 rows may be returned.
    assert row_count == 150


def test_arrival_order_file_ordering_preserved(tmpdir: str) -> None:
    """Test that file ordering is preserved in arrival order mode."""
    scan, tasks = _create_scan_and_tasks(tmpdir, num_files=3, rows_per_file=100)

    values = []
    for batch in scan.to_record_batches(tasks, order=ScanOrder.ARRIVAL):
        values += batch.column("col").to_pylist()

    # Output must already be in file order without any sorting on our side:
    # 0-99 from file 0, then 100-199 from file 1, then 200-299 from file 2.
    assert values == list(range(300))


def test_arrival_order_with_positional_deletes(tmpdir: str) -> None:
    """Test that order=ScanOrder.ARRIVAL correctly applies positional deletes."""
    # 3 files of 10 rows; positions 0 and 5 deleted in file 0, position 3 in file 1, none in file 2.
    scan, tasks = _create_scan_and_tasks(
        tmpdir,
        num_files=3,
        rows_per_file=10,
        delete_rows_per_file=[[0, 5], [3], []],
    )

    surviving = [
        value
        for batch in scan.to_record_batches(tasks, order=ScanOrder.ARRIVAL)
        for value in batch.column("col").to_pylist()
    ]

    # 30 rows written minus 3 positional deletes.
    assert len(surviving) == 27
    # Deleted positions map to values 0 and 5 (file 0) and 13 (file 1, position 3).
    deleted_values = {0, 5, 13}
    assert sorted(surviving) == [v for v in range(30) if v not in deleted_values]


def test_arrival_order_with_positional_deletes_and_limit(tmpdir: str) -> None:
    """Test that order=ScanOrder.ARRIVAL with positional deletes respects the row limit."""
    # 3 files of 10 rows each, with the first row of every file positionally deleted.
    scan, tasks = _create_scan_and_tasks(
        tmpdir,
        num_files=3,
        rows_per_file=10,
        limit=15,
        delete_rows_per_file=[[0], [0], [0]],
    )

    returned = sum(len(batch) for batch in scan.to_record_batches(tasks, order=ScanOrder.ARRIVAL))

    # 27 live rows remain after the deletes, but the scan limit caps the output at 15.
    assert returned == 15


def test_task_order_with_positional_deletes(tmpdir: str) -> None:
    """Test that the default task order mode correctly applies positional deletes."""
    # 3 files of 10 rows; positions 0 and 5 deleted in file 0, position 3 in file 1, none in file 2.
    scan, tasks = _create_scan_and_tasks(
        tmpdir,
        num_files=3,
        rows_per_file=10,
        delete_rows_per_file=[[0, 5], [3], []],
    )

    values = [
        v
        for batch in scan.to_record_batches(tasks, order=ScanOrder.TASK)
        for v in batch.column("col").to_pylist()
    ]

    # 30 written rows minus 3 deletes; the deleted positions hold values 0, 5 and 13.
    assert len(values) == 27
    assert sorted(values) == [v for v in range(30) if v not in {0, 5, 13}]


def test_parse_location_defaults() -> None:
"""Test that parse_location uses defaults."""

Expand Down