Skip to content

[SPARK-57205][SQL] Add SupportsScanMerging to merge equivalent V2 file scans#56264

Open
LuciferYang wants to merge 2 commits into
apache:masterfrom
LuciferYang:supportsmerge-v2-scans
Open

[SPARK-57205][SQL] Add SupportsScanMerging to merge equivalent V2 file scans#56264
LuciferYang wants to merge 2 commits into
apache:masterfrom
LuciferYang:supportsmerge-v2-scans

Conversation

@LuciferYang

@LuciferYang LuciferYang commented Jun 2, 2026

Copy link
Copy Markdown
Contributor

What changes were proposed in this pull request?

This adds a SupportsScanMerging mix-in for DSv2 Scan and wires it into MergeSubplans through PlanMerger. When two scans of the same table differ only in their projected columns and/or pushed filters, the optimizer asks the source to fuse them into a single scan that covers both, then deduplicates the subplans built on top. The interface is implemented once on the FileScan base, so every built-in file format (Parquet, ORC, CSV, JSON, Text, Avro) participates. Two scans merge when they read the same data (same file index, schema, options and partition filters), and the merged scan reads the union of their read schemas. When the pushed data filters differ, the merged scan widens them to OR(f1, f2) and reads a superset; the exact per-side predicate is still enforced by the post-scan Filter, which MergeSubplans rewrites into per-aggregate FILTER (WHERE ...) clauses. That widening is off by default and gated by spark.sql.files.scanMerge.ignorePushedDataFilters. Merging is declined when an aggregate is pushed into the scan, because there is then no post-scan Filter left to separate the two sides.

Why are the changes needed?

V1 file sources already merge in this case. Their filter and column pushdown happens during physical planning, so when MergeSubplans runs the logical plan still has plain Filter/Project nodes over identical relation leaves, which the existing rules combine. DSv2 instead bakes pushdown into DataSourceV2ScanRelation during logical optimization, so two subqueries that differ only in WHERE or SELECT become structurally different leaves and cannot be merged. TPC-DS q9 is a good example: its fifteen scalar subqueries over store_sales collapse to a single scan under V1 but read the table fifteen times under V2. This change brings V2 to parity.

Does this PR introduce any user-facing change?

No. The connector interface and the config are both internal, and the config defaults to off, so the default plan is unchanged.

How was this patch tested?

New tests in PlanMergeSuite run each query through both the V1 and V2 file-source paths, with AQE on and off, and assert that the results and the merge structure are identical. They cover differing columns, differing data filters, differing partition filters, filter propagation through a join, and a multi-subquery composition, plus the negative cases, and they run a representative query across Parquet, ORC and JSON for format coverage. MergeSubplansSuite, DataSourceV2Suite, and the Parquet/ORC V2 aggregate-pushdown suites still pass. I also verified that V1 and V2 produce identical results and merge structure on the affected TPC-DS queries (q2, q9, q28, q59, q77a, q88, q90) at scale factor 1.

Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Opus 4.8

@LuciferYang LuciferYang marked this pull request as draft June 2, 2026 07:59
@LuciferYang LuciferYang force-pushed the supportsmerge-v2-scans branch 2 times, most recently from fbaa5f0 to 7c1e41a Compare June 2, 2026 12:30
…e scans

### What changes were proposed in this pull request?

This adds a `SupportsScanMerging` mix-in for DSv2 `Scan` and teaches
`MergeSubplans` (through `PlanMerger`) to use it. When two scans of the same
table differ only in their projected columns and/or pushed filters, the
optimizer asks the source to fuse them into one scan that covers both, and then
deduplicates the subplans built on top.

The interface is implemented once on the `FileScan` base, so every built-in
file format (Parquet, ORC, CSV, JSON, Text, Avro) participates. Two scans merge
when they read the same data (same file index, schema, options and partition
filters); the merged scan reads the union of the two read schemas. When the
pushed data filters differ, the merged scan widens them to `OR(f1, f2)` and
reads a superset -- the exact per-side predicate is still enforced by the
post-scan `Filter`, which `MergeSubplans` turns into per-aggregate
`FILTER (WHERE ...)` clauses. That widening is off by default, gated by
`spark.sql.files.scanMerge.ignorePushedDataFilters`. Merging is declined when an
aggregate is pushed into the scan, since there is then no post-scan `Filter` to
separate the two sides.

### Why are the changes needed?

