
Add AQE to DataFusion #23194

Description

@avantgardnerio

Is your feature request related to a problem or challenge?

DataFusion makes all execution-plan choices at planning time from static statistics, which are frequently Inexact or Absent (e.g. after a GROUP BY, a selective filter, or anything involving derived columns). This blocks a wide set of optimizations: the planner has to guess at runtime cardinality, and a bad guess can cost orders of magnitude in execution time.

Concrete decisions that need runtime stats:

  • Build-side swap for hash joins — JoinSelection picks build/probe from estimates; when wrong, the build side is too big. Needs runtime row counts.
  • Dynamic range repartitioning — parallel sort and parallel window need range boundaries chosen from the actual data distribution, not estimates. Needs runtime partition extrema / quantiles.
  • Parallel window functions: partitioning a window operator across threads only pays when partitions are roughly balanced, which is a runtime property. Needs row counts and (for range windows) extrema.
  • Parallel sort — pick the right degree of parallelism and the right range boundaries once we know how much data there is and how it's distributed.
  • Post-shuffle partition coalescing — collapse many tiny partitions (over-estimated repartition) into a smaller number of right-sized ones. Needs per-partition row counts.
  • Skew handling — detect a hot partition and split it across workers. Needs per-partition row counts and (for split keys) per-key distribution.
  • Adaptive aggregation strategy — pick between hash and sort aggregation, or between Partial and Single, based on observed cardinality.
  • Dynamic filter pushdown from build to probe — generate a filter from the materialized build side and push it into the probe-side scan.

Each of these needs the same primitive: read whatever runtime stat is relevant to the decision from a completed sub-plan, then re-run a planning decision against the up-to-date plan.
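
To make the primitive concrete, here is a minimal sketch for the build-side-swap case. Everything in it is a hypothetical stand-in: `RowCount` only loosely mirrors the Exact / Inexact / Absent distinction, and `choose_build_side` is an illustrative name, not a function from DataFusion or the PoC. The point is that the same decision function runs twice, once on plan-time estimates and once on counts read from a completed sub-plan.

```rust
/// Hypothetical stand-in for a row-count statistic; loosely modeled on the
/// Exact / Inexact / Absent distinction, not DataFusion's real type.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum RowCount {
    Exact(usize),
    Inexact(usize),
    Absent,
}

impl RowCount {
    /// Best available number, if any.
    pub fn value(self) -> Option<usize> {
        match self {
            RowCount::Exact(n) | RowCount::Inexact(n) => Some(n),
            RowCount::Absent => None,
        }
    }
}

#[derive(Debug, Clone, Copy, PartialEq)]
pub enum BuildSide {
    Left,
    Right,
}

/// Pick the smaller input as the hash-join build side. When either count is
/// unknown, keep the current layout (left as build).
pub fn choose_build_side(left: RowCount, right: RowCount) -> BuildSide {
    match (left.value(), right.value()) {
        (Some(l), Some(r)) if r < l => BuildSide::Right,
        _ => BuildSide::Left,
    }
}
```

With made-up plan-time estimates of `Inexact(10)` on the left and `Inexact(1000)` on the right, `choose_build_side` keeps `BuildSide::Left`. Re-running it with the counts from the PoC log further down (`Exact(100)` on the left, `Exact(5)` on the right) returns `BuildSide::Right`, which is the swap.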

Describe the solution you'd like

PoC in #23167.

The shape of the solution:

  • StageBoundaryBuffer — an ExecutionPlan operator that materializes its input. It is a pipeline breaker by construction, so it's a safe synchronization point where runtime stats become observable. No need to find a natural breaker inside the operator being adapted.
  • A family of runtime_* stat methods on ExecutionPlan, each populated by the buffer's drain task as data flows through. The PoC ships runtime_row_count (used for the build-side swap). The same pattern extends to runtime_partition_extrema (in-progress for parallel sort / range repartitioning), per-key distribution sketches (for skew handling), bloom-filter snapshots (for dynamic filter pushdown), and so on. Long-term these consolidate into partition_statistics returning Precision::Exact once the breaker has completed.
  • Per-decision insertion rule: InsertHashJoinBoundaries (the first, in the PoC) wraps each HashJoin input. Future adaptive optimizations each get their own targeted insertion rule (InsertWindowBoundaries, InsertRangePartitioningBoundaries, etc.). The rules don't need to know about each other.
  • RuntimeOptimizerExec — single always-on wrapper at the plan root. On each stage-completion event, fires registered runtime rules against the current plan, replaces the plan with each rule's output, then releases the stage's boundaries so downstream consumers can drain. Critically, the runtime rule can rewrite any part of the plan above the boundary — different partition counts, different repartition strategies (hash → range), different join layouts — so dynamic partitioning falls out of the same machinery; the buffer just stabilizes the input.
  • RuntimeRule trait with identical signature to PhysicalOptimizerRule::optimize. They live in separate traits today only because physical-plan cannot depend on physical-optimizer (the dependency runs the other way). End-state: a single trait. Existing rules become runtime-aware by reading boundary state from the plan tree they receive — JoinSelection itself can run against a post-breaker plan with no logic change, only fresher numbers.
  • Stages execute sequentially; boundaries within a stage execute in parallel via spawned drain tasks (matching RepartitionExec's spawn model).
  • Memory bounded by the materialized side, with a spill-or-stream fallback — under MemoryPool pressure, spill to disk if available; otherwise release the boundary early and let the rule see "overflowed" — the query still runs, we just lose the adaptive decision (no worse than the pre-AQE baseline).
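
The pieces above can be sketched as a toy model. This is not the PoC's code: `Plan`, `boundary`, and `fire_rules` are hypothetical names, the plan tree is a plain enum instead of `Arc<dyn ExecutionPlan>`, and the rule signature is simplified to plan-in, plan-out. A boundary whose `rows` is `None` stands for either "not finished yet" or "released early after overflow"; in both cases the rule leaves the static choice alone.

```rust
/// Toy plan tree. All names are illustrative stand-ins, not DataFusion types.
#[derive(Debug, Clone, PartialEq)]
pub enum Plan {
    Scan { name: String },
    /// Materializing boundary. `rows` is `Some(n)` once the drain finished,
    /// `None` while pending or after an overflow released it early.
    Boundary { input: Box<Plan>, rows: Option<usize> },
    /// Hash join; `build` is the side loaded into the hash table.
    HashJoin { build: Box<Plan>, probe: Box<Plan> },
}

impl Plan {
    /// Runtime row count, available only on a completed boundary.
    pub fn runtime_row_count(&self) -> Option<usize> {
        match self {
            Plan::Boundary { rows, .. } => *rows,
            _ => None,
        }
    }
}

/// Simplified rule shape for this sketch: take a plan, return a plan.
pub trait RuntimeRule {
    fn name(&self) -> &str;
    fn optimize(&self, plan: Plan) -> Plan;
}

/// Flip the join when the observed build side is larger than the probe side.
pub struct SwapBuildSideIfInverted;

impl RuntimeRule for SwapBuildSideIfInverted {
    fn name(&self) -> &str {
        "SwapBuildSideIfInverted"
    }

    fn optimize(&self, plan: Plan) -> Plan {
        match plan {
            Plan::HashJoin { build, probe } => {
                match (build.runtime_row_count(), probe.runtime_row_count()) {
                    (Some(b), Some(p)) if b > p => Plan::HashJoin { build: probe, probe: build },
                    // Missing stats (e.g. overflow): keep the static choice.
                    _ => Plan::HashJoin { build, probe },
                }
            }
            other => other,
        }
    }
}

/// Fire every registered rule once, feeding each rule the previous output.
pub fn fire_rules(plan: Plan, rules: &[Box<dyn RuntimeRule>]) -> Plan {
    rules.iter().fold(plan, |p, rule| rule.optimize(p))
}

/// Helper that builds a boundary over a scan with the given observed count.
pub fn boundary(name: &str, rows: Option<usize>) -> Plan {
    Plan::Boundary { input: Box::new(Plan::Scan { name: name.to_string() }), rows }
}
```

Calling `fire_rules` on a `HashJoin` whose build boundary reports `Some(100)` and whose probe boundary reports `Some(5)` returns the join with the two sides exchanged. If either boundary reports `None`, the plan comes back unchanged, which corresponds to the "no worse than the pre-AQE baseline" fallback.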

The PoC demonstrates the build-side-swap case end-to-end. Every other item in the problem list is some combination of (a) one ~30-line targeted insertion rule, (b) one runtime rule that reads the relevant stat method, and (c) whichever runtime_* stat method that decision needs.
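
As an illustration of how a different decision would consume a different stat, here is a rough sketch of the decision logic for post-shuffle coalescing and skew detection, given per-partition row counts. None of this is in the PoC. The function names, the greedy adjacent-merge strategy, and the "more than `factor` times the mean" skew threshold are all assumptions chosen to keep the example short.

```rust
/// Greedily merge adjacent partitions until each group holds at least
/// `target_rows` rows. Returns groups of original partition indices; the
/// last group may fall short of the target.
pub fn coalesce_partitions(row_counts: &[usize], target_rows: usize) -> Vec<Vec<usize>> {
    let mut groups = Vec::new();
    let mut current = Vec::new();
    let mut current_rows = 0usize;
    for (idx, &rows) in row_counts.iter().enumerate() {
        current.push(idx);
        current_rows += rows;
        if current_rows >= target_rows {
            groups.push(std::mem::take(&mut current));
            current_rows = 0;
        }
    }
    if !current.is_empty() {
        groups.push(current);
    }
    groups
}

/// Indices of partitions holding more than `factor` times the mean row count.
/// Uses integer arithmetic: rows > factor * total / n  <=>  rows * n > factor * total.
pub fn hot_partitions(row_counts: &[usize], factor: usize) -> Vec<usize> {
    let total: usize = row_counts.iter().sum();
    let n = row_counts.len();
    row_counts
        .iter()
        .enumerate()
        .filter(|&(_, &rows)| rows * n > factor * total)
        .map(|(idx, _)| idx)
        .collect()
}
```

For the made-up counts `[10, 5, 200, 3, 2, 1, 90]` and a target of 100 rows, `coalesce_partitions` yields `[[0, 1, 2], [3, 4, 5, 6]]`, and `hot_partitions` with `factor = 3` flags only partition 2.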

Describe alternatives you've considered

  • Status quo (static stats only). Decisions are made at plan time from Inexact estimates. Bad estimates produce slow queries. No path to win on the optimizations listed above.
  • Spark-style full re-planning at shuffle boundaries. Run the full optimizer on the unexecuted portion of the plan after each shuffle finishes. Works in Spark because shuffle is a heavyweight stage barrier. In a streaming/in-process engine like DataFusion, full re-planning at every potential decision point is too coarse. Inserting a targeted pipeline breaker exactly where stats are needed is finer-grained and matches the streaming model.
  • Stat updates without explicit boundaries. Have every operator update its partition_statistics as data flows through; rules poll and re-decide. Possible but requires contract changes across every operator and a polling-based fire mechanism. The boundary approach gives the same epistemic guarantee with a single new operator and event-driven firing.
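
For contrast with the polling alternative, event-driven firing can be sketched with plain threads and a channel. This is an assumption-heavy toy: it uses `std::thread` and `std::sync::mpsc` rather than whatever async runtime the PoC uses, and `BoundaryReady` and `drain_stage` are hypothetical names. Each drain task reports once when its boundary is complete, and the coordinator simply blocks until every boundary in the stage has reported.

```rust
use std::sync::mpsc;
use std::thread;

/// Event a drain task emits when its boundary has fully materialized.
#[derive(Debug, PartialEq)]
pub struct BoundaryReady {
    pub boundary_id: usize,
    pub rows: usize,
}

/// Drain every boundary of one stage in parallel, then return the observed
/// row counts indexed by boundary id. `inputs[i]` holds the batch sizes that
/// boundary `i` would receive; a real drain task would buffer the batches too.
pub fn drain_stage(inputs: Vec<Vec<usize>>) -> Vec<usize> {
    let n = inputs.len();
    let (tx, rx) = mpsc::channel();
    let mut handles = Vec::with_capacity(n);
    for (boundary_id, batches) in inputs.into_iter().enumerate() {
        let tx = tx.clone();
        handles.push(thread::spawn(move || {
            let rows: usize = batches.iter().sum();
            tx.send(BoundaryReady { boundary_id, rows }).expect("receiver alive");
        }));
    }
    // Drop the original sender so the receive loop ends once all tasks finish.
    drop(tx);

    // Block until every boundary has reported; no polling involved.
    let mut counts = vec![0usize; n];
    for event in rx {
        counts[event.boundary_id] = event.rows;
    }
    for handle in handles {
        handle.join().expect("drain task panicked");
    }
    counts
}
```

`drain_stage(vec![vec![40, 60], vec![5]])` returns `[100, 5]`; only after that point would the runtime rules fire and the stage's boundaries be released.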

Additional context

PoC PR: #23167. The runtime swap on a real query is observable via RUST_LOG=info:

RTO: stage 0 ready (2 boundaries); firing 1 rule(s) before release
SwapBuildSideIfInverted: flipping HashJoinExec — current build (left) = 100 rows, probe (right) = 5 rows.
RTO: stage 0 released; downstream consumers can now drain ...

Spark AQE for context: https://spark.apache.org/docs/latest/sql-performance-tuning.html#adaptive-query-execution
