Skip to content

[PoC/Proposal] AQE-lite: change plan properties at runtime based on stats from pipeline breakers#23167

Draft
avantgardnerio wants to merge 18 commits into
apache:mainfrom
coralogix:brent/runtime-optimizer
Draft

[PoC/Proposal] AQE-lite: change plan properties at runtime based on stats from pipeline breakers#23167
avantgardnerio wants to merge 18 commits into
apache:mainfrom
coralogix:brent/runtime-optimizer

Conversation

@avantgardnerio

@avantgardnerio avantgardnerio commented Jun 24, 2026

Copy link
Copy Markdown
Contributor

Status: PoC/proposal, not for merge. Drafted to frame the design and get feedback from the community before any larger investment.

What the framework produces (concrete)

A query like SELECT bg.group_key, bg.sum_payload, s.payload FROM (SELECT group_key, SUM(payload) FROM big GROUP BY group_key) bg JOIN small s ON bg.group_key = s.id produces this instrumented physical plan:

01)RuntimeOptimizerExec
02)--HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(id@0, group_key@0)], projection=[group_key@2, sum_payload@3, payload@1]
03)----CoalescePartitionsExec
04)------DataSourceExec: partitions=4, partition_sizes=[1, 0, 0, 0]
05)----ProjectionExec: expr=[group_key@0 as group_key, sum(big.payload)@1 as sum_payload]
06)------PipelineBreakerBuffer
07)--------AggregateExec: mode=FinalPartitioned, gby=[group_key@0 as group_key], aggr=[sum(big.payload)]
08)----------RepartitionExec: partitioning=Hash([group_key@0], 4), input_partitions=4
09)------------PipelineBreakerBuffer
10)--------------AggregateExec: mode=Partial, gby=[group_key@0 as group_key], aggr=[sum(big.payload)]
11)----------------DataSourceExec: partitions=4, partition_sizes=[4, 3, 3, 3]
  • RuntimeOptimizerExec at line 01 — coordinator inserted at the plan root.
  • PipelineBreakerBuffer at lines 06 and 09 — synchronization wrappers above each pipeline breaker (AggregateExec).
  • Static planner has picked the wrong build side: CollectLeft with small (100 rows, line 03-04) as build, aggregated_big (5 actual rows, line 05-11) as probe. The runtime stat from line 07's AggregateExec(FinalPartitioned) proves this inversion. The example RuntimeRule detects it and logs swap intent:
SwapBuildSideIfInverted: would flip HashJoinExec — current build (left) = 100 rows, probe (right) = 5 rows.

Motivation

DataFusion lacks a story for adaptive query execution — runtime-informed re-optimization based on stats only knowable mid-execution (true post-aggregate cardinality, sort extrema, actual join input sizes, partition skew, etc.). Spark added AQE in 3.0 by materializing shuffle outputs to disk between stages and re-entering the optimizer. That model is a poor fit for DataFusion because DataFusion is in-process and streaming; materializing intermediate results between stages would impose Spark-style overhead on a system that doesn't have Spark-style network shuffle costs to amortize.

This PR proposes a streaming-native AQE model: insert lightweight synchronization wrappers above pipeline-breaking operators; use a shared coordinator wrapper at the plan root that runs rules at the natural barrier points; rules observe runtime stats from already-materialized in-memory state and mutate adaptive operators in place.

No materialization to disk. No re-planning pass. No extra buffering beyond what pipeline breakers already do internally. (edit: we probably could re-plan non-executed-yet plan nodes in RuntimeOptimizerExec if we want to go that route - only the interior of the plan tree would change, opaque to observers)

Components

PipelineBreakerBuffer

Sync wrapper inserted above each pipeline-breaking operator (currently AggregateExec, SortExec). Holds one batch per input partition, signals is_ready when all partitions have produced their first poll result (or terminated empty). State machine:

NeedFirstBatch  →  WaitForStreaming  →  EmitHeld  →  Streaming
                       (held)            (release)    (forward)

Three flags:

  • is_ready (mechanical): every input partition has produced its first poll result.
  • streaming_enabled (rule-controllable proposal): reset each poll cycle by RTO — set to is_ready as the permissive default. Rules can flip to false to veto.
  • streaming_started (actual emission control): only RTO flips this via start_streaming(), which also wakes per-partition wakers.

RuntimeOptimizerExec

Inserted at the plan root by a new InsertRuntimeOptimizer physical optimizer rule. On every poll_next, runs a three-phase coordinator cycle:

  1. Propose: walk the subtree; set streaming_enabled = is_ready on each buffer (permissive default).
  2. Evaluate: run each registered RuntimeRule. Rules may flip streaming_enabled = false (veto), mutate adaptive operators in place, or both.
  3. Commit: walk the subtree; for each buffer still enabled, call start_streaming().

A shared Arc<AtomicWaker> is threaded into both RTO and every buffer. Buffers wake it when their is_ready flips; RTO registers cx.waker() on it before each walk. The AtomicWaker is essential because cx.waker() is task-local — a buffer's poll inside a spawned subtask (e.g. one of RepartitionExec's internals) cannot wake RTO via cx.waker(), but can via the shared AtomicWaker.

RuntimeRule trait

pub trait RuntimeRule: Send + Sync + std::fmt::Debug {
    fn name(&self) -> &str;
    fn evaluate(&self, plan: &Arc<dyn ExecutionPlan>);
}

