Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
7dda5b5
RuntimeOptimizerExec SLT: red baseline for hash-join build-side swap
avantgardnerio Jun 24, 2026
6c4068c
RuntimeOptimizerExec passthrough + insertion rule
avantgardnerio Jun 24, 2026
4fd62c7
PipelineBreakerBuffer passthrough + insertion above breakers
avantgardnerio Jun 24, 2026
e4f2550
PipelineBreakerBuffer: load-bearing + AtomicWaker coordinator wakeup
avantgardnerio Jun 24, 2026
7529611
RuntimeRule trait + SwapBuildSideIfInverted (log-only)
avantgardnerio Jun 24, 2026
9317cab
runtime_row_count: gate via buffer is_ready, passthrough ProjectionExec
avantgardnerio Jun 24, 2026
09a6abc
Document why pipeline_behavior() isn't suitable for breaker detection
avantgardnerio Jun 24, 2026
2154a91
PipelineBreakerBuffer: passthrough statistics_with_args
avantgardnerio Jun 25, 2026
4fe53aa
Rename PipelineBreakerBuffer to StageBoundaryBuffer, add stage field
avantgardnerio Jun 25, 2026
eec37dc
StageBoundaryBuffer: full-drain materialization + prime(ctx)
avantgardnerio Jun 25, 2026
ea64bde
Split InsertRuntimeOptimizer into two rules; per-buffer wakers
avantgardnerio Jun 25, 2026
fc4feee
InsertHashJoinBoundaries: retarget rule to HashJoin inputs
avantgardnerio Jun 25, 2026
5eb08aa
RTO: event-driven stage completion drives rule firing
avantgardnerio Jun 25, 2026
f1ae00b
RuntimeRule: reshape to PhysicalOptimizerRule signature
avantgardnerio Jun 25, 2026
e7eb5ce
Real swap: SwapBuildSideIfInverted calls swap_inputs, RTO defers execute
avantgardnerio Jun 25, 2026
3b929a9
RTO: lazy stage-by-stage priming for multi-stage replan
avantgardnerio Jun 25, 2026
3a3ec5f
Extract RuntimeRules into a runtime_rules module
avantgardnerio Jun 25, 2026
9b3decc
StageBoundaryBuffer counts its own rows; drop static-stat fallback
avantgardnerio Jun 25, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions datafusion/physical-optimizer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ datafusion-physical-expr = { workspace = true }
datafusion-physical-expr-common = { workspace = true }
datafusion-physical-plan = { workspace = true }
datafusion-pruning = { workspace = true }
futures = { workspace = true }
itertools = { workspace = true }
recursive = { workspace = true, optional = true }

Expand Down
104 changes: 104 additions & 0 deletions datafusion/physical-optimizer/src/insert_hash_join_boundaries.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! [`InsertHashJoinBoundaries`] wraps each input of every `HashJoinExec`
//! in a [`StageBoundaryBuffer`]. Both sides are gated so that neither
//! has been consumed by the join by the time runtime stats arrive — the
//! precondition for the build-side-swap rule actually swapping (rather
//! than just logging intent). Memory cost is the size of both inputs;
//! spill is a follow-up, OomGuard catches genuine OOM in the meantime.
//!
//! Stage numbers are assigned bottom-up: a new boundary's stage is one
//! more than the highest stage of any boundary already in its input
//! subtree, or 0 if there is none. Lowest stage releases first;
//! [`RuntimeOptimizerExec`] walks the subtree at runtime and primes the
//! next stage as the previous one's boundaries become ready.
//!
//! `transform_up` ensures deeper joins are visited before outer joins,
//! so when an outer-join boundary is computed the inner-join boundaries
//! already exist and their stage numbers are visible.
//!
//! [`RuntimeOptimizerExec`]: datafusion_physical_plan::runtime_optimizer::RuntimeOptimizerExec

use std::sync::Arc;

use crate::PhysicalOptimizerRule;
use datafusion_common::Result;
use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_physical_plan::ExecutionPlan;
use datafusion_physical_plan::joins::HashJoinExec;
use datafusion_physical_plan::stage_boundary_buffer::StageBoundaryBuffer;

/// Physical optimizer rule that places a stage boundary on the inputs of
/// hash joins.
///
/// The rule carries no state; every instance behaves identically.
#[derive(Debug, Default)]
pub struct InsertHashJoinBoundaries;

