feat: Plumb Parquet virtual columns (row_number) through TableSchema and ParquetOpener#22026
feat: Plumb Parquet virtual columns (row_number) through TableSchema and ParquetOpener#22026mbutrovich wants to merge 17 commits into
Conversation
…and ParquetOpener, gated behind a tested-only extension-type allowlist, to unblock Comet's native-DataFusion support for Spark's _tmp_metadata_row_index.
|
My main concern is #22026 (comment). The various schemas in |
Thanks for the review @adriangb! Agreed it could make things more complicated, but if DataFusion is ever going to support these virtual columns it might be unavoidable. I think it's good to hash this stuff out in the smallest possible PR at the opener level. I'll push an update later today. |
|
Thanks again for the review @adriangb! Hopefully I addressed all of the feedback, but happy to keep chatting about it. Mixed virtual/file predicates with Confirmed the silent-drop bug with failing tests. Root cause: Arrow-rs can't accept virtual-column refs in a Fix: added Defense-in-depth in the opener for callers who bypass the optimizer (e.g. manual plan builders): Tests: Ordering doc on Struct field doc now spells out the Enum + Added |
|
I think this would then have a negative interaction with the goal of turning filter pushdown on by default. Maybe we'll always have to apply some filters as a |
Comet conservatively never removes Wouldn't this only prevent filter pushdown for filters that reference virtual columns? |
Yeah but it means we'll have to keep the split forever. Which might have been the case anyway and maybe a non issue. And that any filter that does reference virtual columns cannot be pushed down even if a part of it would benefit from doing so, e..g |
|
I plan to give this another review tomorrow. |
|
run benchmark tpch tpcds |
|
@mbutrovich from high level perspective how |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing virtual-columns-table-schema (bd513ec) to 2c7af17 (merge-base) diff using: tpcds File an issue against this benchmark runner |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing virtual-columns-table-schema (bd513ec) to 2c7af17 (merge-base) diff using: tpch File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usagetpch — base (merge-base)
tpch — branch
File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usagetpcds — base (merge-base)
tpcds — branch
File an issue against this benchmark runner |
…to virtual-columns-table-schema
adriangb
left a comment
There was a problem hiding this comment.
I think this looks good. @mbutrovich do you intend to get this into 54? I think there's something to be said for waiting until 54 goes out at this point so we can do the rest of the work wiring up so we can derisk the design as a whole.
I'd like it in 54 if we think the API at this layer is stable, but I see your argument that if the API needs a tweak when we go to hook everything up that we hit API stability challenges. I am okay to defer, but also was not planning to do the work to hook it up to the front-end any time soon, so it becomes an indefinite merge/maybe not completely wired in 55 either. |
|
Gotcha. If you're okay deferring until 54 (which should just be a week or two) I think that'd make me feel more comfortable taking the risk. We don't have feature freezes officially but I think it's a good general approach to take. I asked in #20135 (comment) if anyone can drive the rest of this but I'd say once 54 is out we can merge this regardless. Thanks for working on this it's been quite the effort! |
|
No worries. This isn't urgently needed in Comet, it's just on the list of Spark gaps we want to close. Thanks for your help thus far! |
| if self.virtual_columns.is_empty() { | ||
| self.virtual_columns = Arc::new(virtual_columns); | ||
| } else { | ||
| let existing = Arc::get_mut(&mut self.virtual_columns).expect( |
There was a problem hiding this comment.
I think this will panic if:
- You make a
TableSchema - Call
with_virtual_columnsto add some virtual columns - clone the
TableSchema - Call
with_virtual_columnsagain on one of the owned clones
Now with_table_partition_cols has the same bug so maybe it's okay, but I do think it's an unsafe contract. It also seems like the solution is relatively simple: use Arc::make_mut. I might make a PR for
with_table_partition_cols
There was a problem hiding this comment.
Opened #22372 to fix with_table_partition_cols with Arc::make_mut (off main, so it can land independently of this PR).
|
@mbutrovich I think we are ready to merge this. Sorry for the conflicts, we've been doing some cleanup / refactoring in |
I'll get it cleaned up by end of week, thanks for the reminder! |
…F + RowNumber
Reworks the Load sink path in three coupled directions:
1. **Streaming `LoadExec` -- unordered concurrent merger.** Replace the
serial "open one file, drain it, open next" walker with a stream
combinator chain: per-row `extract_row_inputs` -> per-row open future
-> `futures::stream::buffer_unordered(N)` -> `try_flatten`. Concurrency
cap N = `target_partitions` clamped [1, 64] (default 8). Output
ordering across files is unspecified by design; intra-file batch
order is preserved by sequential drain. Limit applied at the
flattened output by slicing + early termination. Cancellation
propagates via the standard stream-drop chain.
2. **Compile-time eager dispatch in `register_load_relation`.** When the
compiled upstream `LogicalPlan` is bare `LogicalPlan::Values` and the
sink has no DV, register a new `EagerLoadTableProvider` (custom
`TableProvider`) instead of `LoadTableProvider`. The provider holds a
pre-built `Vec<PartitionedFile>` with per-file `partition_values`
populated from the row's passthrough literals -- same broadcast
mechanism the streaming path uses. `scan()` returns a single
`DataSourceExec` over a `FileGroup`, so DataFusion's native
multi-partition fan-out, projection / limit pushdown, and
`repartitioned()` apply for free. Anything else (non-Values upstream
or DV present) falls through to the streaming path.
3. **DV applied via a `not_in_dv` ScalarUDF over the parquet `_row_number`
virtual column.** Each per-file open future:
- Resolves the kernel-side DV via
`tokio::task::spawn_blocking(|| descriptor.read(...))` -- returns a
`RoaringTreemap` (deleted row IDs) cheaply Arc-cloned into the
UDF's closure.
- Builds a per-file `DataSourceExec -> FilterExec(not_in_dv(_row_number))
-> ProjectionExec(drop _row_number)` stack via
`build_per_file_plan`. The opener's `with_virtual_columns` (via
`TableSchema::with_virtual_columns` from
apache/datafusion#22026) injects `_row_number` into emitted batches
so the FilterExec predicate can reference it; the trailing
ProjectionExec drops it before the output stream sees it.
- Reject `LoadSink` with `FileType::Json` AND `dv_ref.is_some()` at
`LoadExec::new`: JSON has no row-number virtual column, and DVs
only apply to Delta parquet data files in practice.
Critical fix in `FieldIdPhysicalExprAdapter::rewrite_column`: virtual
columns are now reindexed to their position in the **physical** file
schema (`physical_file_schema.fields().len()` for a single virtual)
rather than passing the original index unchanged. The original index
came from the LoadExec's projected `TableSchema` (= file ++ partition
++ virtual), which doesn't match either `logical_for_rewrite` (file ++
virtual, no partition) or `physical_for_rewrite` (physical_file ++
virtual). Schema evolution can also make logical and physical schemas
diverge in length; reindexing against the physical schema's actual
length lands the virtual at the correct position regardless.
Other touches:
- **Phase 3 (sync I/O drop)**: `file_size_for_row` no longer falls back
to `std::fs::metadata` when the size column is unset/null. Sizes get
resolved per-file via async object-store HEAD inside
`build_per_file_plan` (`resolve_size_if_unknown`), avoiding a sync
call inside an async future.
- **Phase 4 (projection-aware passthrough)**: precompute
`projected_passthrough: Arc<Vec<usize>>` once at `LoadExec::new`;
iterate that in `extract_row_inputs` instead of all of
`sink.passthrough_columns`.
- **Phase 6 (factor helpers)**: shared primitives -- `RowInputs`,
`extract_row_inputs`, `build_file_source`, `into_partitioned_file`,
`adapter_factory_for`, `strip_field_metadata_recursive`,
`make_not_in_dv_udf`, `resolve_dv_async`, `resolve_size_if_unknown`,
`build_per_file_plan` -- live in `load_helpers.rs` and feed both the
streaming `LoadExec` and the eager `EagerLoadTableProvider`.
Workspace patches: point `datafusion-*` at our local datafusion-fork
on the `pr-22026` branch (open + approved by adriangb,
2026-05-14), and `datafusion-functions-json` at a local fork carrying
the post-50.0 API-drift fix (drop redundant `as_any` impls, rename
`Cast.data_type` -> `Cast.field`). Adds `roaring = "0.11.2"` and
`tokio` direct deps.
Tests: scan_correctness gains a `scan_with_row_index_and_utf8_column`
repro that locks in the virtual-column + Utf8 + supplied_schema
interaction (was a latent regression triggered only by acceptance
fixtures). Acceptance: 3457 / 3457 pass (was 71 failures before this
work). Three previously-expected-fail entries
(`cdc_schema_evolution_read_all`,
`cdf_with_schema_evolution_read_all`,
`cm_id_matching_swapped_select_a_reads_e`) now pass via the eager /
field-id-aware path and move into `FIXED_IN_DATAFUSION`.
…Arc (apache#22372) ## Which issue does this PR close? - No separate issue. This addresses a review observation from apache#22026: apache#22026 (comment) ## Rationale for this change `TableSchema::with_table_partition_cols` appended to an existing partition-column list via `Arc::get_mut(...).expect(...)`. The `expect` message assumed that owning `self` implies sole ownership of the inner `Arc<Vec<FieldRef>>` — but that is not true. `TableSchema` derives `Clone`, and cloning only bumps the `Arc` refcount without copying the `Vec`. So this sequence panicked: ```rust let ts = TableSchema::new(file_schema, vec![some_partition_col]); let cloned = ts.clone(); // Arc refcount is now 2 let _ = cloned.with_table_partition_cols(more); // Arc::get_mut -> None -> expect() panics ``` `with_table_partition_cols` taking `mut self` gives unique ownership of the *struct*, not of the inner `Arc`. ## What changes are included in this PR? - Make `with_table_partition_cols` **replace** the partition columns instead of appending to them, by assigning a fresh `Arc::new(partition_cols)`. This removes the in-place mutation branch entirely: - It never mutates the inner `Vec`, so it is safe even when the `Arc` is shared with a clone (copy-on-write isolation is automatic) — fixing the panic without needing `Arc::make_mut`. - It matches builder-API expectations (a `with_x` setter replaces) and removes the risk of accidentally duplicating partition columns, as raised in review. - No production code relied on the append behavior (every `TableSchema` is built via `new`/`from_file_schema`); only unit tests exercised it, and they are updated to assert replacement. ## Are these changes tested? Yes: - `test_with_table_partition_cols_replaces_existing` verifies that calling the method on a `TableSchema` that already has partition columns replaces them rather than appending. - `test_with_table_partition_cols_after_clone_does_not_panic` clones a `TableSchema` and sets partition columns on the clone, verifying it does not panic and that the other clone is left unmodified (copy-on-write isolation). Existing `TableSchema` tests continue to pass. ## Are there any user-facing changes? `TableSchema::with_table_partition_cols` now replaces existing partition columns instead of appending to them. The previous append path panicked on any shared/cloned `TableSchema`, so no working usage relied on it. There are no API signature changes. 🤖 Generated with [Claude Code](https://claude.com/claude-code) --------- Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com> Co-authored-by: Daniël Heres <danielheres@gmail.com>
|
I was excited to try this feature out -- but I couldn't figure out how to query the virtual columns from SQL -- is that possible? Or does this sentence mean you plan to do it as a follow on PR
I personally prefer having APIs in the code that we are sure can work / are documented with examples so we know they are adequate to actually support the usecase I wonder if we should try and hook up the virtual columns in SQL as a draft PR to be sure this API is good enough 🤔 |
|
Actually, it seems like @AdamGS plans to make such a PR here: #20135 (comment) |
|
Yes the plan was to wire it up in a followup, ref discussion following #20135 (comment). @AdamGS is working on it. TLDR: it may not be trivial to hook up and get all cases working, but this low level implementation seems like it would likely work for any eventual front end implementation and it unblocks @mbutrovich |
…pache#22496) ## Which issue does this PR close? - No separate issue. Follows up on apache#22372 (panic fix in `TableSchema::with_table_partition_cols`) and the API discussion it spawned, and is informed by apache#22026 (which adds a third column group, virtual columns, to `TableSchema`). ## Rationale for this change `TableSchema` has one required input (the file schema) and a growing set of *optional* column groups: partition columns today, virtual columns in apache#22026. The current API expresses this awkwardly: - `new(file_schema, partition_cols)` privileges partition columns with a positional slot while virtual columns only get a builder method — an asymmetry that grows with every new column kind. - `TableSchema` eagerly recomputes and caches the concatenated table schema on *every* incremental setter call, so `from_file_schema(s).with_table_partition_cols(p)` rebuilds it twice (three times once virtual columns are added). This is exactly why `new()`'s docs told callers to avoid the builder-style chain. - The setter mutated an inner `Arc<Vec<FieldRef>>` in place, which is what caused the shared-`Arc` panic fixed in apache#22372. A dedicated builder addresses all three, and mirrors the existing `FileScanConfigBuilder` (the type that *owns* a `TableSchema`). ## What changes are included in this PR? - **`TableSchemaBuilder`**: `new(file_schema)` → `.with_table_partition_cols(impl Into<Fields>)` → `.build()`. The concatenated table schema is computed exactly **once**, in `build()`. The setter takes `impl Into<Fields>`, so an existing schema's `Fields` is accepted zero-copy. - **Partition columns are now stored as `arrow::datatypes::Fields`** (an immutable `Arc<[FieldRef]>`) instead of `Arc<Vec<FieldRef>>`: one fewer indirection, shareable zero-copy, and — being immutable — the shared-`Arc` mutation panic is structurally impossible. - **`TableSchema::table_partition_cols()` and the delegating `FileScanConfig::table_partition_cols()` now return `&Fields`.** `Fields` derefs to `&[FieldRef]`, so iteration/indexing/`len`/`is_empty` are unchanged; only the arrow `FileFormat` path needed `.to_vec()`. - **`TableSchema::with_table_partition_cols` is deprecated** in favor of the builder. It now **replaces** rather than appends. (Note: `main` currently *appends* here — the replace change in apache#22372 was not captured by that PR's squash merge — so this also restores the intended replace semantics.) - `new` / `from_file_schema` are kept as conveniences that route through the builder. - Documented in the 54.0.0 upgrade guide. This intentionally leaves virtual columns out; apache#22026 should extend the builder with `with_virtual_columns` once it lands. ## Are these changes tested? Yes. New unit tests cover building with partition columns, replace-on-repeat, zero-copy `Fields` input, and the deprecated setter's behavior; existing `TableSchema` / `FileScanConfig` tests and doctests pass. `cargo clippy --all-targets -- -D warnings` is clean across the datasource/proto/arrow/parquet/catalog-listing crates. ## Are there any user-facing changes? Yes — please apply the `api change` label: - `TableSchema::table_partition_cols()` / `FileScanConfig::table_partition_cols()` return `&Fields` instead of `&Vec<FieldRef>` (source-compatible for most uses via `Deref`). - `TableSchema::with_table_partition_cols` is deprecated (use the builder) and now replaces rather than appends. 🤖 Generated with [Claude Code](https://claude.com/claude-code) --------- Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com> Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
Which issue does this PR close?
_tmp_metadata_row_index).Rationale for this change
arrow-rs 57.1.0+ supports Parquet virtual columns (
row_number,row_group_index) viaArrowReaderOptions::with_virtual_columns, and DataFusion pins a new-enough arrow-rs for the API to be available. DataFusion does not yet plumb the option throughParquetOpener, so consumers (notably Comet) cannot project Spark's_tmp_metadata_row_indexthrough the native_datafusion scan path.This PR adds the minimal opener-boundary plumbing so
TableSchemacan carry virtual columns and the Parquet reader produces them. UX / SQL-layer surface for virtual columns stays deferred to the epic in #20135 — this follows the same framing alamb blessed for #20071 (theinput_file_name()UDF).What changes are included in this PR?
TableSchema::with_virtual_columns(...)builder +virtual_columns()getter. Layout:[file, partition, virtual]. Composable withwith_table_partition_colsin either order.TableSchema::schema_without_virtual_columns()— file + partition schema used by pushdown-planning paths that can't evaluate virtual-col refs.ParquetOpenerforwards the fields toArrowReaderOptions::with_virtual_columns; augments the schemas passed to the expr-adapter / simplifier with virtual fields so virtual-col refs identity-rewrite; strips them from the projection fed toProjectionMask::roots(which only understands file columns) and appends them tostream_schemasoreassign_expr_columnsresolves them by name.ParquetVirtualColumnenum withTryFrom<&FieldRef>(indatasource-parquet::virtual_column) gates which arrow-rs virtual extension types are accepted. Currently onlyRowNumber; adding a variant (e.g.RowGroupIndex) is a compile-time obligation. Replaces the earlier runtime string-allowlist so the contract lives in the type system.ParquetSource::try_pushdown_filtersclassifies filters against the file+partition schema (not the full table schema) so predicates referencing virtual columns are reported asPushedDown::Noand theFilterExecstays above the scan — arrow-rs'sRowFilteraddresses parquet leaves only and can't evaluate virtual-column refs, so silently pushing them would produce wrong results.build_virtual_columns_state(run once per scan partition at morselizer-build time) errors whenpushdown_filters=trueand the predicate references a virtual column, with a clear remediation message pointing attry_pushdown_filters. This catches callers that bypass the optimizer and set the predicate onParquetSourcedirectly.arrow-schemaadded as a direct dep (previously transitive viaarrow) so the enum referencesRowNumber::NAMEfrom arrow-rs instead of hardcoding the string.ListingTable/ SQL-layer surface, a three-arg constructor onTableSchema,ParquetSource::with_virtual_columns, andRowGroupIndexsupport.Are these changes tested?
Yes. New unit tests in
opener.rs:test_row_index_basic— single row group, select data + row_number.test_row_index_projection_only— select only row_number.test_row_index_multi_row_group— 3 × 100 rows, verify absolute 0..300 across boundaries.test_row_index_with_row_group_skip— predicate stats-prunes the middle row group; verify row numbers stay absolute (0..100 ++ 200..300). Critical correctness gate for Spark (and for FixRowNumberReaderwhen not all row groups are selected arrow-rs#8863).test_row_index_with_partition_cols— partition + virtual + data columns compose correctly.test_row_index_nullable_int64— nullability flag flows through unchanged (matches Spark's_tmp_metadata_row_indexdeclaration).test_unsupported_virtual_extension_type_rejected— usingRowGroupIndex(a real arrow-rs type deliberately not in the enum yet) errors withNotImplementedinstead of silently forwarding.test_row_index_predicate_pushdown_mixed_or_errors/_virtual_only_errors/_allowed_when_pushdown_disabled— exercise the opener's defensive check for virtual-col predicate refs withpushdown_filters=true, and confirm thepushdown_filters=falsepath is unaffected.In
source.rs:test_try_pushdown_filters_rejects_virtual_column_refspins the planner-boundary contract — file-col filters arePushedDown::Yes, virtual-only and mixed filters arePushedDown::No.In
virtual_column.rs: unit tests coveringTryFrom<&FieldRef>for valid, missing-extension-type, and unsupported-extension-type inputs.Plus a
TableSchemaunit test verifying the[file, partition, virtual]layout is stable regardless of builder-call order.Are there any user-facing changes?
Public API additions:
TableSchema::with_virtual_columns(...),TableSchema::virtual_columns(),TableSchema::schema_without_virtual_columns(), andParquetVirtualColumn(re-exported fromdatafusion-datasource-parquet). No existing API changed; no breaking changes.