Cheap, idempotent visitors. Rules track their own "already fired" state. They observe runtime stats via new trait methods like runtime_row_count (this PR) and (future) runtime_partition_extrema (analogous to #23090), and mutate adaptive operators via typed methods (HashJoinExec::flip_sides, RepartitionExec::set_split_points, etc. — all future work).

ExecutionPlan::runtime_row_count(partition)

New trait method (default None) returning the number of rows an operator will emit on a given partition. For pipeline-breaking operators, the value is knowable the moment their build phase completes — before any output batch is pulled. Implemented on AggregateExec here, reading from the existing OutputRows metric. ProjectionExec and PipelineBreakerBuffer are passthroughs (the buffer's passthrough is gated: only delegates once is_ready).

Example rule: SwapBuildSideIfInverted

First concrete RuntimeRule. Walks the plan looking for HashJoinExec. For each, compares its left (build) and right (probe) sides' row counts: static statistics() when Exact (e.g. an in-memory dimension table), runtime via runtime_row_count summed across partitions otherwise (e.g. the output of a GROUP BY whose distinct cardinality the planner couldn't predict).

If the build side is larger than the probe side, the rule would call HashJoinExec::flip_sides(). But that method doesn't exist yet, so this PR logs intent only. The detection mechanism is fully wired end-to-end; the actually-saving-work part needs HashJoin to defer its build phase until both sides' pipeline-breakers report is_ready — a follow-up operator change.

SLT walkthrough

runtime_optimizer.slt sets up the canonical misestimation case: big (100K rows) joined to small (100 rows) through a GROUP BY on a low-cardinality column the planner can't propagate distinct stats through. JoinSelection trusts the Inexact ~100K estimate for aggregated_big and picks small as the build side. At runtime aggregated_big is only 5 distinct groups; the rule detects the inversion and would flip.

The EXPLAIN block in the SLT shows the static plan with the AQE primitives wrapped — RuntimeOptimizerExec at root, PipelineBreakerBuffer above each AggregateExec. The build-side swap is not reflected in EXPLAIN because the proposed runtime mutation cannot retroactively appear in static plan output.

What lands here vs. what's deferred

Lands:

  • PipelineBreakerBuffer operator + is_pipeline_breaker heuristic + insertion rule.
  • RuntimeOptimizerExec operator + insertion rule + shared AtomicWaker wake propagation.
  • RuntimeRule trait + three-phase coordinator (propose / evaluate / commit).
  • ExecutionPlan::runtime_row_count trait method.
  • AggregateExec impl of runtime_row_count; ProjectionExec and PipelineBreakerBuffer passthroughs.
  • SwapBuildSideIfInverted rule (log-only).
  • SLT demonstrating the detection end-to-end.

Deferred to follow-ups (each its own PR-sized piece of work):

  1. HashJoinExec::flip_sides() with interior mutability + a build-phase deferral mechanism that waits for descendant PipelineBreakerBuffers to be ready before starting build. This is the change that turns SwapBuildSideIfInverted from a logger into a real optimization.
  2. RepartitionExec::set_split_points() — mutable Partitioning::Range(Option<Vec<SplitPoint>>). Pairs with runtime_partition_extrema on SortExec to enable parallel cumulative-window functions and parallel range repartitioning in general. Mutating RepartitionExec is timing-safe (it has no synchronous build phase, just routing), so this follow-up doesn't need operator deferral.
  3. Additional rules: partition coalescing, skew splitting, adaptive aggregation strategy, broadcast-join switching.
  4. runtime_partition_extrema trait method on SortExec, parallel-window operators (CarryExec, HaloDropExec) — there's already a parallel PoC at Parallel bounded RANGE-frame window functions without PARTITION BY (draft) #23026 / feat: PartitionExtrema + SortExec observer + BoundedWindowAggExec passthrough #23090 / feat(physical-expr): add Partitioning::DynamicRange variant #23094 that overlaps significantly; this work would either build on or replace those primitives.
  5. Restore is_pipeline_breaker() trait method. Pre-Replace execution_mode with emission_type and boundedness #13823 there was a direct API for this; after the split into EmissionType + Boundedness it has to be reconstructed. Neither EmissionType::Final (inherited from descendants, so wraps too aggressively) nor the transition-point filter (Final && all-children-non-Final, misses cascading breakers like FinalPartitioned above Partial) is right. The principled fix is a dedicated method; we use a hardcoded match for now.

Discussion points I'd appreciate feedback on

  • Architectural model. Is the propose/evaluate/commit cycle the right shape, or should rules return decisions to RTO instead of mutating buffer state directly?
  • Pipeline-breaker detection. See deferred point 5 — happy to upstream a dedicated is_pipeline_breaker() trait method as a small precursor PR if there's appetite.
  • Trait surface. runtime_row_count(partition: usize) returns total per partition. Is per-partition vs. cross-partition total the right primitive?
  • Wake propagation via AtomicWaker. Is the shared waker pattern acceptable, or is there a more idiomatic approach DataFusion already uses for cross-task notification I'm missing?
  • Naming. RuntimeOptimizerExec vs AdaptiveExec vs something else? Open to bikeshedding.

What this PoC is not

  • Not a complete AQE implementation. The log-only rule means no actual work is saved yet.
  • Not a replacement for the existing parallel-window-poc work — that work targets the same general space (runtime-informed plan adaptation) but via different primitives (the runtime_partition_extrema API + new operators). The two designs are compatible and could converge.
  • Not optimized. The on-every-poll subtree walks are O(plan depth); fine for the prototype, would want amortization in production.

Looking for: "is this the right architectural direction for AQE in DataFusion?" before investing in any of the follow-up work.

Establishes the failing test for an AQE primitive native to DataFusion's
streaming model. Scenario: `GROUP BY` over a column whose distinct
cardinality the planner can't predict (column stat = Absent), joined
against a small dimension table.

Concretely: `big` has 100K rows with `group_key = id % 5` (actual
distinct count = 5, planner sees Absent). After aggregation:
  actual rows: 5
  planner estimate: Inexact, upper-bounded by input row count = 100K

JoinSelection compares aggregated_big (100K est) vs small (100 Exact)
and picks `small` as the build side. Runtime reality: aggregated_big is
20x smaller than small. Build side is inverted from optimal.

Pencil-in target plan wraps the join in RuntimeOptimizerExec and shows
the swap: `on=[(group_key@0, id@0)]` (target) vs `on=[(id@0,
group_key@0)]` (actual today). Build side switches from small to
aggregated_big.

GROUP BY is the canonical pipeline breaker the AQE primitive needs
anyway — the AggregateExec's build phase IS the natural barrier where
RuntimeOptimizerExec reads runtime stats and re-plans the upper subplan.

Same red-then-green discipline as the parallel-window-cumulative PoC.
Adds the operator (passthrough stub — execute() just forwards) and a
PhysicalOptimizerRule that wraps the plan root in it. The rule runs at
the end of the optimizer chain so the wrapper sits above the final
plan.

EXPLAIN now shows RuntimeOptimizerExec at line 01 of the physical plan.
Behavior unchanged — no buffers, no rules, no synchronization yet. The
SLT stays red for the right reason: the build-side swap that
RuntimeOptimizerExec exists to perform hasn't been wired.

Re-wrap guard in InsertRuntimeOptimizer protects against multi-pass
optimization loops accidentally nesting wrappers.

Subsequent commits add PipelineBreakerBuffer (the two-flag
synchronization primitive), the RuntimeRule trait, and the
SwapBuildSideIfInverted rule that flips HashJoinExec children via a
typed `flip_sides()` method on the operator itself.
Adds the synchronization-point operator (passthrough stub — execute()
just forwards) and extends InsertRuntimeOptimizer to wrap every
pipeline-breaking operator with one.

`is_pipeline_breaker` currently matches AggregateExec and SortExec — the
canonical input-absorbing operators. Other candidates (HashJoinExec's
build side specifically; window aggregates with unbounded frames) can
be added as their adaptive rules need them.

EXPLAIN now shows PipelineBreakerBuffer above each AggregateExec in the
plan. SLT diff is purely the build-side swap remaining — buffer
wrapping is correct in both expected and actual.

Two-flag synchronization (is_ready / go_ahead) and the RuntimeRule
trait come next. After that, SwapBuildSideIfInverted + HashJoinExec's
flip_sides() lands the swap and the test flips green.
PipelineBreakerBuffer now actually buffers: holds one batch per input
partition, sets is_ready when all partitions have produced their first
poll result, returns Pending until set_go_ahead is called by the
coordinator. State machine: NeedFirstBatch -> WaitForGoAhead ->
EmitHeld -> Streaming.

Cross-task wake propagation uses a shared AtomicWaker, not
cx.waker().wake_by_ref. The latter is task-local: when a buffer's
poll runs inside a spawned task (e.g. one of RepartitionExec's
internals), waking cx only wakes the spawned task, never reaching
the top-of-plan task that polls RuntimeOptimizerExec. The shared
AtomicWaker bypasses that boundary — buffers wake it on is_ready
flip, RTO registers cx.waker() on it on each poll_next entry.

RTO::poll_next walks its subtree and calls set_go_ahead on any
buffer whose is_ready is set. Base case: permissive release, no
rules yet. Future commits introduce Vec<RuntimeRule> that can hold
specific buffers or mutate adaptive operators (HashJoinExec::flip_sides)
before releasing.

Register-before-walk in RTO::poll_next closes a race where a buffer
flipping is_ready *after* the walk but *before* we return Pending
would lose its wake — AtomicWaker.wake() with no registered waker
is a no-op.

InsertRuntimeOptimizer rule constructs the shared AtomicWaker and
threads clones into every buffer + the wrapping RTO. futures crate
added to physical-optimizer for AtomicWaker.

SLT runs to completion; only the EXPLAIN diff (build-side swap)
remains red.
Introduces the rule layer of the adaptive-execution framework:

- `ExecutionPlan::runtime_row_count(partition)`: new trait method
  (default `None`) exposing post-barrier output cardinality.
  Implemented on `AggregateExec` via its existing OutputRows metric.

- `RuntimeRule` trait: cheap, idempotent visitors invoked by RTO on
  each poll. Rules observe runtime stats and may mutate adaptive
  operators in place or veto specific buffers.

- Two-phase release on `PipelineBreakerBuffer`:
  - `streaming_enabled` (rule-controllable proposal): RTO resets to
    `is_ready` as the permissive default each cycle; rules can flip
    to `false` to veto.
  - `streaming_started` (actual emission control): only RTO flips
    via `start_streaming`, which also wakes per-partition wakers.

- `RuntimeOptimizerExec::poll_next` runs a three-phase coordinator
  cycle: propose (default permissive), evaluate rules (may veto/
  mutate), commit (start_streaming on still-enabled buffers).

- `SwapBuildSideIfInverted`: first concrete rule. Detects when a
  `HashJoinExec`'s left/build side is larger at runtime than its
  right/probe side and would call `HashJoinExec::flip_sides()` —
  but that method doesn't exist yet, so the rule only logs intent
  (`RUST_LOG=info` visible). flip_sides + HashJoin build-phase
  deferral lands in a follow-up.

The SLT exercises detection end-to-end: small (100 rows static) joined
against GROUP BY of big (5 distinct groups, ~20 partial rows actual,
~100K estimated). The static planner picks small as build; the rule
fires once the FinalPartitioned aggregate emits and runtime row count
is observable.
@github-actions github-actions Bot added optimizer Optimizer rules sqllogictest SQL Logic Tests (.slt) physical-plan Changes to the physical-plan crate auto detected api change Auto detected API change labels Jun 24, 2026
@avantgardnerio avantgardnerio changed the title [PoC/Proposal] Streaming-native adaptive query execution: PipelineBreakerBuffer + RuntimeOptimizerExec + RuntimeRule [PoC/Proposal] AQE-lite: change plan properties at runtime based on stats from pipeline breakers Jun 24, 2026
The walk now relies on `runtime_row_count` propagating correctly via
operator-level passthrough rather than recursing into the subtree.
Three changes:

1. `PipelineBreakerBuffer::runtime_row_count` is a *gated passthrough*:
   returns `None` until `is_ready` flips, then delegates to its input.
   This is the natural stats-availability gate — rules can't observe
   the underlying breaker's row count until the breaker is actually
   done.

2. `ProjectionExec::runtime_row_count` is a plain passthrough — a
   projection doesn't change row count.

3. `AggregateExec::runtime_row_count` no longer drops `Some(0)` to
   `None`. An empty partition (zero distinct groups) should report 0,
   not "not yet started." The previous behavior killed all-must-report
   sums whenever any output partition was empty — exactly the case we
   hit with 5 groups hash-split across 4 partitions.

The walk loses its recursion: `sum_runtime_rows_across_partitions`
requires every output partition to report (`?` on `runtime_row_count`).
With the buffer gate, this is the right semantics: we only act once
the entire breaker is done.

End result: the SwapBuildSideIfInverted rule now fires with the
correct row count — `right = 5` (the true post-aggregate cardinality)
instead of the previous `right = 20` (Partial's per-partition output
inadvertently summed).
@github-actions github-actions Bot removed the auto detected api change Auto detected API change label Jun 24, 2026
Tried `pipeline_behavior() == EmissionType::Final` to replace the
hardcoded match. It's the wrong abstraction here — that flag describes
an operator's output *emission semantics* and is inherited from
descendants. Projection, Repartition, and HashJoin sitting above a
Final-emitting AggregateExec all report Final too, so the rule wraps
every such ancestor.

The transition-point filter (`Final && all children != Final`) gets
closer but still misses cascading breakers: AggregateExec(FinalPartitioned)
has children that are themselves Final (because of the Partial below),
so it fails the "all children non-Final" predicate even though it's
itself a genuine breaker.

Keeping the hardcoded match against AggregateExec / SortExec with a
comment explaining the trade-off. A future `is_pipeline_breaker()`
trait method (the API that existed before apache#13823 split execution_mode
into emission_type + boundedness) would be the principled fix and a
candidate upstream contribution.
@thinkharderdev

Copy link
Copy Markdown
Contributor

They observe runtime stats via new trait methods like runtime_row_count (this PR) and (future) runtime_partition_extrema (analogous to #23090)

Is there a reason we can't do this using the existing ExecutionPlan::partition_statistics method?

@avantgardnerio

avantgardnerio commented Jun 24, 2026

Copy link
Copy Markdown
Contributor Author

using the existing ExecutionPlan::partition_statistics

Yes, that's the case above. The static statistics lie, because they can't know the cardinality reduction. The aggregate does before it emits a batch, thus the flip from the static plan in the dynamic one.

Edit: if you mean use the same method to return runtime statistics - possibly, yes. It has pass-through implications though for children.

@avantgardnerio avantgardnerio requested a review from comphead June 24, 2026 22:23
@comphead

Copy link
Copy Markdown
Contributor

Thanks @avantgardnerio this is challenging task, some insights how Spark AQE does the same

Spark plans a shuffle join initially, lets the input shuffle map stages actually run, reads the byte counts that came out of those stages, and then re-runs the optimizer + planner on the still-unexecuted portion of the plan with those real numbers in hand. Because the size is now real instead of estimated, the planner's normal JoinSelection strategy can flip a SortMergeJoinExec into a BroadcastHashJoinExec — the rule that picks broadcast is the same rule the non-AQE planner uses; what's different is the stats it sees.

Spark is able to identify if relationship is large or small based on the shuffle size, data which available in runtime before the join.

For proposed approach I'm probably missing the part to identify size of relationship? if the first batch is a small batch and subsequent are large ones, what decision would be made?

@thinkharderdev Reg to stats, are they calculated on runtime? if one of the relationship is filtered/generated table, how rowcounts calculated for partition stats?

@avantgardnerio

avantgardnerio commented Jun 25, 2026

Copy link
Copy Markdown
Contributor Author

if the first batch is a small batch and subsequent are large ones

The idea is that the PipelineBreakerBuffer only comes after pipeline breakers - which consume all input batches before emitting 1 output batch; natural stage barriers within our existing plans. By the time PipelineBreakerBuffer::is_ready() == true, we know the exact stats from the leaf, not a 1 batch approximation.

@avantgardnerio

Copy link
Copy Markdown
Contributor Author

Spark plans a shuffle join initially, lets the input shuffle map stages actually run, reads the byte counts that came out of those stages, and then re-runs the optimizer...

Spark's shuffle is a pipeline breaker; this PR is stating that any pipeline breaker gives us the same epistemic guarantee, just at finer granularity than full-stage shuffles

@comphead

Copy link
Copy Markdown
Contributor

Spark's shuffle is a pipeline breaker; this PR is stating that any pipeline breaker gives us the same epistemic guarantee, just at finer granularity than full-stage shuffles

Shuffle is not always a pipeline breaker, but the concept of pipeline breaker would help in DF if we want to deal with runtime decision, to consume all inputs before producing output. Overall the design makes sense to me IMO. But how would you use it for joins anyway? 🤔

For SMJ the sorting is a pipeline breaker.
What about HJ, NLJ? What would be pipeline breakers for them?

@thinkharderdev

Copy link
Copy Markdown
Contributor

Reg to stats, are they calculated on runtime? if one of the relationship is filtered/generated table, how rowcounts calculated for partition stats?

What I am imagining is that the operators update stats as they process the data. So pre-execution the operator returns partition stats based on static statistics (like it does now) but as data flows through they are updated. For a pipeline breaker if you call partition_statistics once all input is consumed you get back Precision::Exact stats because the precise values are known.

I'm concerned that the approach outlined in this PoC is moving towards an entirely separate and parallel optimization framework which seems confusing. There shouldn't really be a difference between optimizing a plan pre-execution vs in-flight as ultimately you are just (in principle) doing the same optimization passes but with better statistics.

@Dandandan

Copy link
Copy Markdown
Contributor

I agree with the points already made above:

  • It should be clear when a non-shuffle/non-push-based execution will benefit from this: for joins, we usually don't want to fully know both sides of the join as they need to be spilled / shuffled fully when the underlying data source / query to get exact statistics. AQE in streaming mode will be different than when data is streamed / pushed.

  • We probably want to reuse the existing optimizations as much as possible. E.g. JoinSelection etc.

Also I am a bit confused to see RuntimeOptimizerExec, I think that feels a bit hacky to reuse the execution plan for some AQE plumbing 🤔 (but perhaps I need to be convinced)

@avantgardnerio

Copy link
Copy Markdown
Contributor Author

Thanks for the feedback guys! I really appreciate all of it.

I think I can answer most of it in a new push, coming shortly. Once that's up I'll address each point.

avantgardnerio and others added 4 commits June 25, 2026 10:17
The default impl at execution_plan.rs:528 returns Statistics::new_unknown.
Without this override, wrapping a subtree in a buffer blackholes its static
stats — breaking the static fallback in side_runtime_rows when runtime
stats aren't yet available (e.g. the left side of a HashJoin with a small
dim table that has Exact stats statically).

Prerequisite for shifting the insertion rule onto HashJoin children.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The buffer is becoming a stage boundary: with materialization (next
commit) it owns its input's drain and IS the breaker, not a passthrough
above one. New name reflects that. Display string carries the stage
number ("StageBoundaryBuffer: stage=N") so EXPLAIN and logs can talk
about stages explicitly.

All buffers currently get stage=0; bottom-up stage numbering comes with
the insertion-rule shift to HashJoin children.

Pure rename + plumbing. No behavior change.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Replaces first-batch holding with per-partition full materialization.
The boundary IS the breaker now, not a synchronizer above one.

Mechanics:
- prime(ctx) (called by RTO) spawns one SpawnedTask per input partition,
  each pulling input.execute(p, ctx) to EOF and ferrying batches through
  an unbounded mpsc. SpawnedTask auto-aborts on Drop → query cancel
  cleans up cleanly.
- execute(p, _) hands back the per-partition receiver wrapped in a
  ConsumerStream that gates on streaming_started, then forwards.
  Never touches input.execute itself.
- is_ready flips when every drain task has reached EOF.

Per-partition tx and rx are separately taken (independent Options) so
prime and execute can arrive in either order without one starving the
other (caught a deadlock first when execute took the whole ChannelEnds
before prime ran).

RTO's execute() now walks the subtree and primes every StageBoundaryBuffer
it finds. Without this, consumers above buffers would sit Pending forever
(HashJoin in CollectLeft never polls probe until build completes — but
build itself is now gated behind a buffer). prime is idempotent across
the multiple execute() calls a multi-partition RTO may receive.

Memory: every primed buffer holds its full input until release. Spill is
a follow-up; OomGuard catches genuine OOM in the meantime. TODO comment
above the channel type aliases makes the spill plan visible to readers.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Before: a single optimizer rule did two jobs in one pass — wrap each
pipeline breaker in a buffer, then wrap the root in a RuntimeOptimizerExec.
A shared Arc<AtomicWaker> was constructed in that call and threaded
through both.

After: two independent PhysicalOptimizerRules.
  1. InsertStageBoundariesAtBreakers — wraps Agg/Sort in
     StageBoundaryBuffer. Same Agg/Sort target as before; the temporary
     name reflects current behavior. The next commit retargets it to
     HashJoin inputs (rename + bottom-up stage numbering).
  2. InsertRuntimeOptimizer — wraps the root in RuntimeOptimizerExec.
     Trivial now.

This is the infrastructure pattern future adaptive rules (partition
coalescing, skew handling) plug into: each adds its own targeted
boundary-insertion rule; the RTO wrapper is shared.

Decoupling required moving the AtomicWaker from "shared, constructed at
optimize time" to "each buffer owns one, RTO registers per poll":
- StageBoundaryBuffer::new no longer takes an rto_waker; constructs its
  own internally.
- StageBoundaryBuffer::register_consumer_waker(&Waker) — RTO threads the
  consumer-task waker through this each poll.
- RuntimeOptimizerExec drops its rto_waker field; CoordinatorStream walks
  the subtree per poll to register on every buffer's waker.

No behavior change. SLT and clippy clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
avantgardnerio and others added 7 commits June 25, 2026 11:30
Rename InsertStageBoundariesAtBreakers → InsertHashJoinBoundaries and
swap the transform: instead of wrapping every Agg/Sort, wrap each input
of every HashJoinExec. Both join sides are now gated so neither has
been consumed by the join when runtime stats arrive — the precondition
for the swap rule actually flipping (rather than just logging).

Stage numbers are assigned bottom-up: a new boundary's stage is one more
than the highest stage of any boundary already in its input subtree, or
0 if there is none. transform_up visits deeper joins first, so when an
outer join's children are processed the inner-join boundaries already
exist and their stages are visible. In the SLT scenario both boundaries
are stage=0 because there is no nested HashJoin.

SLT EXPLAIN updated: buffers now sit directly above HashJoin's children
(CoalescePartitionsExec on the left, ProjectionExec on the right);
Aggregate operators are no longer wrapped. SLT result still green.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Replaces the propose/veto/commit triphase per-poll walk with a single
event: find the lowest stage whose boundaries are all ready but none
released yet, fire each RuntimeRule once for that stage, then release.
Stage K+1's boundaries can't fill until stage K is released, so stages
naturally serialize without an explicit veto mechanism — the
streaming_enabled field (and its rule-controllable proposal flag) is
gone.

Rule trait gains a completed_stage parameter. SwapBuildSideIfInverted
checks both HashJoin children are StageBoundaryBuffers at the
just-completed stage before acting; nested joins higher up wait for
their own stage. Solves the previous-design problem where the LEFT
buffer (small dim, static stats) could release independently before
the RIGHT was ready, leaving the swap rule no time to act.

Observability: RTO logs "stage K ready" before firing rules and
"stage K released" after. With the existing SwapBuildSideIfInverted
log line, RUST_LOG=info shows the ordering:
  RTO: stage 0 ready (2 boundaries); firing 1 rule(s) before release
  SwapBuildSideIfInverted: would flip HashJoinExec — current build
    (left) = 100 rows, probe (right) = 5 rows. ...
  RTO: stage 0 released; downstream consumers can now drain ...

That ordering proves (a) the rule fires and (b) it fires before any
data flows into the join (release is what unblocks the ConsumerStreams
HashJoin reads from).

Spill TODO expanded to "spill-or-stream": follow-up will give boundaries
two escape hatches under MemoryPool pressure — spill to disk OR
stream-through (set has_overflowed, skip the swap decision, behave
exactly as the pre-AQE baseline). The narrative becomes: "buffers fit →
adaptive optimization; buffers don't fit → query still runs exactly as
it does today."

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The trait now has the same shape as
`datafusion_physical_optimizer::PhysicalOptimizerRule::optimize`:

  fn optimize(
      &self,
      plan: Arc<dyn ExecutionPlan>,
      config: &ConfigOptions,
  ) -> Result<Arc<dyn ExecutionPlan>>;

The trait still lives in `physical-plan` because `physical-plan` cannot
depend on `physical-optimizer` (the dependency runs the other way). The
dual shape is the migration story: any static `PhysicalOptimizerRule`
can be made runtime-aware by reading state from `StageBoundaryBuffer`s
in the plan tree, and a future upstream unification of the two traits
requires no change to call sites.

Rules identify the just-completed stage by inspecting buffer state
(`is_ready && !streaming_started`) rather than receiving a stage
number — that lets the same trait serve both static and runtime
optimization without an extra parameter.

SwapBuildSideIfInverted rewritten as a transform_up walk: drops the
AtomicBool `fired` field (buffer-state check is now the gate; once a
stage's buffers are released, `streaming_started` is true and the rule
naturally skips them) and matches the new signature. Still log-only;
the actual `HashJoinExec::swap_inputs` call lands in the next commit.

RTO threads `Arc<TaskContext>` into CoordinatorStream so rules see the
session config (target_partitions, etc.) the same way static
`PhysicalOptimizerRule`s do; the returned plan replaces RTO's plan
in place.

SLT green, log ordering unchanged.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Phase 2 of the AQE-lite primitive: the runtime swap actually happens.

SwapBuildSideIfInverted.optimize now:
1. Walks the plan via transform_up.
2. At each HashJoinExec whose children are StageBoundaryBuffers in the
   just-completed state, compares left/right runtime row counts.
3. When l > r, calls join.swap_inputs(*join.partition_mode()) and
   returns Transformed::yes with the new join.
4. Runs the swapped plan through ensure_collect_left_single_partition,
   which wraps the new left in a CoalescePartitionsExec if it reports
   more than one output partition. (Required: CollectLeft mode asserts
   the left child has exactly one partition, but the post-swap left
   was the original right — often multi-partition behind a
   RepartitionExec / Agg(FinalPartitioned) chain.)

For the runtime swap to produce a working join, RTO now DEFERS
self.input.execute. Previously it executed self.input upfront, which
recursively called buffer.execute on each StageBoundaryBuffer's
consumer side — taking the rxs into the OLD HashJoin's stream chain.
After replan, the new join's execute would try to take the rxs again
and fail.

The new flow:
  - RTO.execute primes drains (calls buffer.input.execute via
    SpawnedTask, NOT buffer.execute) and returns a CoordinatorStream
    with child=None.
  - CoordinatorStream.poll_next handles stage completion, fires rules,
    releases boundaries. While any boundary remains gated, defers
    execution by returning Pending.
  - Once all boundaries have started streaming, lazily calls
    self.plan.execute(partition, ctx) to create the child stream.
    All rxs are still intact at this point, so the (possibly
    replanned) join takes them cleanly.

Single-stage queries (the SLT scenario) work end-to-end. Multi-stage
replan — where an outer-stage drain task transitively executes an
inner-stage HashJoin's children before the inner stage completes — is
deferred; see task notes.

SLT result remains correct (plan-agnostic correctness); EXPLAIN still
shows the static plan. The runtime swap is observable via RUST_LOG=info:

  RTO: stage 0 ready (2 boundaries); firing 1 rule(s) before release
  SwapBuildSideIfInverted: flipping HashJoinExec — current build
    (left) = 100 rows, probe (right) = 5 rows. Calling swap_inputs ...
  RTO: stage 0 released; downstream consumers can now drain the
    buffered data

That ordering proves the rule fires before any data flows into the
join — release is what unblocks the ConsumerStreams the join reads
from.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Previously RTO eagerly primed every StageBoundaryBuffer in the subtree
at execute() time. For single-stage queries that's fine, but for nested
HashJoins it breaks replan: stage-1's drain task immediately executes
the InnerJoin (taking stage-0's rxs), so by the time stage 0 completes
and the rule wants to swap the InnerJoin, the OLD InnerJoin is already
running. The post-replan plan in self.plan has a new InnerJoin, but
the actual execution uses the OLD layout — replan happens in the tree
but not in the live execution.

Lazy priming fixes this:
- RTO.execute primes only stage 0.
- After releasing stage K's boundaries, CoordinatorStream primes stage
  K+1's boundaries in the (possibly replanned) plan. Boundaries
  rebuilt by the rule's transform_up are freshly constructed; they
  get their drain tasks here, against the post-replan subtree.

This also tightens the contract on `CoordinatorStream.child`'s doc
comment — "all rxs are still available" now genuinely holds for
multi-stage plans, not just single-stage ones.

`buffer.prime()` is idempotent, so the helper is safe to call
repeatedly. `prime_all_buffers` is gone; `prime_buffers_at_stage`
takes a stage filter and is used in both call sites.

SLT (single-stage) unchanged; swap still fires once, logs and result
identical.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
runtime_optimizer.rs had grown to mix the RTO coordinator, the
RuntimeRule trait, and concrete rules in one file. Split:

  runtime_optimizer.rs         RuntimeRule trait + RuntimeOptimizerExec
                               + CoordinatorStream + helpers
  runtime_rules/mod.rs         module index + re-exports
  runtime_rules/swap_build_side_if_inverted.rs
                               SwapBuildSideIfInverted impl + its
                               helpers (just_completed_stage_of_join,
                               ensure_collect_left_single_partition,
                               side_runtime_rows,
                               sum_runtime_rows_across_partitions)

Pure refactor — 158 lines moved without behavior change. Future rules
land alongside, one file each. The single import-site update at
physical-optimizer/runtime_optimizer.rs splits the existing combined
import into two lines (the trait still lives at
datafusion_physical_plan::runtime_optimizer; the concrete rule moves
to datafusion_physical_plan::runtime_rules).

SLT and clippy unchanged.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The buffer materializes its input — so it knows the true post-input
cardinality. Surface that directly:

- New `row_counts: Arc<Vec<AtomicUsize>>` field, one counter per
  partition. Drain task increments by `batch.num_rows()` as it ferries
  each batch through the channel.
- `runtime_row_count(p)` returns the partition's counter once
  `is_ready` flips. No longer delegates to `self.input.runtime_row_count`,
  which often returned None (CoalescePartitionsExec, etc. don't
  implement it) and forced the rule to fall back to static stats.

With true per-partition runtime counts always available at the
boundary, `SwapBuildSideIfInverted::side_runtime_rows` drops the
plan-time-statistics fallback and the StatisticsArgs import — runtime
is the only source of truth now.

`statistics_with_args` passthrough on the buffer stays (other static
rules in the optimizer pipeline still ask for stats through plan
trees), but its docstring is updated: the rule we wrote no longer
needs it.

SLT unchanged (still l=100, r=5), but the numbers now come from the
buffer's own counters. Cleaner story for the PR: "runtime stats are
authoritative; the buffer counts what flows through it."

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@avantgardnerio

avantgardnerio commented Jun 25, 2026

Copy link
Copy Markdown
Contributor Author

Thanks again for the feedback! I think it has helped me shape this into a much more AQE-looking PR:

  1. The PR is now functional. It actually does flip the join sides. The SLT passes just like it did without AQE. Based on runtime_row_count()
  2. PipelineBreakerBuffer is now StageBoundryBuffer, and it actually buffers. This allows us to insert it even on the left (streaming side) of the example SLT HashJoin.
  3. Each StageBoundry gets assigned a number, within a stage all leaves execute in parallel. Stages execute sequentially.
  4. RuntimeOptimizer calls prime() on each StageBoundryBuffer to kick it off in it's own tokio task, much like how RepartitionExec polls it's children today - this skips the normal DataFusion poll-based flow to execute subtrees incrementally and bottom up
  5. RuntimeRule now has the exact same signature as PhysicalOptimizerRule, hopefully showing how they can be the same (just requires refactoring into a common crate). Rules would have to be made "runtime aware" one at a time, but this gives us a migration path to AQE
  6. StageBoundryBuffer is it's own pipeline breaker - it stores everything in an unbounded channel in this PR. I think in the future we could prime() it until it hits a memory threshold, then give up and release it to start streaming - if this happens we'd no longer be able to get run time stats and modify the plan, but at least it would be "no worse" than today.
  RTO: stage 0 ready (2 boundaries); firing 1 rule(s) before release
  SwapBuildSideIfInverted: flipping HashJoinExec — current build (left) = 100 rows, probe (right) = 5 rows.
  RTO: stage 0 released; downstream consumers can now drain ...
01)RuntimeOptimizerExec
02)--HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(id@0, group_key@0)], projection=[group_key@2, sum_payload@3, payload@1]
03)----StageBoundaryBuffer: stage=0
04)------CoalescePartitionsExec
05)--------DataSourceExec: partitions=4, partition_sizes=[1, 0, 0, 0]
06)----StageBoundaryBuffer: stage=0
07)------ProjectionExec: expr=[group_key@0 as group_key, sum(big.payload)@1 as sum_payload]
08)--------AggregateExec: mode=FinalPartitioned, gby=[group_key@0 as group_key], aggr=[sum(big.payload)]
09)----------RepartitionExec: partitioning=Hash([group_key@0], 4), input_partitions=4
10)------------AggregateExec: mode=Partial, gby=[group_key@0 as group_key], aggr=[sum(big.payload)]
11)--------------DataSourceExec: partitions=4, partition_sizes=[4, 3, 3, 3]