impl InsertHashJoinBoundaries {
    /// Creates the rule. Equivalent to [`Default::default`].
    pub fn new() -> Self {
        Self::default()
    }
}

impl PhysicalOptimizerRule for InsertHashJoinBoundaries {
    /// Name under which this rule is reported.
    fn name(&self) -> &str {
        "InsertHashJoinBoundaries"
    }

    /// Wraps every not-yet-wrapped input of each `HashJoinExec` in `plan`
    /// in a `StageBoundaryBuffer`.
    ///
    /// The plan is rewritten bottom-up (`transform_up`), so by the time a
    /// join is visited its inputs have already been rewritten and any
    /// boundaries below them are visible. A new boundary gets stage
    /// `max + 1`, where `max` is the highest stage found in the input's
    /// subtree, or stage `0` when the subtree contains no boundary.
    ///
    /// Nodes that are not hash joins are returned untouched. A hash join
    /// whose inputs are all already boundaries is also returned untouched,
    /// so re-running the rule neither rebuilds the join nor reports a
    /// change.
    ///
    /// # Errors
    ///
    /// Propagates any error from `with_new_children` on a rewritten join.
    fn optimize(
        &self,
        plan: Arc<dyn ExecutionPlan>,
        _config: &ConfigOptions,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let with_buffers = plan
            .transform_up(|node| {
                if node.downcast_ref::<HashJoinExec>().is_none() {
                    return Ok(Transformed::no(node));
                }

                // Nothing to insert: leave the join (and the `Transformed`
                // flag) alone instead of rebuilding it with identical
                // children.
                let fully_wrapped = node.children().iter().all(|child| {
                    child.downcast_ref::<StageBoundaryBuffer>().is_some()
                });
                if fully_wrapped {
                    return Ok(Transformed::no(node));
                }

                let new_children: Vec<Arc<dyn ExecutionPlan>> = node
                    .children()
                    .into_iter()
                    .map(|child| -> Arc<dyn ExecutionPlan> {
                        // Keep an existing boundary as is; never stack a
                        // second buffer on top of it.
                        if child.downcast_ref::<StageBoundaryBuffer>().is_some() {
                            return Arc::clone(child);
                        }
                        let stage = max_buffer_stage_in_subtree(child)
                            .map_or(0, |max| max + 1);
                        Arc::new(StageBoundaryBuffer::new(Arc::clone(child), stage))
                    })
                    .collect();

                Ok(Transformed::yes(node.with_new_children(new_children)?))
            })?
            .data;
        Ok(with_buffers)
    }

    /// Reports `true`, as the original rule does.
    fn schema_check(&self) -> bool {
        true
    }
}

/// Returns the highest stage number carried by any `StageBoundaryBuffer`
/// in the subtree rooted at `plan` (including `plan` itself), or `None`
/// when the subtree contains no such buffer.
///
/// The walk is recursive and visits every node of the subtree.
fn max_buffer_stage_in_subtree(plan: &Arc<dyn ExecutionPlan>) -> Option<usize> {
    // Stage of this node, if it is itself a boundary.
    let own_stage = plan
        .downcast_ref::<StageBoundaryBuffer>()
        .map(|buffer| buffer.stage());

    // Fold the per-child maxima together with this node's own stage.
    plan.children()
        .into_iter()
        .filter_map(max_buffer_stage_in_subtree)
        .chain(own_stage)
        .max()
}
2 changes: 2 additions & 0 deletions datafusion/physical-optimizer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ pub mod output_requirements;
pub mod projection_pushdown;
pub use datafusion_pruning as pruning;
pub mod hash_join_buffering;
pub mod insert_hash_join_boundaries;
pub mod pushdown_sort;
pub mod runtime_optimizer;
pub mod sanity_checker;
pub mod topk_aggregation;
pub mod topk_repartition;
Expand Down
15 changes: 15 additions & 0 deletions datafusion/physical-optimizer/src/optimizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,10 @@ use crate::topk_repartition::TopKRepartition;
use crate::update_aggr_exprs::OptimizeAggregateOrder;

use crate::hash_join_buffering::HashJoinBuffering;
use crate::insert_hash_join_boundaries::InsertHashJoinBoundaries;
use crate::limit_pushdown_past_window::LimitPushPastWindows;
use crate::pushdown_sort::PushdownSort;
use crate::runtime_optimizer::InsertRuntimeOptimizer;
use crate::window_topn::WindowTopN;
use datafusion_common::Result;
use datafusion_common::config::ConfigOptions;
Expand Down Expand Up @@ -247,6 +249,19 @@ impl PhysicalOptimizer {
// given query plan; i.e. it only acts as a final
// gatekeeping rule.
Arc::new(SanityCheckPlan::new()),
// Adaptive-execution infrastructure. Two rules, in order:
// 1. InsertHashJoinBoundaries — wraps each HashJoin input
// in a StageBoundaryBuffer with a bottom-up stage
// number; the boundaries are where runtime stats become
// observable and where the build/probe sides are gated
// until a swap decision can be made.
// 2. InsertRuntimeOptimizer — wraps the plan root in a
// RuntimeOptimizerExec that walks the subtree at runtime
// to release ready buffers and run RuntimeRules.
// Split so future adaptive rules can add their own targeted
// boundary-insertion rules without touching the RTO wrapper.
Arc::new(InsertHashJoinBoundaries::new()),
Arc::new(InsertRuntimeOptimizer::new()),
];

Self::with_rules(rules)
Expand Down
66 changes: 66 additions & 0 deletions datafusion/physical-optimizer/src/runtime_optimizer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! [`InsertRuntimeOptimizer`] wraps the (now-final) plan root in a
//! [`RuntimeOptimizerExec`] with the default set of runtime rules. It
//! does nothing else — buffer insertion happens in a separate, targeted
//! rule (today: [`InsertHashJoinBoundaries`]). The split lets
//! future adaptive optimizations (partition coalescing, skew handling)
//! introduce their own targeted insertion rules without touching the
//! RTO-wrapping logic.
//!
//! [`InsertHashJoinBoundaries`]: crate::insert_hash_join_boundaries::InsertHashJoinBoundaries

use std::sync::Arc;

use crate::PhysicalOptimizerRule;
use datafusion_common::Result;
use datafusion_common::config::ConfigOptions;
use datafusion_physical_plan::ExecutionPlan;
use datafusion_physical_plan::runtime_optimizer::{RuntimeOptimizerExec, RuntimeRule};
use datafusion_physical_plan::runtime_rules::SwapBuildSideIfInverted;

/// Physical optimizer rule that wraps the plan root in the runtime
/// optimizer node.
///
/// The rule carries no state; every instance behaves identically.
#[derive(Debug, Default)]
pub struct InsertRuntimeOptimizer;

impl InsertRuntimeOptimizer {
    /// Creates the rule. Equivalent to [`Default::default`].
    pub fn new() -> Self {
        Self::default()
    }
}

impl PhysicalOptimizerRule for InsertRuntimeOptimizer {
    /// Name under which this rule is reported.
    fn name(&self) -> &str {
        "InsertRuntimeOptimizer"
    }

    /// Wraps `plan` in a `RuntimeOptimizerExec` configured with the
    /// default runtime rules (currently only `SwapBuildSideIfInverted`).
    ///
    /// If the root of `plan` is already a `RuntimeOptimizerExec` the plan
    /// is returned unchanged, so running the rule more than once does not
    /// stack wrappers. Only the root node is inspected.
    fn optimize(
        &self,
        plan: Arc<dyn ExecutionPlan>,
        _config: &ConfigOptions,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        // Multi-pass optimizer loops may hand us an already-wrapped plan.
        let already_wrapped = plan.downcast_ref::<RuntimeOptimizerExec>().is_some();
        if already_wrapped {
            return Ok(plan);
        }

        let swap_build_side: Arc<dyn RuntimeRule> =
            Arc::new(SwapBuildSideIfInverted::new());
        let wrapped: Arc<dyn ExecutionPlan> =
            Arc::new(RuntimeOptimizerExec::new(plan, vec![swap_build_side]));
        Ok(wrapped)
    }

    /// Reports `true`, as the original rule does.
    fn schema_check(&self) -> bool {
        true
    }
}
30 changes: 29 additions & 1 deletion datafusion/physical-plan/src/aggregates/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use crate::filter_pushdown::{
ChildFilterDescription, ChildPushdownResult, FilterDescription, FilterPushdownPhase,
FilterPushdownPropagation, PushedDownPredicate,
};
use crate::metrics::{ExecutionPlanMetricsSet, MetricsSet};
use crate::metrics::{ExecutionPlanMetricsSet, MetricValue, MetricsSet};
use crate::statistics::StatisticsArgs;
use crate::{
DisplayFormatType, Distribution, ExecutionPlan, InputOrderMode,
Expand Down Expand Up @@ -1765,6 +1765,34 @@ impl ExecutionPlan for AggregateExec {
Some(self.metrics.clone_inner())
}

/// Runtime output-row count for `partition`, read from this node's
/// metrics.
///
/// Looks through a clone of the metrics set and returns the current value
/// of the first `OutputRows` metric tagged with `partition`. Returns
/// `None` when no such metric exists; a metric whose counter is still
/// zero yields `Some(0)`.
///
/// The intent is that for `Final` / `FinalPartitioned` modes this is the
/// post-build group count of the partition, and for `Partial` modes the
/// number of partial-group rows that partition has produced.
///
/// NOTE(review): the value is whatever the counter holds at call time.
/// Presumably callers only consult it after the partition has finished
/// producing output (e.g. behind a readiness gate) — confirm, since an
/// earlier read would observe a partial count rather than the final one.
fn runtime_row_count(&self, partition: usize) -> Option<usize> {
    let metrics = self.metrics.clone_inner();
    for metric in metrics.iter() {
        // Only metrics registered for the requested partition qualify.
        if metric.partition() != Some(partition) {
            continue;
        }
        if let MetricValue::OutputRows(rows) = metric.value() {
            return Some(rows.value());
        }
    }
    // No `OutputRows` metric exists for this partition.
    None
}

fn statistics_with_args(&self, args: &StatisticsArgs) -> Result<Arc<Statistics>> {
let child_statistics =
args.compute_child_statistics(&self.input, args.partition())?;
Expand Down
23 changes: 23 additions & 0 deletions datafusion/physical-plan/src/execution_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -496,6 +496,29 @@ pub trait ExecutionPlan: Any + Debug + DisplayAs + Send + Sync {
None
}

/// Returns the number of rows this operator **will emit** on the given
/// output partition, if that count is knowable at the moment of the
/// call.
///
/// Pipeline-breaking operators that absorb all input before emitting
/// (`AggregateExec`, `SortExec` once sorted, the build side of
/// `HashJoinExec` once built) know their true per-partition output
/// cardinality the moment their build phase completes — *before* any
/// output batch has been pulled. That is precisely the runtime stat
/// downstream adaptive-execution rules (build-side swap, partition
/// coalescing, skew handling) need.
///
/// The result is per partition. Callers that want a cross-partition
/// total must call this once for every output `partition` index and
/// add the results up themselves.
///
/// Streaming operators never have a meaningful answer and should
/// leave this as the default `None`. Pipeline-breaking operators
/// should return `None` *before* their barrier completes and
/// `Some(rows)` once it has.
///
/// The default implementation ignores `_partition` and returns `None`.
fn runtime_row_count(&self, _partition: usize) -> Option<usize> {
None
}

/// Returns statistics for a specific partition of this `ExecutionPlan` node.
///
/// Deprecated: use [`Self::statistics_with_args`] instead,
Expand Down
3 changes: 3 additions & 0 deletions datafusion/physical-plan/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,13 @@ pub mod placeholder_row;
pub mod projection;
pub mod recursive_query;
pub mod repartition;
pub mod runtime_optimizer;
pub mod runtime_rules;
pub mod scalar_subquery;
pub mod sort_pushdown;
pub mod sorts;
pub mod spill;
pub mod stage_boundary_buffer;
pub mod statistics;
pub mod stream;
pub mod streaming;
Expand Down
6 changes: 6 additions & 0 deletions datafusion/physical-plan/src/projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,12 @@ impl ExecutionPlan for ProjectionExec {
Some(self.metrics.clone_inner())
}

/// Passthrough: a projection doesn't change row count, so its
/// runtime row count equals its input's.
///
/// Delegates to the input plan's `runtime_row_count` for the same
/// `partition` index, so the result is `None` whenever the input
/// reports `None`.
fn runtime_row_count(&self, partition: usize) -> Option<usize> {
self.input.runtime_row_count(partition)
}

fn statistics_with_args(&self, args: &StatisticsArgs) -> Result<Arc<Statistics>> {
let input_stats = Arc::unwrap_or_clone(
args.compute_child_statistics(&self.input, args.partition())?,
Expand Down
Loading
Loading