feat: Add ArrivalOrder to ArrowScan for bounded-memory concurrent reads#44

Merged
robreeves merged 1 commit into linkedin:li-0.11 from robreeves:arrow_scan
Apr 3, 2026

Conversation

@robreeves
Collaborator

Backport of apache/iceberg-python#3046

Summary

Addresses #3036 — ArrowScan.to_record_batches() uses executor.map + list(), which eagerly materializes all record batches for each file into memory, causing OOMs on large tables.

This PR adds a new order parameter to to_arrow_batch_reader() with two implementations:

  • TaskOrder (default) — preserves existing behavior: batches grouped by file in task submission order, each file fully materialized before proceeding to the next.
  • ArrivalOrder — yields batches as they are produced across files without materializing entire files into memory. Accepts three sub-parameters:
    • concurrent_streams: int — number of files to read concurrently (default: 8). A per-scan ThreadPoolExecutor(max_workers=concurrent_streams) bounds concurrency.
    • batch_size: int | None — number of rows per batch passed to PyArrow's ds.Scanner (default: PyArrow's built-in 131,072).
    • max_buffered_batches: int — size of the bounded queue between producers and consumer (default: 16), providing backpressure to cap memory usage.
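Together these parameters imply a rough ceiling on buffered data. As a back-of-the-envelope sketch (an assumption about the design, not a formula from the PR): at most `max_buffered_batches` batches sit in the queue, plus up to one in-flight batch per worker thread.

```python
# Hypothetical bound on buffered rows under ArrivalOrder.
# Assumption: the queue holds up to max_buffered_batches batches, and each
# of the concurrent_streams workers may hold one more batch in flight.
def max_buffered_rows(concurrent_streams: int = 8,
                      batch_size: int = 131_072,
                      max_buffered_batches: int = 16) -> int:
    return (max_buffered_batches + concurrent_streams) * batch_size

print(max_buffered_rows())           # → 3145728 (defaults)
print(max_buffered_rows(4, 10_000))  # → 200000
```

Lowering `batch_size` or `max_buffered_batches` trades throughput for a tighter memory cap.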

Problem

The current implementation materializes all batches from each file via list() inside executor.map, which runs up to min(32, cpu_count+4) files in parallel. For large files this means all batches from ~20 files are held in memory simultaneously before any are yielded to the consumer.
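The eager pattern described above can be sketched as follows (illustrative stand-in, not pyiceberg's actual code): `list()` inside the mapped function forces every batch from a file to materialize before anything reaches the caller.

```python
from concurrent.futures import ThreadPoolExecutor

def read_file(path):
    # Stand-in for a per-file Arrow scan: a lazy generator of batches.
    return (f"{path}:batch{i}" for i in range(3))

files = ["a.parquet", "b.parquet"]
with ThreadPoolExecutor() as executor:
    # list() inside the mapped function materializes each file fully;
    # with many files in flight, all their batches are live at once.
    per_file = executor.map(lambda f: list(read_file(f)), files)
    all_batches = [b for batches in per_file for b in batches]

print(all_batches)
```

With real multi-gigabyte files, each `list()` holds an entire file's batches in memory simultaneously across all in-flight workers.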

Solution

Before: OOM on large tables

```python
batches = table.scan().to_arrow_batch_reader()
```

After: bounded memory, tunable parallelism

```python
from pyiceberg.table import ArrivalOrder

batches = table.scan().to_arrow_batch_reader(
    order=ArrivalOrder(concurrent_streams=4, batch_size=10000),
)
```

Default behavior is unchanged — TaskOrder preserves the existing executor.map + list() path for backwards compatibility.

Architecture

When order=ArrivalOrder(...), batches flow through _bounded_concurrent_batches:

  1. All file tasks are submitted to a per-scan ThreadPoolExecutor(max_workers=concurrent_streams)
  2. Workers push batches into a bounded Queue(maxsize=max_buffered_batches) — when full, workers block (backpressure)
  3. The consumer yields batches from the queue via blocking queue.get()
  4. A sentinel value signals completion — no timeout-based polling
  5. On early termination (consumer stops), a cancel event is set and the queue is drained until the sentinel to unblock all stuck workers
  6. The executor context manager handles deterministic shutdown

Refactored to_record_batches into helpers: _prepare_tasks_and_deletes, _iter_batches_arrival, _iter_batches_materialized, _apply_limit.

Ordering semantics

| Configuration | File ordering | Within-file ordering |
| --- | --- | --- |
| `TaskOrder()` (default) | Batches grouped by file, in task submission order | Row order |
| `ArrivalOrder(concurrent_streams=1)` | Grouped by file, sequential | Row order |
| `ArrivalOrder(concurrent_streams>1)` | Interleaved (no grouping guarantee) | Row order within each file |

Benchmark results

32 files × 500K rows, 5 columns (int64, float64, string, bool, timestamp), batch_size=131,072 (PyArrow default):

| Config | Throughput (rows/s) | TTFR (ms) | Peak Arrow memory |
| --- | ---: | ---: | ---: |
| default (`TaskOrder`) | 190,250,192 | 73.4 | 642.2 MB |
| `ArrivalOrder(cs=1)` | 59,317,085 | 27.7 | 10.3 MB |
| `ArrivalOrder(cs=2)` | 105,414,909 | 28.8 | 42.0 MB |
| `ArrivalOrder(cs=4)` | 175,840,782 | 28.4 | 105.5 MB |
| `ArrivalOrder(cs=8)` | 211,922,538 | 32.3 | 271.7 MB |
| `ArrivalOrder(cs=16)` | 209,011,424 | 45.0 | 473.3 MB |

TTFR = Time to First Record, cs = concurrent_streams

Are these changes tested?

Yes. 25 new unit tests across two test files, plus a micro-benchmark.

Are there any user-facing changes?

Yes. New order parameter on DataScan.to_arrow_batch_reader():

  • order: ScanOrder | None — controls batch ordering. Pass TaskOrder() (default) or ArrivalOrder(concurrent_streams=N, batch_size=B, max_buffered_batches=M).

New public classes TaskOrder and ArrivalOrder (subclasses of ScanOrder) exported from pyiceberg.table.

All parameters are optional with backwards-compatible defaults. Existing code is unaffected.

Documentation updated in mkdocs/docs/api.md with usage examples, ordering semantics, and configuration guidance table.

@robreeves robreeves marked this pull request as ready for review April 3, 2026 05:44
@robreeves robreeves merged commit d61f46f into linkedin:li-0.11 Apr 3, 2026
12 checks passed