Hopefully this makes clear that the PR is a migration path, a generalizable solution. So what's missing? If we keep going this route would we be able to 1. land it today, 2. eventually get to "full" AQE?

@avantgardnerio

avantgardnerio commented Jun 25, 2026

Copy link
Copy Markdown
Contributor Author

And as promised, answers to individual questions:

Why not extend the existing partition_statistics method instead of adding runtime_row_count?

I think we should do that. What I have was convenient for fall through to children.

Feels like a parallel optimization framework. There shouldn't be a difference between pre-execution and in-flight optimization.

That's exactly the target, and it's why RuntimeRule has the same signature as PhysicalOptimizerRule now. The two traits exist today only because physical-plan can't depend on physical-optimizer (the dependency runs the other way) . End-state: one trait, with rules optionally reading runtime stats from boundaries in the plan tree they receive. Static rules ignore runtime state; runtime-aware rules use it. Per-rule migration: any existing PhysicalOptimizerRule (including JoinSelection) can be made runtime-aware by adding state inspection. Existing rules don't change until they want to.

Pipeline breakers for HJ and NLJ?

For HJ this PR inserts one: InsertHashJoinBoundaries wraps each HashJoin input in a StageBoundaryBuffer. The buffer materializes its input and IS the pipeline breaker — we don't need a natural one inside the join.

  • Same pattern works for NLJ (no natural breaker, just insert one above each input — ~30-line rule).
  • For SMJ either approach works: reuse the existing SortExec (it's already a breaker — insert the boundary above it for stats) or insert directly above the SMJ inputs. Either way the infrastructure is the same.
  • The contribution is precisely this generalization: any operator can become adaptive without depending on a natural breaker existing inside it.

Joins usually don't want to fully know both sides; when does non-shuffle execution benefit?

  • The win case is exactly what the SLT shows: one side is derived (aggregation, filter, scan-with-projection) whose post-operator size is much smaller than the planner's Inexact upper bound. Materializing that side is cheap relative to the bad build-side choice it lets us avoid.
  • For sides where materialization is genuinely unsafe (large streaming inputs), the stream-through fallback (point 5 of the summary) gates: if memory pressure hits the threshold, the boundary releases without finishing materialization, the runtime rule sees an unflagged stage and bails, the query runs exactly as it would today.
  • Net: adaptive wins when stats are knowable cheaply; otherwise we're no worse than the pre-AQE baseline.

Reuse existing optimizations like JoinSelection.

  • Strongly agree. SwapBuildSideIfInverted already calls HashJoinExec::swap_inputs — the same primitive JoinSelection uses. Same logic, fresher numbers.
  • With the trait unification above, JoinSelection itself could run at runtime against the post-breaker plan, no re-implementation needed.

RuntimeOptimizerExec feels hacky — reusing an ExecutionPlan for AQE plumbing.

  • Fair concern. The reason it IS an ExecutionPlan: that's the only point in the existing pipeline where we can intercept poll-time decisions without changes to every operator or to the session layer. The RTO sits at the root, opt-in via an optimizer rule.
  • The wrapper-at-root pattern matches other "coordinator" nodes in DF (e.g. how CoalescePartitionsExec orchestrates downstream). The operator interface is the natural composition surface.
  • Open to alternatives — a SessionContext-level coordinator would also work but requires plumbing into every execute path. The wrapper was the surgical "get it done today, show working code" compromise of this PR.

Summary: RuntimeRule collapses into PhysicalOptimizerRule, runtime_row_count collapses into partition_statistics, RuntimeOptimizerExec becomes the only new operator, and StageBoundaryBuffer is the only new infrastructure — everything else is existing rules reading more accurate stats.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

optimizer Optimizer rules physical-plan Changes to the physical-plan crate sqllogictest SQL Logic Tests (.slt)

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants