diff --git a/Cargo.lock b/Cargo.lock index 1b84a6b9792cd..bda0e6023d46c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2432,6 +2432,7 @@ dependencies = [ "datafusion-physical-expr-common", "datafusion-physical-plan", "datafusion-pruning", + "futures", "insta", "itertools 0.14.0", "recursive", diff --git a/datafusion/physical-optimizer/Cargo.toml b/datafusion/physical-optimizer/Cargo.toml index 38c8a7c37211f..4104acac71ce3 100644 --- a/datafusion/physical-optimizer/Cargo.toml +++ b/datafusion/physical-optimizer/Cargo.toml @@ -50,6 +50,7 @@ datafusion-physical-expr = { workspace = true } datafusion-physical-expr-common = { workspace = true } datafusion-physical-plan = { workspace = true } datafusion-pruning = { workspace = true } +futures = { workspace = true } itertools = { workspace = true } recursive = { workspace = true, optional = true } diff --git a/datafusion/physical-optimizer/src/insert_hash_join_boundaries.rs b/datafusion/physical-optimizer/src/insert_hash_join_boundaries.rs new file mode 100644 index 0000000000000..695a08ba4d238 --- /dev/null +++ b/datafusion/physical-optimizer/src/insert_hash_join_boundaries.rs @@ -0,0 +1,104 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! 
[`InsertHashJoinBoundaries`] wraps each input of every `HashJoinExec` +//! in a [`StageBoundaryBuffer`]. Both sides are gated so that neither +//! has been consumed by the join by the time runtime stats arrive — the +//! precondition for the build-side-swap rule actually swapping (rather +//! than just logging intent). Memory cost is the size of both inputs; +//! spill is a follow-up, OomGuard catches genuine OOM in the meantime. +//! +//! Stage numbers are assigned bottom-up: a new boundary's stage is one +//! more than the highest stage of any boundary already in its input +//! subtree, or 0 if there is none. Lowest stage releases first; +//! [`RuntimeOptimizerExec`] walks the subtree at runtime and primes the +//! next stage as the previous one's boundaries become ready. +//! +//! `transform_up` ensures deeper joins are visited before outer joins, +//! so when an outer-join boundary is computed the inner-join boundaries +//! already exist and their stage numbers are visible. +//! +//! 
[`RuntimeOptimizerExec`]: datafusion_physical_plan::runtime_optimizer::RuntimeOptimizerExec + +use std::sync::Arc; + +use crate::PhysicalOptimizerRule; +use datafusion_common::Result; +use datafusion_common::config::ConfigOptions; +use datafusion_common::tree_node::{Transformed, TreeNode}; +use datafusion_physical_plan::ExecutionPlan; +use datafusion_physical_plan::joins::HashJoinExec; +use datafusion_physical_plan::stage_boundary_buffer::StageBoundaryBuffer; + +#[derive(Default, Debug)] +pub struct InsertHashJoinBoundaries; + +impl InsertHashJoinBoundaries { + pub fn new() -> Self { + Self + } +} + +impl PhysicalOptimizerRule for InsertHashJoinBoundaries { + fn name(&self) -> &str { + "InsertHashJoinBoundaries" + } + + fn optimize( + &self, + plan: Arc, + _config: &ConfigOptions, + ) -> Result> { + let with_buffers = plan + .transform_up(|node| { + if node.downcast_ref::().is_none() { + return Ok(Transformed::no(node)); + } + let mut new_children: Vec> = + Vec::with_capacity(node.children().len()); + for child in node.children() { + if child.downcast_ref::().is_some() { + new_children.push(Arc::clone(child)); + continue; + } + let stage = + max_buffer_stage_in_subtree(child).map_or(0, |max| max + 1); + new_children.push(Arc::new(StageBoundaryBuffer::new( + Arc::clone(child), + stage, + ))); + } + Ok(Transformed::yes(node.with_new_children(new_children)?)) + })? 
+ .data; + Ok(with_buffers) + } + + fn schema_check(&self) -> bool { + true + } +} + +fn max_buffer_stage_in_subtree(plan: &Arc) -> Option { + let mut max_stage = plan.downcast_ref::().map(|b| b.stage()); + for child in plan.children() { + if let Some(child_max) = max_buffer_stage_in_subtree(child) { + max_stage = Some(max_stage.map_or(child_max, |m| m.max(child_max))); + } + } + max_stage +} diff --git a/datafusion/physical-optimizer/src/lib.rs b/datafusion/physical-optimizer/src/lib.rs index b9eb248f6e843..20b170722e811 100644 --- a/datafusion/physical-optimizer/src/lib.rs +++ b/datafusion/physical-optimizer/src/lib.rs @@ -43,7 +43,9 @@ pub mod output_requirements; pub mod projection_pushdown; pub use datafusion_pruning as pruning; pub mod hash_join_buffering; +pub mod insert_hash_join_boundaries; pub mod pushdown_sort; +pub mod runtime_optimizer; pub mod sanity_checker; pub mod topk_aggregation; pub mod topk_repartition; diff --git a/datafusion/physical-optimizer/src/optimizer.rs b/datafusion/physical-optimizer/src/optimizer.rs index 0f81512b61c8e..036527172498e 100644 --- a/datafusion/physical-optimizer/src/optimizer.rs +++ b/datafusion/physical-optimizer/src/optimizer.rs @@ -36,8 +36,10 @@ use crate::topk_repartition::TopKRepartition; use crate::update_aggr_exprs::OptimizeAggregateOrder; use crate::hash_join_buffering::HashJoinBuffering; +use crate::insert_hash_join_boundaries::InsertHashJoinBoundaries; use crate::limit_pushdown_past_window::LimitPushPastWindows; use crate::pushdown_sort::PushdownSort; +use crate::runtime_optimizer::InsertRuntimeOptimizer; use crate::window_topn::WindowTopN; use datafusion_common::Result; use datafusion_common::config::ConfigOptions; @@ -247,6 +249,19 @@ impl PhysicalOptimizer { // given query plan; i.e. it only acts as a final // gatekeeping rule. Arc::new(SanityCheckPlan::new()), + // Adaptive-execution infrastructure. Two rules, in order: + // 1. 
InsertHashJoinBoundaries — wraps each HashJoin input + // in a StageBoundaryBuffer with a bottom-up stage + // number; the boundaries are where runtime stats become + // observable and where the build/probe sides are gated + // until a swap decision can be made. + // 2. InsertRuntimeOptimizer — wraps the plan root in a + // RuntimeOptimizerExec that walks the subtree at runtime + // to release ready buffers and run RuntimeRules. + // Split so future adaptive rules can add their own targeted + // boundary-insertion rules without touching the RTO wrapper. + Arc::new(InsertHashJoinBoundaries::new()), + Arc::new(InsertRuntimeOptimizer::new()), ]; Self::with_rules(rules) diff --git a/datafusion/physical-optimizer/src/runtime_optimizer.rs b/datafusion/physical-optimizer/src/runtime_optimizer.rs new file mode 100644 index 0000000000000..3421125b14c09 --- /dev/null +++ b/datafusion/physical-optimizer/src/runtime_optimizer.rs @@ -0,0 +1,66 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! [`InsertRuntimeOptimizer`] wraps the (now-final) plan root in a +//! [`RuntimeOptimizerExec`] with the default set of runtime rules. It +//! does nothing else — buffer insertion happens in a separate, targeted +//! 
rule (today: [`crate::insert_hash_join_boundaries::InsertHashJoinBoundaries`]). The split lets +//! future adaptive optimizations (partition coalescing, skew handling) +//! introduce their own targeted insertion rules without touching the +//! RTO-wrapping logic. + +use std::sync::Arc; + +use crate::PhysicalOptimizerRule; +use datafusion_common::Result; +use datafusion_common::config::ConfigOptions; +use datafusion_physical_plan::ExecutionPlan; +use datafusion_physical_plan::runtime_optimizer::{RuntimeOptimizerExec, RuntimeRule}; +use datafusion_physical_plan::runtime_rules::SwapBuildSideIfInverted; + +#[derive(Default, Debug)] +pub struct InsertRuntimeOptimizer; + +impl InsertRuntimeOptimizer { + pub fn new() -> Self { + Self + } +} + +impl PhysicalOptimizerRule for InsertRuntimeOptimizer { + fn name(&self) -> &str { + "InsertRuntimeOptimizer" + } + + fn optimize( + &self, + plan: Arc, + _config: &ConfigOptions, + ) -> Result> { + // Don't re-wrap if we've already been inserted (multi-pass loops). + if plan.downcast_ref::().is_some() { + return Ok(plan); + } + let rules: Vec> = + vec![Arc::new(SwapBuildSideIfInverted::new())]; + Ok(Arc::new(RuntimeOptimizerExec::new(plan, rules))) + } + + fn schema_check(&self) -> bool { + true + } +} diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index 08468bffc0dd9..e9437724adb0a 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -32,7 +32,7 @@ use crate::filter_pushdown::{ ChildFilterDescription, ChildPushdownResult, FilterDescription, FilterPushdownPhase, FilterPushdownPropagation, PushedDownPredicate, }; -use crate::metrics::{ExecutionPlanMetricsSet, MetricsSet}; +use crate::metrics::{ExecutionPlanMetricsSet, MetricValue, MetricsSet}; use crate::statistics::StatisticsArgs; use crate::{ DisplayFormatType, Distribution, ExecutionPlan, InputOrderMode, @@ -1765,6 +1765,34 @@ impl ExecutionPlan for AggregateExec { 
Some(self.metrics.clone_inner()) } + /// Reads the `OutputRows` metric for the given partition. Returns + /// `None` when no such metric is registered for that partition. + /// + /// NOTE(review): the value is whatever the metric holds at call time. + /// The original comment assumed `Final` / `FinalPartitioned` emit all + /// groups in one batch per output partition; if output is ever split + /// across batches this under-reports until the partition is drained, + /// so callers should read it only after their input hit EOF — confirm. + /// + /// For `Partial` modes the count is likewise per-partition: each + /// partition reports how many partial-group rows it has produced so + /// far. + fn runtime_row_count(&self, partition: usize) -> Option { + // First `OutputRows` metric tagged with this partition wins. + // `Some(0)` is a legitimate answer for a partition that has the + // metric but no rows yet; `None` means no matching metric was + // found at all. + self.metrics + .clone_inner() + .iter() + .find_map(|m| match (m.partition(), m.value()) { + (Some(p), MetricValue::OutputRows(count)) if p == partition => { + Some(count.value()) + } + _ => None, + }) + } + fn statistics_with_args(&self, args: &StatisticsArgs) -> Result> { let child_statistics = args.compute_child_statistics(&self.input, args.partition())?; diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs index 76abf73e0ebbe..4877bcfd689f0 100644 --- a/datafusion/physical-plan/src/execution_plan.rs +++ b/datafusion/physical-plan/src/execution_plan.rs @@ -496,6 +496,29 @@ pub trait ExecutionPlan: Any + Debug + DisplayAs + Send + Sync { None } + /// Returns the number of rows this operator **will emit** on the given + /// output partition, if that count is knowable at the moment of the + /// call. 
+ /// + /// Pipeline-breaking operators that absorb all input before emitting + /// (`AggregateExec`, `SortExec` once sorted, the build side of + /// `HashJoinExec` once built) know their true per-partition output + /// cardinality the moment their build phase completes — *before* any + /// output batch has been pulled. That is precisely the runtime stat + /// downstream adaptive-execution rules (build-side swap, partition + /// coalescing, skew handling) need. + /// + /// Callers that want a cross-partition total sum across all + /// `partition` indices. + /// + /// Streaming operators never have a meaningful answer and should + /// leave this as the default `None`. Pipeline-breaking operators + /// should return `None` *before* their barrier completes and + /// `Some(rows)` once it has. + fn runtime_row_count(&self, _partition: usize) -> Option { + None + } + /// Returns statistics for a specific partition of this `ExecutionPlan` node. /// /// Deprecated: use [`Self::statistics_with_args`] instead, diff --git a/datafusion/physical-plan/src/lib.rs b/datafusion/physical-plan/src/lib.rs index 6cc6e44c32cc3..0b7f0a40605b5 100644 --- a/datafusion/physical-plan/src/lib.rs +++ b/datafusion/physical-plan/src/lib.rs @@ -86,10 +86,13 @@ pub mod placeholder_row; pub mod projection; pub mod recursive_query; pub mod repartition; +pub mod runtime_optimizer; +pub mod runtime_rules; pub mod scalar_subquery; pub mod sort_pushdown; pub mod sorts; pub mod spill; +pub mod stage_boundary_buffer; pub mod statistics; pub mod stream; pub mod streaming; diff --git a/datafusion/physical-plan/src/projection.rs b/datafusion/physical-plan/src/projection.rs index 16b0a5ad7e4b5..0677177743166 100644 --- a/datafusion/physical-plan/src/projection.rs +++ b/datafusion/physical-plan/src/projection.rs @@ -349,6 +349,12 @@ impl ExecutionPlan for ProjectionExec { Some(self.metrics.clone_inner()) } + /// Passthrough: a projection doesn't change row count, so its + /// runtime row count equals its 
input's. + fn runtime_row_count(&self, partition: usize) -> Option { + self.input.runtime_row_count(partition) + } + fn statistics_with_args(&self, args: &StatisticsArgs) -> Result> { let input_stats = Arc::unwrap_or_clone( args.compute_child_statistics(&self.input, args.partition())?, diff --git a/datafusion/physical-plan/src/runtime_optimizer.rs b/datafusion/physical-plan/src/runtime_optimizer.rs new file mode 100644 index 0000000000000..e942c2240f1bd --- /dev/null +++ b/datafusion/physical-plan/src/runtime_optimizer.rs @@ -0,0 +1,325 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Coordinator wrapper at the root of the plan. On every poll it walks +//! its subtree to find the lowest stage whose [`StageBoundaryBuffer`]s +//! are all ready but haven't started streaming yet — the "just completed" +//! stage. When that exists, it fires each registered [`RuntimeRule`] +//! exactly once for that stage, releases the stage's boundaries, then +//! primes the next stage's boundaries against the (possibly replanned) +//! plan. Stage 0 is primed at execute() time; stages 1+ are primed +//! lazily after their predecessor releases — that's what lets a replan +//! 
at stage K rebuild stage-(K+1)'s input subtree and have the drain +//! task run against the new subtree. +//! +//! Each buffer owns its own AtomicWaker; RTO walks the subtree per poll +//! and registers the consumer-task waker on each. Drain tasks wake on +//! their own buffer's waker, which then wakes the consumer. Per-buffer +//! wakers decouple buffer insertion from RTO insertion at planning time: +//! `InsertHashJoinBoundaries` and `InsertRuntimeOptimizer` are +//! independent optimizer rules. + +use std::collections::BTreeMap; +use std::pin::Pin; +use std::sync::Arc; +use std::task::{Context, Poll, Waker}; + +use arrow::array::RecordBatch; +use datafusion_common::Result; +use datafusion_common::config::ConfigOptions; +use datafusion_execution::TaskContext; +use futures::{Stream, StreamExt}; +use log::info; + +use crate::stage_boundary_buffer::StageBoundaryBuffer; +use crate::stream::RecordBatchStreamAdapter; +use crate::{ + DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, + SendableRecordBatchStream, +}; + +/// A runtime adaptive-execution rule. Shape is identical to +/// `datafusion_physical_optimizer::PhysicalOptimizerRule::optimize` — +/// the trait lives in `physical-plan` rather than reusing the upstream +/// trait directly only because `physical-plan` cannot depend on +/// `physical-optimizer` (the dependency runs the other way). The dual +/// shape is the migration story: any static `PhysicalOptimizerRule` +/// can be made runtime-aware by reading state from +/// `StageBoundaryBuffer`s in the plan tree it receives, and a future +/// upstream unification of the two traits requires no change to call +/// sites. +/// +/// RTO invokes `optimize` exactly once per stage-completion event with +/// its current plan; the returned plan replaces RTO's plan. Rules +/// identify the just-completed stage by walking the plan and finding +/// `StageBoundaryBuffer`s where `is_ready() && !streaming_started()`. 
+pub trait RuntimeRule: Send + Sync + std::fmt::Debug { + fn name(&self) -> &str; + fn optimize( + &self, + plan: Arc, + config: &ConfigOptions, + ) -> Result>; +} + +#[derive(Debug)] +pub struct RuntimeOptimizerExec { + input: Arc, + cache: Arc, + rules: Vec>, +} + +impl RuntimeOptimizerExec { + pub fn new(input: Arc, rules: Vec>) -> Self { + let cache = Arc::clone(input.properties()); + Self { + input, + cache, + rules, + } + } +} + +impl DisplayAs for RuntimeOptimizerExec { + fn fmt_as( + &self, + _t: DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + write!(f, "RuntimeOptimizerExec") + } +} + +impl ExecutionPlan for RuntimeOptimizerExec { + fn name(&self) -> &'static str { + "RuntimeOptimizerExec" + } + + fn properties(&self) -> &Arc { + &self.cache + } + + fn children(&self) -> Vec<&Arc> { + vec![&self.input] + } + + fn with_new_children( + self: Arc, + mut children: Vec>, + ) -> Result> { + Ok(Arc::new(Self::new( + children.swap_remove(0), + self.rules.clone(), + ))) + } + + fn maintains_input_order(&self) -> Vec { + vec![true] + } + + fn execute( + &self, + partition: usize, + context: Arc, + ) -> Result { + // Prime stage 0 only. Higher stages are primed lazily by + // CoordinatorStream after their predecessor releases — that's + // what lets a replan that rebuilds stage-K's input subtree + // (e.g. swapping a nested HashJoin) take effect: the freshly + // built boundaries on the new plan are primed *after* replan, + // so the drain task runs against the post-replan subtree. + // buffer.prime() touches buffer.input.execute() (the subtree + // below the boundary) but leaves the boundary's consumer-side + // rx untouched, so HashJoin can take it later via the lazy + // execute in CoordinatorStream. 
+ prime_buffers_at_stage(&self.input, 0, &context)?; + let schema = self.schema(); + let stream = CoordinatorStream { + child: None, + plan: Arc::clone(&self.input), + rules: self.rules.clone(), + context: Arc::clone(&context), + partition, + }; + Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream))) + } +} + +/// Walks the subtree and primes every `StageBoundaryBuffer` whose +/// `stage()` matches `stage`. Other boundaries are skipped — they're +/// either already running (lower stage) or waiting their turn (higher +/// stage). `buffer.prime()` is idempotent, so calling it on an already +/// primed boundary is harmless. +fn prime_buffers_at_stage( + plan: &Arc, + stage: usize, + ctx: &Arc, +) -> Result<()> { + if let Some(buffer) = plan.downcast_ref::() + && buffer.stage() == stage + { + buffer.prime(ctx)?; + } + for child in plan.children() { + prime_buffers_at_stage(child, stage, ctx)?; + } + Ok(()) +} + +fn register_consumer_waker_on_buffers(plan: &Arc, waker: &Waker) { + if let Some(buffer) = plan.downcast_ref::() { + buffer.register_consumer_waker(waker); + } + for child in plan.children() { + register_consumer_waker_on_buffers(child, waker); + } +} + +struct CoordinatorStream { + /// Lazily created. `None` until every stage has been processed (rules + /// fired, boundaries released, next stage primed) and no boundary + /// remains gated. Then `self.plan.execute(...)` is called — all + /// `StageBoundaryBuffer` rxs are still available at that point + /// because stage-N's drain task only starts after stage-(N-1) + /// releases (lazy priming), so no operator above a boundary has + /// been executed before replan. + child: Option, + plan: Arc, + rules: Vec>, + /// Captured at execute() time; threaded into RuntimeRule::optimize so + /// rules see the session config (target_partitions, etc.) the same + /// way static `PhysicalOptimizerRule`s do, and used to lazily execute + /// the final plan. 
+ context: Arc, + partition: usize, +} + +impl Stream for CoordinatorStream { + type Item = Result; + + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + let this = self.as_mut().get_mut(); + // Register before walking so a buffer flipping is_ready *after* + // we walked but *before* we return Pending still wakes us. + register_consumer_waker_on_buffers(&this.plan, cx.waker()); + + // Find the lowest stage whose boundaries are all ready and none + // released — the "just completed" stage. Rules fire once for + // that stage, then we release. Higher stages wait their turn + // naturally because they can't drain until lower stages release. + if let Some((stage, boundaries)) = find_just_completed_stage(&this.plan) { + info!( + "RTO: stage {stage} ready ({} boundaries); firing {} rule(s) \ + before release", + boundaries.len(), + this.rules.len(), + ); + let config = this.context.session_config().options(); + let mut current_plan = Arc::clone(&this.plan); + for rule in &this.rules { + current_plan = match rule.optimize(Arc::clone(¤t_plan), config) { + Ok(p) => p, + Err(e) => return Poll::Ready(Some(Err(e))), + }; + } + this.plan = current_plan; + for buffer in &boundaries { + buffer + .downcast_ref::() + .expect( + "find_just_completed_stage only returns StageBoundaryBuffer Arcs", + ) + .start_streaming(); + } + info!( + "RTO: stage {stage} released; downstream consumers can now \ + drain the buffered data" + ); + // Prime the next stage in the (possibly replanned) plan now + // that stage K's data is flowing. Any boundaries rebuilt by + // the rule's transform_up are fresh — they get their drain + // tasks here, against the post-replan subtree. + if let Err(e) = prime_buffers_at_stage(&this.plan, stage + 1, &this.context) { + return Poll::Ready(Some(Err(e))); + } + } + + // Lazily execute the plan once all stages have been processed. 
+ // While any boundary is still gated, defer execution so the + // post-replan plan can take the consumer-side rxs intact. + if this.child.is_none() { + if any_buffer_pending(&this.plan) { + return Poll::Pending; + } + match this.plan.execute(this.partition, Arc::clone(&this.context)) { + Ok(stream) => this.child = Some(stream), + Err(e) => return Poll::Ready(Some(Err(e))), + } + } + + this.child.as_mut().unwrap().poll_next_unpin(cx) + } +} + +/// True if any `StageBoundaryBuffer` in the subtree has not yet started +/// streaming. RTO uses this to gate lazy execution of `self.plan` until +/// all stages have been processed (rules run, boundaries released). +fn any_buffer_pending(plan: &Arc) -> bool { + if let Some(buffer) = plan.downcast_ref::() + && !buffer.streaming_started() + { + return true; + } + plan.children().iter().any(|c| any_buffer_pending(c)) +} + +/// Walks the plan tree, groups every `StageBoundaryBuffer` by stage, +/// and returns the lowest stage where every boundary is ready and none +/// has started streaming yet. Returns `None` if no such stage exists +/// (either nothing is ready, or every ready stage has already fired). 
+fn find_just_completed_stage( + plan: &Arc, +) -> Option<(usize, Vec>)> { + let mut by_stage: BTreeMap>> = BTreeMap::new(); + collect_boundaries_by_stage(plan, &mut by_stage); + by_stage.into_iter().find(|(_, bufs)| { + bufs.iter().all(|b| { + let buffer = b.downcast_ref::().expect( + "find_just_completed_stage only inserts StageBoundaryBuffer Arcs", + ); + buffer.is_ready() && !buffer.streaming_started() + }) + }) +} + +/// Recursively collects every `StageBoundaryBuffer` in `plan` into `out`, +/// keyed by its `stage()`. The walk continues below a boundary, matching +/// `prime_buffers_at_stage` and `register_consumer_waker_on_buffers`: +/// lower-stage boundaries sit inside the input subtree of higher-stage +/// ones and must be seen for their stage to be detected and released. +fn collect_boundaries_by_stage( + plan: &Arc, + out: &mut BTreeMap>>, +) { + if let Some(buffer) = plan.downcast_ref::() { + out.entry(buffer.stage()) + .or_default() + .push(Arc::clone(plan)); + } + // No early return above: always descend so that boundaries nested + // under another boundary are grouped too. + for child in plan.children() { + collect_boundaries_by_stage(child, out); + } +} diff --git a/datafusion/physical-plan/src/runtime_rules/mod.rs b/datafusion/physical-plan/src/runtime_rules/mod.rs new file mode 100644 index 0000000000000..fa785b31e0ebf --- /dev/null +++ b/datafusion/physical-plan/src/runtime_rules/mod.rs @@ -0,0 +1,27 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Concrete [`RuntimeRule`] implementations driven by +//! [`RuntimeOptimizerExec`]. One rule per file; the trait itself lives +//! in [`crate::runtime_optimizer`]. +//! +//! 
[`RuntimeRule`]: crate::runtime_optimizer::RuntimeRule +//! [`RuntimeOptimizerExec`]: crate::runtime_optimizer::RuntimeOptimizerExec + +pub mod swap_build_side_if_inverted; + +pub use swap_build_side_if_inverted::SwapBuildSideIfInverted; diff --git a/datafusion/physical-plan/src/runtime_rules/swap_build_side_if_inverted.rs b/datafusion/physical-plan/src/runtime_rules/swap_build_side_if_inverted.rs new file mode 100644 index 0000000000000..343fd4564c85b --- /dev/null +++ b/datafusion/physical-plan/src/runtime_rules/swap_build_side_if_inverted.rs @@ -0,0 +1,165 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! [`SwapBuildSideIfInverted`] — first concrete [`RuntimeRule`]. +//! +//! When a HashJoinExec's current build side ends up larger at runtime +//! than the probe side, the static planner made the wrong choice — it +//! picked build based on (Inexact) estimates. This rule walks the plan +//! looking for joins whose children are [`StageBoundaryBuffer`]s in the +//! just-completed state (`is_ready && !streaming_started`); if `l > r` +//! it calls [`HashJoinExec::swap_inputs`] and patches the result to +//! preserve the join's distribution invariants. +//! +//! 
[`RuntimeRule`]: crate::runtime_optimizer::RuntimeRule + +use std::sync::Arc; + +use datafusion_common::Result; +use datafusion_common::config::ConfigOptions; +use datafusion_common::tree_node::{Transformed, TreeNode}; +use log::info; + +use crate::coalesce_partitions::CoalescePartitionsExec; +use crate::joins::{HashJoinExec, PartitionMode}; +use crate::runtime_optimizer::RuntimeRule; +use crate::stage_boundary_buffer::StageBoundaryBuffer; +use crate::{ExecutionPlan, ExecutionPlanProperties}; + +#[derive(Default, Debug)] +pub struct SwapBuildSideIfInverted; + +impl SwapBuildSideIfInverted { + pub fn new() -> Self { + Self + } +} + +impl RuntimeRule for SwapBuildSideIfInverted { + fn name(&self) -> &str { + "SwapBuildSideIfInverted" + } + + fn optimize( + &self, + plan: Arc, + _config: &ConfigOptions, + ) -> Result> { + plan.transform_up(|node| { + let Some(join) = node.downcast_ref::() else { + return Ok(Transformed::no(node)); + }; + if just_completed_stage_of_join(join).is_none() { + return Ok(Transformed::no(node)); + } + let children = join.children(); + // Current HashJoinExec: LEFT child is the build side. + let left = side_runtime_rows(children[0]); + let right = side_runtime_rows(children[1]); + let (Some(l), Some(r)) = (left, right) else { + return Ok(Transformed::no(node)); + }; + if l <= r { + return Ok(Transformed::no(node)); + } + info!( + "SwapBuildSideIfInverted: flipping HashJoinExec — current \ + build (left) = {l} rows, probe (right) = {r} rows. \ + Calling swap_inputs to make the smaller side the new build." + ); + let mode = *join.partition_mode(); + let swapped = join.swap_inputs(mode)?; + let swapped = ensure_collect_left_single_partition(swapped)?; + Ok(Transformed::yes(swapped)) + }) + .map(|t| t.data) + } +} + +/// Ensures the (possibly already-coalesced) plan satisfies HashJoin's +/// CollectLeft invariant: under `PartitionMode::CollectLeft`, the left +/// child must report exactly one output partition. 
After +/// [`HashJoinExec::swap_inputs`], the new left side is whatever used +/// to be on the right — frequently multi-partition (e.g. behind a +/// RepartitionExec). Wrap it in [`CoalescePartitionsExec`] when needed. +/// +/// `swap_inputs` may also have wrapped the result in a `ProjectionExec` +/// to preserve output column order; in that case the HashJoinExec is +/// one level down. We walk through that via `transform_up`. +fn ensure_collect_left_single_partition( + plan: Arc, +) -> Result> { + plan.transform_up(|node| { + let Some(join) = node.downcast_ref::() else { + return Ok(Transformed::no(node)); + }; + if *join.partition_mode() != PartitionMode::CollectLeft { + return Ok(Transformed::no(node)); + } + let children = join.children(); + if children[0].output_partitioning().partition_count() == 1 { + return Ok(Transformed::no(node)); + } + let coalesced: Arc = + Arc::new(CoalescePartitionsExec::new(Arc::clone(children[0]))); + let new_children = vec![coalesced, Arc::clone(children[1])]; + Ok(Transformed::yes(node.with_new_children(new_children)?)) + }) + .map(|t| t.data) +} + +/// Returns `Some(stage)` if `join`'s two children are both +/// [`StageBoundaryBuffer`]s at the same stage in the just-completed +/// state (`is_ready && !streaming_started`). Otherwise `None`. +fn just_completed_stage_of_join(join: &HashJoinExec) -> Option { + let children = join.children(); + if children.len() != 2 { + return None; + } + let left = children[0].downcast_ref::()?; + let right = children[1].downcast_ref::()?; + if left.stage() != right.stage() { + return None; + } + if left.is_ready() + && !left.streaming_started() + && right.is_ready() + && !right.streaming_started() + { + Some(left.stage()) + } else { + None + } +} + +/// Total runtime row count across all output partitions of a +/// HashJoin input. 
The input is always a `StageBoundaryBuffer` +/// (`InsertHashJoinBoundaries` inserts one above each side), and the +/// buffer materializes its input — so once `is_ready` flips, +/// `runtime_row_count(p)` returns the true post-input cardinality. +/// No static-stat fallback needed. +/// +/// Returns `None` if any partition's count is unavailable, which the +/// rule treats as "don't try to decide yet." +fn side_runtime_rows(plan: &Arc) -> Option { + let n = plan.output_partitioning().partition_count(); + let mut total: usize = 0; + for p in 0..n { + total += plan.runtime_row_count(p)?; + } + Some(total) +} diff --git a/datafusion/physical-plan/src/stage_boundary_buffer.rs b/datafusion/physical-plan/src/stage_boundary_buffer.rs new file mode 100644 index 0000000000000..6f0cf8cf3f6b6 --- /dev/null +++ b/datafusion/physical-plan/src/stage_boundary_buffer.rs @@ -0,0 +1,404 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Stage-boundary buffer between an input subtree and its consumer. The +//! boundary IS a pipeline breaker: it drains its input fully via tasks +//! spawned by [`StageBoundaryBuffer::prime`], stages the result, and +//! releases to the consumer only when +//! 
[`crate::runtime_optimizer::RuntimeOptimizerExec`] (RTO) decides. +//! Each boundary carries a `stage` number assigned by the optimizer at +//! insertion time so RTO and EXPLAIN can talk about stages explicitly. +//! +//! Lifecycle, separating fill from drain: +//! - `prime(ctx)` (called by RTO): spawns one drain task per input +//! partition; each pulls until EOF, ferrying batches through an +//! unbounded mpsc per partition. Idempotent. +//! - `execute(p, _)` (called by the consumer): hands back the +//! per-partition receiver wrapped in a gated stream. Never touches +//! `input.execute`; that's prime's job. +//! +//! Two flags govern release: +//! - `is_ready` (mechanical): set automatically when every drain task +//! has finished (EOF or error). Runtime stats become derivable. +//! - `streaming_started` (actual emission control): only RTO flips this, +//! via `start_streaming()`, and only as part of releasing a whole +//! stage. Once true, the gated consumer streams start forwarding from +//! their receivers. +//! +//! Coordination uses a per-buffer [`AtomicWaker`] (`rto_waker`) created in +//! `new`, not one shared across buffers. The last drain task wakes it when +//! `is_ready` flips; RTO registers on it from its own `poll_next` via +//! `register_consumer_waker`. Side-channel instead of `cx.waker()` because +//! the latter is task-local — inside a spawned task (e.g. one of +//! `RepartitionExec`'s internals) it never reaches the top-of-plan task. +//! +//! Memory: every primed buffer holds its full input until release. +//! Spill is a follow-up; OomGuard catches genuine OOM in the meantime. 
+ +use std::collections::HashMap; +use std::pin::Pin; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::{Arc, Mutex}; +use std::task::{Context, Poll, Waker}; + +use arrow::array::RecordBatch; +use datafusion_common::Result; +use datafusion_common_runtime::SpawnedTask; +use datafusion_execution::TaskContext; +use futures::task::AtomicWaker; +use futures::{Stream, StreamExt}; +use tokio::sync::mpsc; + +use crate::statistics::StatisticsArgs; +use crate::stream::RecordBatchStreamAdapter; +use crate::{ + DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, PlanProperties, + SendableRecordBatchStream, Statistics, +}; + +// TODO(spill-or-stream): the per-partition channels below currently hold +// the full materialized input in memory. Inserting a StageBoundaryBuffer +// turns a streaming subtree into a pipeline breaker — that's the cost of +// getting runtime stats. A follow-up will give each boundary TWO escape +// hatches under MemoryPool pressure: +// 1. Spill: page batches to disk and keep buffering. Stats remain +// reliable; the swap decision still happens. +// 2. Stream-through (`has_overflowed`): give up on materialization, +// release the boundary immediately so downstream consumers stream +// just like they would have before this rule existed. The flip +// rule sees `has_overflowed() == true` and skips its decision — +// we lose the adaptive optimization but the query still runs and +// we haven't made things worse than the pre-AQE baseline. +// OomGuard catches genuine OOM in the meantime. +type PartitionTxs = Vec>>>; +type PartitionRxs = Vec>>>; + +#[derive(Debug)] +pub struct StageBoundaryBuffer { + input: Arc, + cache: Arc, + /// Stage number assigned by the inserting optimizer rule. Lowest + /// stage runs first; once released, its consumers (next stage up) + /// can prime. Used by EXPLAIN, logs, and (eventually) RTO ordering. 
+ stage: usize, + /// Each buffer owns its own AtomicWaker rather than sharing one + /// across the subtree. RTO registers the consumer-task waker on + /// this per poll cycle (cheap walk); the drain task wakes here + /// once `is_ready` flips. Per-buffer wakers decouple the buffer + /// from RTO at planning time — `InsertStageBoundariesAtBreakers` + /// and `InsertRuntimeOptimizer` no longer have to coordinate to + /// share a single waker. + rto_waker: Arc, + /// Gate flags + drain progress + consumer-side wakers. + state: Arc>, + /// Per-partition sender. Moved out (`Option::take`) into the drain + /// task at `prime()`. Independent from `rxs` because `prime` and + /// `execute` arrive in unspecified order. + txs: Arc>, + /// Per-partition receiver. Moved out into the consumer stream at + /// `execute()`. + rxs: Arc>, + /// Handles to spawned drain tasks. Auto-abort on Drop via + /// `SpawnedTask::Drop`, so query cancellation cleans up cleanly. + drain_tasks: Arc>>>, + /// Per-partition running row count, incremented by the drain task + /// as each batch flows through. The buffer materializes the input, + /// so by the time `is_ready` flips, this is the true post-input + /// cardinality — no static-stat fallback needed. + row_counts: Arc>, +} + +#[derive(Debug)] +struct BufferState { + /// Partitions whose drain task reached EOF (success or error). When + /// `drained_count == num_partitions`, `is_ready` flips. + drained_count: usize, + is_ready: bool, + /// Emission control. Only RTO flips this via `start_streaming`, + /// and only as part of releasing the entire stage this boundary + /// belongs to. + streaming_started: bool, + num_partitions: usize, + /// Wakers stashed by consumer streams while gated on `streaming_started`. 
+ wakers: HashMap, +} + +impl StageBoundaryBuffer { + pub fn new(input: Arc, stage: usize) -> Self { + let num_partitions = input.output_partitioning().partition_count(); + let cache = Arc::clone(input.properties()); + let (txs, rxs): (Vec<_>, Vec<_>) = (0..num_partitions) + .map(|_| { + let (tx, rx) = mpsc::unbounded_channel(); + (Some(tx), Some(rx)) + }) + .unzip(); + let row_counts = + Arc::new((0..num_partitions).map(|_| AtomicUsize::new(0)).collect()); + Self { + input, + cache, + stage, + rto_waker: Arc::new(AtomicWaker::new()), + state: Arc::new(Mutex::new(BufferState { + drained_count: 0, + is_ready: false, + streaming_started: false, + num_partitions, + wakers: HashMap::new(), + })), + txs: Arc::new(Mutex::new(txs)), + rxs: Arc::new(Mutex::new(rxs)), + drain_tasks: Arc::new(Mutex::new(Vec::new())), + row_counts, + } + } + + pub fn stage(&self) -> usize { + self.stage + } + + /// Register a waker to be woken when this buffer's drain reaches EOF + /// (i.e. `is_ready` flips). RTO calls this per `poll_next` for each + /// buffer in its subtree, threading its own consumer waker through. + pub fn register_consumer_waker(&self, waker: &Waker) { + self.rto_waker.register(waker); + } + + /// Spawn drain tasks for every input partition. Each task pulls + /// `input.execute(p, ctx)` to EOF, ferrying batches and errors + /// through the per-partition channel; once EOF is reached, marks + /// the partition drained and wakes RTO if this was the last one. + /// Idempotent. Only RTO should call this. 
+ pub fn prime(&self, ctx: &Arc) -> Result<()> { + let mut tasks = self.drain_tasks.lock().unwrap(); + if !tasks.is_empty() { + return Ok(()); + } + let mut txs = self.txs.lock().unwrap(); + for (partition, slot) in txs.iter_mut().enumerate() { + let Some(tx) = slot.take() else { + continue; + }; + let stream = self.input.execute(partition, Arc::clone(ctx))?; + let state = Arc::clone(&self.state); + let rto_waker = Arc::clone(&self.rto_waker); + let row_counts = Arc::clone(&self.row_counts); + tasks.push(SpawnedTask::spawn(drain_partition( + partition, stream, tx, state, rto_waker, row_counts, + ))); + } + Ok(()) + } + + /// True once every drain task has reached EOF. + pub fn is_ready(&self) -> bool { + self.state.lock().unwrap().is_ready + } + + /// True once `start_streaming` has been called. + pub fn streaming_started(&self) -> bool { + self.state.lock().unwrap().streaming_started + } + + /// Start actual emission: flips `streaming_started` and wakes all + /// per-partition consumer wakers. Idempotent. Only RTO should call this. + pub fn start_streaming(&self) { + let wakers = { + let mut state = self.state.lock().unwrap(); + if state.streaming_started { + return; + } + state.streaming_started = true; + state.wakers.drain().map(|(_, w)| w).collect::>() + }; + for w in wakers { + w.wake(); + } + } +} + +async fn drain_partition( + partition: usize, + mut stream: SendableRecordBatchStream, + tx: mpsc::UnboundedSender>, + state: Arc>, + rto_waker: Arc, + row_counts: Arc>, +) { + while let Some(item) = stream.next().await { + // tx.send returning Err means the consumer dropped its receiver + // (query cancelled while we were mid-drain). Stop pulling. 
+ let rows = match &item { + Ok(batch) => batch.num_rows(), + Err(_) => 0, + }; + let is_err = item.is_err(); + if tx.send(item).is_err() { + break; + } + if rows > 0 { + row_counts[partition].fetch_add(rows, Ordering::Relaxed); + } + if is_err { + break; + } + } + let became_ready = { + let mut state = state.lock().unwrap(); + state.drained_count += 1; + if state.drained_count == state.num_partitions && !state.is_ready { + state.is_ready = true; + true + } else { + false + } + }; + if became_ready { + // cx.waker() inside this spawned task is task-local and won't + // reach the top-of-plan task. The shared AtomicWaker bridges. + rto_waker.wake(); + } +} + +impl Drop for StageBoundaryBuffer { + fn drop(&mut self) { + // SpawnedTask aborts on Drop. Clearing the Vec triggers that + // for every drain task. Safe to ignore poisoned lock — at Drop + // time we just want to release resources. + if let Ok(mut tasks) = self.drain_tasks.lock() { + tasks.clear(); + } + } +} + +impl DisplayAs for StageBoundaryBuffer { + fn fmt_as( + &self, + _t: DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + write!(f, "StageBoundaryBuffer: stage={}", self.stage) + } +} + +impl ExecutionPlan for StageBoundaryBuffer { + fn name(&self) -> &'static str { + "StageBoundaryBuffer" + } + + fn properties(&self) -> &Arc { + &self.cache + } + + fn children(&self) -> Vec<&Arc> { + vec![&self.input] + } + + fn with_new_children( + self: Arc, + mut children: Vec>, + ) -> Result> { + // Fresh waker is fine: with_new_children is called at planning + // time before RTO registers anything, so there's nothing to + // preserve. + Ok(Arc::new(Self::new(children.swap_remove(0), self.stage))) + } + + fn maintains_input_order(&self) -> Vec { + vec![true] + } + + /// Authoritative runtime cardinality: the drain task counts rows as + /// it materializes them, so once `is_ready` flips, this is the true + /// post-input count for `partition` (no static-stat fallback needed). 
+ /// Returns `None` before `is_ready` — partial counts are not safe + /// to drive adaptive decisions with. + fn runtime_row_count(&self, partition: usize) -> Option { + if !self.is_ready() { + return None; + } + self.row_counts + .get(partition) + .map(|c| c.load(Ordering::Relaxed)) + } + + /// Passthrough: the buffer doesn't change row counts or column stats. + /// Without this override, the default impl at execution_plan.rs:528 + /// returns Statistics::new_unknown — wrapping any subtree in a + /// buffer would blackhole stats for any optimizer rule that asks. + /// (Runtime cardinality is also exposed via `runtime_row_count` as + /// the drain counts rows; this method is for other static rules + /// that inspect plan-time statistics.) + fn statistics_with_args(&self, args: &StatisticsArgs) -> Result> { + args.compute_child_statistics(&self.input, args.partition()) + } + + fn execute( + &self, + partition: usize, + _context: Arc, + ) -> Result { + // Take ownership of this partition's receiver. prime() owns the + // sender side; we just gate-and-forward here. + let rx = self + .rxs + .lock() + .unwrap() + .get_mut(partition) + .and_then(|slot| slot.take()) + .ok_or_else(|| { + datafusion_common::DataFusionError::Internal(format!( + "StageBoundaryBuffer::execute called twice (or partition \ + {partition} out of range)" + )) + })?; + let stream = ConsumerStream { + partition, + rx, + state: Arc::clone(&self.state), + }; + let schema = self.schema(); + Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream))) + } +} + +struct ConsumerStream { + partition: usize, + rx: mpsc::UnboundedReceiver>, + state: Arc>, +} + +impl Stream for ConsumerStream { + type Item = Result; + + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + let this = self.as_mut().get_mut(); + // Gate: don't emit anything until RTO releases the boundary. 
+ { + let mut state = this.state.lock().unwrap(); + if !state.streaming_started { + state.wakers.insert(this.partition, cx.waker().clone()); + return Poll::Pending; + } + } + this.rx.poll_recv(cx) + } +} diff --git a/datafusion/sqllogictest/test_files/runtime_optimizer.slt b/datafusion/sqllogictest/test_files/runtime_optimizer.slt new file mode 100644 index 0000000000000..5697a225a79aa --- /dev/null +++ b/datafusion/sqllogictest/test_files/runtime_optimizer.slt @@ -0,0 +1,165 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# =========================================================================== +# RuntimeOptimizerExec — adaptive query execution primitive. +# +# DataFusion is in-process and streaming. Adaptive optimization here should +# be in-process and streaming too. Pipeline breakers (SortExec, the build +# phase of AggregateExec and HashJoinExec) already have a natural barrier +# between their build and emit phases — they buffer their input and know +# its true row count by the time downstream starts pulling. The cost of +# that buffering is already paid; surfacing runtime stats from it to the +# planner is essentially free. +# +# `RuntimeOptimizerExec` is inserted at static plan time above pipeline +# breakers. 
When execution reaches its child's barrier, it reads the +# child's true runtime stats, calls the optimizer with those stats to +# rewrite its (statically-guessed) upper subplan, then executes the +# rewritten subplan. +# +# Same primitive generalizes to partition coalescing, skew handling, +# parallel window functions, and the rest of AQE. This SLT exercises the +# first concrete use case that lands the foundation. +# =========================================================================== + +# --------------------------------------------------------------------------- +# Scenario: GROUP BY on a key whose distinct cardinality the planner can't +# predict, joined against a small dimension. +# +# `big` has 100,000 rows. `group_key = id % 5` so the *actual* distinct +# cardinality is 5, but DataFusion doesn't propagate distinct_count through +# arithmetic — column stat is Absent. After GROUP BY group_key: +# actual row count: 5 +# planner estimate: Inexact, upper-bounded by input row count = 100,000 +# +# `small` is a 100-row dimension table with Exact stats. +# +# JoinSelection sees `aggregated_big` ≈ 100K (Inexact) vs `small` = 100 +# (Exact), so it picks `small` as the build side. That's exactly inverted +# from what runtime would tell us: `aggregated_big` is only 5 rows, would +# be the 20× cheaper build side. +# --------------------------------------------------------------------------- + +statement ok +set datafusion.execution.target_partitions = 4; + +statement ok +CREATE TABLE big AS +SELECT + value AS id, + value % 5 AS group_key, + 1 AS payload +FROM generate_series(0, 99999); + +statement ok +CREATE TABLE small AS +SELECT value AS id, value * 10 AS payload +FROM generate_series(0, 99); + +# Sanity-check the base sizes. +query I +SELECT count(*) FROM big; +---- +100000 + +query I +SELECT count(*) FROM small; +---- +100 + +# Sanity-check that aggregated_big has only 5 actual rows even though the +# planner can't know that. 
+query I +SELECT count(*) FROM (SELECT group_key FROM big GROUP BY group_key) t; +---- +5 + +# --------------------------------------------------------------------------- +# EXPLAIN — shows the *static* plan with AQE primitives wrapped in +# (RuntimeOptimizerExec at root; StageBoundaryBuffer on each HashJoin +# input). JoinSelection picked `small` as build statically because +# aggregated_big's row-count estimate (Inexact, ~100K) overstates the +# actual (5). +# +# At runtime, RuntimeOptimizerExec detects this inversion and calls +# `HashJoinExec::swap_inputs` — the join below executes with build and +# probe swapped, putting the 5-row side on build. EXPLAIN stays the +# static plan; the runtime swap is observable via RUST_LOG=info, which +# emits the following ordered log lines: +# +# RTO: stage 0 ready (2 boundaries); firing 1 rule(s) before release +# SwapBuildSideIfInverted: flipping HashJoinExec — current build +# (left) = 100 rows, probe (right) = 5 rows. Calling swap_inputs ... +# RTO: stage 0 released; downstream consumers can now drain the +# buffered data +# +# That ordering proves the rule fires before the boundaries release — +# so neither side of the join has been consumed by HashJoin when the +# swap decision is made. 
+# --------------------------------------------------------------------------- +query TT +EXPLAIN SELECT bg.group_key, bg.sum_payload, s.payload +FROM ( + SELECT group_key, SUM(payload) AS sum_payload FROM big GROUP BY group_key +) bg +JOIN small s ON bg.group_key = s.id; +---- +logical_plan +01)Projection: bg.group_key, bg.sum_payload, s.payload +02)--Inner Join: bg.group_key = s.id +03)----SubqueryAlias: bg +04)------Projection: big.group_key, sum(big.payload) AS sum_payload +05)--------Aggregate: groupBy=[[big.group_key]], aggr=[[sum(big.payload)]] +06)----------TableScan: big projection=[group_key, payload] +07)----SubqueryAlias: s +08)------TableScan: small projection=[id, payload] +physical_plan +01)RuntimeOptimizerExec +02)--HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(id@0, group_key@0)], projection=[group_key@2, sum_payload@3, payload@1] +03)----StageBoundaryBuffer: stage=0 +04)------CoalescePartitionsExec +05)--------DataSourceExec: partitions=4, partition_sizes=[1, 0, 0, 0] +06)----StageBoundaryBuffer: stage=0 +07)------ProjectionExec: expr=[group_key@0 as group_key, sum(big.payload)@1 as sum_payload] +08)--------AggregateExec: mode=FinalPartitioned, gby=[group_key@0 as group_key], aggr=[sum(big.payload)] +09)----------RepartitionExec: partitioning=Hash([group_key@0], 4), input_partitions=4 +10)------------AggregateExec: mode=Partial, gby=[group_key@0 as group_key], aggr=[sum(big.payload)] +11)--------------DataSourceExec: partitions=4, partition_sizes=[4, 3, 3, 3] + +# --------------------------------------------------------------------------- +# Result — GREEN regardless of build-side choice (correctness is plan- +# agnostic; only performance changes). After RuntimeOptimizerExec lands, +# this stays green; only EXPLAIN moves. +# +# big rows match small ids 0..4 (the only group_keys that exist after +# aggregation). Each group has 20000 rows of payload=1, so sum_payload=20000 +# for every group. small.payload = id * 10. 
+# --------------------------------------------------------------------------- +query III +SELECT bg.group_key, bg.sum_payload, s.payload +FROM ( + SELECT group_key, SUM(payload) AS sum_payload FROM big GROUP BY group_key +) bg +JOIN small s ON bg.group_key = s.id +ORDER BY bg.group_key; +---- +0 20000 0 +1 20000 10 +2 20000 20 +3 20000 30 +4 20000 40