V1 file sources already get this. Their filter and column pushdown happens
during physical planning, so when `MergeSubplans` runs the logical plan still
has plain `Filter`/`Project` nodes over identical relation leaves, which the
existing rules merge. DSv2 bakes pushdown into `DataSourceV2ScanRelation` during
logical optimization, so two subqueries that differ only in `WHERE` or `SELECT`
become structurally different leaves and cannot be merged. For example TPC-DS q9
(fifteen scalar subqueries over `store_sales`) collapses to a single scan under
V1 but reads the table fifteen times under V2. This closes that gap.

### Does this PR introduce _any_ user-facing change?

No. The connector interface and the config are internal, and the config
defaults to off, so the default plan is unchanged.

### How was this patch tested?

New tests in `PlanMergeSuite` run each query through both the V1 and V2
file-source paths, with AQE on and off, and assert identical results and
identical merge structure. The merge-semantics cases cover differing columns,
differing data filters, differing partition filters, filter propagation through
a join, and a multi-subquery composition, plus negative cases; the
format-coverage cases run a representative query over Parquet, ORC and JSON.
`MergeSubplansSuite`, `DataSourceV2Suite` and the Parquet/ORC V2
aggregate-pushdown suites still pass. I also checked that V1 and V2 produce
identical results and merge structure on the affected TPC-DS queries (q2, q9,
q28, q59, q77a, q88, q90) at scale factor 1.

### Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Opus 4.8
@LuciferYang LuciferYang force-pushed the supportsmerge-v2-scans branch from 7c1e41a to 511e5d1 Compare June 2, 2026 12:44
@LuciferYang LuciferYang marked this pull request as ready for review June 3, 2026 05:10
@LuciferYang LuciferYang marked this pull request as draft June 3, 2026 05:11
@LuciferYang LuciferYang marked this pull request as ready for review June 3, 2026 05:15
@LuciferYang

Copy link
Copy Markdown
Contributor Author

cc @peter-toth Do you have any better ideas or suggestions regarding this optimization capability? Thanks ~
also cc @dongjoon-hyun @yaooqinn @cloud-fan

@peter-toth

Copy link
Copy Markdown
Contributor

Thanks for picking this up @LuciferYang. I haven't done a thorough review of the PR yet, but I read it through carefully alongside my old #37711 and wanted to share a thought on the approach before getting into the details.

The shape here is essentially the same as #37711 (which never landed), generalized in two important ways: the merge logic now lives once on FileScan instead of per-format, and you've added OR-widening of pushed data filters under spark.sql.files.scanMerge.ignorePushedDataFilters.

One thought on the design direction, and an alternative worth at least considering before we commit to this shape:

The current shape (call it Option A)

The merge body lives on the connector side, in FileScan.mergeWith. That works cleanly for built-in FileScan formats — they all live in Spark and share one base. I'm not too concerned about file sources for that reason. But:

  • Even within Spark, the in-tree V2 JDBC source (JDBCScan) is not a FileScan. To get parity there, someone would have to re-implement mergeWith against JDBCScanBuilder separately.
  • For external V2 connectors — Iceberg, Delta, Hudi, BigQuery, Cassandra, in-house connectors — every one of them would have to write their own mergeWith. In practice they'll either lag this feature or skip it entirely, and we end up with V2-parity-with-V1 only for built-in file formats.

An alternative (Option B)

SPARK-56385 (April 2026) added pushedFilters: Seq[Expression] to DataSourceV2ScanRelation. The motivation there was constraint propagation, but it incidentally gives us most of the information needed to do this merge entirely on the Spark side:

  1. Small follow-up to SPARK-56385: also record non-filter pushdowns on the relation, either as the existing PushedDownOperators record (already built internally in V2ScanRelationPushDown.ScanBuilderHolder, line 1052) or, simpler, a single hasMergeBlockingPushdown: Boolean set when aggregate / limit / offset / topN / sample / join / variant pushdown happened. This is independently useful for explain output and future optimizer work, not just plan-merge.

  2. In PlanMerger, do the merge body Spark-side using only standard contracts:

    if (np.relation.canonicalized != cp.relation.canonicalized) return None
    if (np.hasMergeBlockingPushdown || cp.hasMergeBlockingPushdown) return None
    val builder = np.relation.table.asInstanceOf[SupportsRead].newScanBuilder(np.relation.options)
    // OR-widen np.pushedFilters with cp.pushedFilters (strict / relaxed split same as this PR)
    // pruneColumns to readSchema().merge(...)
    // build, return as the merged scan
  3. SupportsScanMerging collapses to a zero-method marker interface. A connector opts in by declaring it; Spark provides the merge body using SupportsPushDownFilters / SupportsPushDownRequiredColumns, which the connector already implements as part of normal pushdown.

The strict/relaxed split and the ignorePushedDataFilters config carry over unchanged, so the capability is identical. The format-specific canMergeScanStateWith / hasAggregatePushedDown hooks go away — Parquet's pushedVariantExtractions is itself a tracked pushdown axis (pushDownVariants), so it falls under the merge-blocking flag for free.

Trade-off

  • Option A (this PR): localized; built-in file formats get parity. Each non-FileScan source — in-tree JDBC and every external connector — pays for its own mergeWith later.
  • Option B: depends on a small SPARK-56385 follow-up, but every V2 source (file or otherwise, in-tree or external) gets parity with no per-source code. The connector-side change is one trait declaration.

I don't have a position yet on whether to swap this PR for B or land A first and follow up — wanted to put the alternative on the table since it changes who pays the cost (Spark once vs. every connector). If you'd like to experiment with Option B yourself, please go ahead; if you'd prefer, I can sketch it as a draft so we can compare side by side. Let me know which you'd like.

cc @cloud-fan

@LuciferYang

LuciferYang commented Jun 5, 2026

Copy link
Copy Markdown
Contributor Author

@peter-toth Sorry for missing PR #37711 earlier, and many thanks for your review and suggestions.

You're right that with the body on FileScan, only the built-in file formats get parity, and that in-tree JDBCScan plus every external connector would each have to re-implement mergeWith. Generalizing "who pays the cost" the way Option B does is clearly the better long-term direction, and the hasMergeBlockingPushdown idea is something I'd want regardless of which shape we land on. It collapses the format-specific hooks the PR currently carries -- hasAggregatePushedDown and Parquet's canMergeScanStateWith / pushedVariantExtractions check -- into a single tracked flag, and it's independently useful for explain output and the constraint-propagation TODO already sitting on DataSourceV2ScanRelation.

One thing worth pinning down in the Option B sketch: the holder's pushedFilterExpressions isn't the pre-existing SupportsPushDownFilters.pushedFilters() (the inclusive set, filters pushed whether or not they're also kept post-scan). It's narrower -- only the filters that were fully pushed and removed from the post-scan set. The field on the relation is pushedFilters, mapped from the holder's pushedFilterExpressions via remappedPushedFilters. In V2ScanRelationPushDown.pushDownFilters it's computed as normalizedFiltersWithoutSubquery.filterNot(postScanFilterSet.contains).filter(_.deterministic), and the comment in that same method notes that pushed and post-scan filters can overlap ("e.g. the parquet row group filter"). For best-effort file sources that's exactly what happens: the WHERE predicate is pushed for row-group pruning and kept as a post-scan Filter, so it lands in the post-scan set and is excluded from pushedFilterExpressions. The relaxed OR-widening in this PR widens FileScan's dataFilters, which are precisely those best-effort filters that never land in pushedFilterExpressions.

A merge driven off the relation's pushedFilters therefore can't see the filters it would need to widen, so it can't reproduce the relaxed path that gives the q9-style win -- the OR-of-WHERE-clauses case where widening lets two scans share one pass. There's a sharper version of this: when two file scans differ only in WHERE, both of their pushedFilters are empty, because both predicates went to the post-scan side. A generic strict merge keyed on equal pushedFilters would then fold them into one full-table scan. Results stay correct -- the two WHERE residuals still sit above as per-side Filters -- but each side loses its row-group pruning, and nothing on the relation distinguishes "genuinely no filter" from "best-effort filter pushed elsewhere." That's the distinction A uses to decline this case by default unless ignorePushedDataFilters is set. To get the relaxed behavior back, B would have to thread the post-scan Filter conditions down into the leaf merge and re-translate and re-push them, which is essentially re-doing what FileScan.mergeWith already does.

The strict column-union case (same pushed filters, differ only in projected columns) is universally safe and is the only on-by-default path. That's where B helps most, since it generalizes to every V2 source with no per-connector code. The relaxed OR-widen only makes sense for sources that keep a post-scan residual Filter -- best-effort pushdown, in practice file sources -- which is a property of where residual filters live, not a limitation of the FileScan approach. For an exact-pushdown source like JDBC, two scans differing in WHERE have different pushedFilters, so the gate (ExpressionSet(np.pushedFilters) != ExpressionSet(cp.pushedFilters)) declines them; declining is correct there, because with no Filter above there is nothing to re-apply per side.

There's also a layering issue that affects where the generic body can live. PlanMerger and MergeSubplans are in catalyst, and so is the connector-filter translation (V2ExpressionBuilder / PushableExpression), so the SupportsPushDownV2Filters path (JDBC and friends) would be reachable from catalyst -- but that part alone wouldn't force a move. What pins a generic body to sql/core is the legacy SupportsPushDownFilters translation (DataSourceStrategy.translateFilter) and the PushDownUtils.pushFilters orchestration, which both live there. So the body belongs in sql/core, invoked from the V2 leaf in PlanMerger: "Spark once, not per-connector," but not zero code -- a marker interface still bridges the boundary, keyed on the standard SupportsPushDown* contracts rather than on the connector.

I'd propose we land A first (complete, tested, ships the on-by-default strict column-union case), then generalize in a follow-up that (1) adds a generic Spark-side strict merge in sql/core behind the marker interface, (2) adds the hasMergeBlockingPushdown flag, and (3) keeps the relaxed widening connector-side unless we can make the post-scan residual visible to the leaf merge.

Either way, yes please -- I'd like to take you up on the offer to sketch B. Seeing the two side by side would make the strict-vs-relaxed boundary concrete and show whether our follow-ups converge. If you can wire B so the post-scan residual is visible to the leaf merge, that dissolves my main concern -- I may well be missing a simpler path.

cc @cloud-fan

@peter-toth

Copy link
Copy Markdown
Contributor

Thanks for the thorough writeup, @LuciferYang -- you're right on all the specifics, and the pushedFilters point is a catch I missed.

Agreed that the relation's pushedFilters is the narrow set: only filters that were fully pushed and removed from the post-scan side (normalizedFiltersWithoutSubquery.filterNot(postScanFilterSet.contains)). For best-effort sources, where the WHERE is pushed for row-group pruning and kept as a post-scan Filter, it's empty -- so a merge keyed off it can't see the predicate it would need to widen, and the naive version would fold two different-WHERE scans into one full-table scan and silently drop the pruning. No argument there.

Where I'm more optimistic is the conclusion. The predicate isn't lost, it's just not on the relation: for best-effort sources it's the post-scan Filter condition, and once the (Filter, Filter) symmetric propagation runs it's preserved in the Project aliases MergeSubplans itself builds (propagatedFilter_N = the original predicate). So a Spark-side merge can recover the per-side predicates from the plan it's already standing on -- the Filter(Project(scan)) frame -- OR-widen them, and re-push through the standard SupportsPushDownFilters / SupportsPushDownV2Filters + pruneColumns contracts. That's the "thread the post-scan residual down to the leaf merge" you described, but sourced from the Spark-created Project, so it stays generic -- no per-connector mergeWith.

You're right about layering too: the legacy sources.Filter translation (DataSourceStrategy.translateFilter / PushDownUtils) lives in sql/core, so it can't all sit in catalyst. But that only constrains the legacy-filter path -- i.e. the built-in file sources. The V2 predicate path (V2ExpressionBuilder) is catalyst-native, so for V2-predicate connectors the generic merge can be pure catalyst, with the file-source piece coming in through an injected hook rather than per-source code. So I think it still lands as "Spark once," and hasMergeBlockingPushdown is something I'd want either way.

I don't want to block your PR on a hunch, though. Let me spend a bit of time next week sketching the generic version so we can put the two side by side, as you suggested -- then we can see whether the strict-vs-relaxed boundary really dissolves, and decide together whether the DSv2 file sources are worth their own code on top of the generic core, or whether landing this first and generalizing later is cleaner after all. I'll report back here.

LuciferYang added a commit to LuciferYang/spark that referenced this pull request Jun 17, 2026
…-56385 pushedFilters dependency, adapted to 4.0.x

- backport SPARK-56385 pushedFilters field on DataSourceV2ScanRelation (surgical: field + doCanonicalize + V2ScanRelationPushDown population; drop join-pushdown entanglement)
- update ~12 positional DataSourceV2ScanRelation patterns to 6-arg
- cherry-pick apache#56264: FileScan union (SupportsRuntimeCatalystFilters + SupportsScanMerging), PlanMerger coexists with SPARK-40193 filter-prop
- strip bindingPolicy from new conf; drop ParquetScan variant comparison (no variant pushdown in 4.0.x)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants