Skip to content
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
refactor: replace streaming param with order=ScanOrder in concurrent tests and docs

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
  • Loading branch information
sumedhsakdeo and claude committed Feb 17, 2026
commit 7c415d4fec40d07a86c8e823dcf56195c71dec9e
16 changes: 10 additions & 6 deletions mkdocs/docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -371,19 +371,21 @@ for buf in tbl.scan().to_arrow_batch_reader(order=ScanOrder.ARRIVAL, batch_size=
print(f"Buffer contains {len(buf)} rows")
```

For maximum throughput, use `concurrent_files` to read multiple files in parallel while streaming. Batches are yielded as they arrive from any file — ordering across files is not guaranteed:
For maximum throughput, use `concurrent_files` to read multiple files in parallel, yielding batches in arrival order. Batches are yielded as they arrive from any file — ordering across files is not guaranteed:

```python
for buf in tbl.scan().to_arrow_batch_reader(streaming=True, concurrent_files=4, batch_size=1000):
from pyiceberg.table import ScanOrder

for buf in tbl.scan().to_arrow_batch_reader(order=ScanOrder.ARRIVAL, concurrent_files=4, batch_size=1000):
print(f"Buffer contains {len(buf)} rows")
```

**Ordering semantics:**

| Configuration | File ordering | Within-file ordering |
|---|---|---|
| Default (`streaming=False`) | Batches grouped by file, in task submission order | Row order |
| `streaming=True` | Interleaved across files (no grouping guarantee) | Row order within each file |
| `ScanOrder.TASK` (default) | Batches grouped by file, in task submission order | Row order |
| `ScanOrder.ARRIVAL` | Interleaved across files (no grouping guarantee) | Row order within each file |

Within each file, batch ordering always follows row order. The `limit` parameter is enforced correctly regardless of configuration.

Expand Down Expand Up @@ -1671,13 +1673,15 @@ table.scan(
).to_arrow_batch_reader(order=ScanOrder.ARRIVAL)
```

For concurrent file reads with streaming, use `concurrent_files`. Note that batch ordering across files is not guaranteed:
For concurrent file reads in arrival order, use `concurrent_files`. Note that batch ordering across files is not guaranteed:

```python
from pyiceberg.table import ScanOrder

table.scan(
row_filter=GreaterThanOrEqual("trip_distance", 10.0),
selected_fields=("VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime"),
).to_arrow_batch_reader(streaming=True, concurrent_files=4)
).to_arrow_batch_reader(order=ScanOrder.ARRIVAL, concurrent_files=4)
```

When using `concurrent_files > 1`, batches from different files may be interleaved. Within each file, batches are always in row order. See the ordering semantics table in the [Apache Arrow section](#apache-arrow) above for details.
Expand Down
8 changes: 4 additions & 4 deletions tests/io/test_pyarrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -3277,7 +3277,7 @@ def test_task_order_with_positional_deletes(tmpdir: str) -> None:


def test_concurrent_files_with_positional_deletes(tmpdir: str) -> None:
"""Test that streaming=True with concurrent_files correctly applies positional deletes."""
"""Test that order=ScanOrder.ARRIVAL with concurrent_files correctly applies positional deletes."""
# 4 files, 10 rows each; delete different rows per file
scan, tasks = _create_scan_and_tasks(
tmpdir,
Expand All @@ -3286,7 +3286,7 @@ def test_concurrent_files_with_positional_deletes(tmpdir: str) -> None:
delete_rows_per_file=[[0, 9], [4, 5], [0, 1, 2], []],
)

batches = list(scan.to_record_batches(tasks, streaming=True, concurrent_files=2))
batches = list(scan.to_record_batches(tasks, order=ScanOrder.ARRIVAL, concurrent_files=2))

total_rows = sum(len(b) for b in batches)
assert total_rows == 33 # 40 - 7 deletes
Expand All @@ -3310,7 +3310,7 @@ def test_concurrent_files_with_positional_deletes_and_limit(tmpdir: str) -> None
delete_rows_per_file=[[0], [0], [0], [0]],
)

batches = list(scan.to_record_batches(tasks, streaming=True, concurrent_files=2))
batches = list(scan.to_record_batches(tasks, order=ScanOrder.ARRIVAL, concurrent_files=2))

total_rows = sum(len(b) for b in batches)
assert total_rows == 20
Expand All @@ -3321,7 +3321,7 @@ def test_concurrent_files_invalid_value(tmpdir: str) -> None:
scan, tasks = _create_scan_and_tasks(tmpdir, num_files=1, rows_per_file=10)

with pytest.raises(ValueError, match="concurrent_files must be >= 1"):
list(scan.to_record_batches(tasks, streaming=True, concurrent_files=0))
list(scan.to_record_batches(tasks, order=ScanOrder.ARRIVAL, concurrent_files=0))


def test_parse_location_defaults() -> None:
Expand Down