From 4b414133b0821d9d870f42b5c165bd783a80ae64 Mon Sep 17 00:00:00 2001 From: Crystal Zhou Date: Thu, 23 Oct 2025 01:11:59 -0400 Subject: [PATCH 1/4] Update hashjoin to identify right probe side (ref: #17518) --- .../physical-plan/src/joins/hash_join/exec.rs | 202 ++++++++++++++++-- .../src/joins/hash_join/shared_bounds.rs | 79 ++++++- 2 files changed, 254 insertions(+), 27 deletions(-) diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index b5fe5ee5cda14..2a7bc201efd3f 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -463,12 +463,25 @@ impl HashJoinExec { }) } - fn create_dynamic_filter(on: &JoinOn) -> Arc { - // Extract the right-side keys (probe side keys) from the `on` clauses - // Dynamic filter will be created from build side values (left side) and applied to probe side (right side) - let right_keys: Vec<_> = on.iter().map(|(_, r)| Arc::clone(r)).collect(); + fn join_exprs_for_side(on: &JoinOn, pushdown_side: JoinSide) -> Vec { + match pushdown_side { + JoinSide::Left => on.iter().map(|(l, _)| Arc::clone(l)).collect(), + JoinSide::Right => on.iter().map(|(_, r)| Arc::clone(r)).collect(), + JoinSide::None => return vec![], + } + } + + fn create_dynamic_filter( + on: &JoinOn, + pushdown_side: JoinSide, + ) -> Result> { + if pushdown_side == JoinSide::None { + return internal_err!("dynamic filter side must be specified"); + } + // Extract the join key expressions from the side that will receive the dynamic filter + let keys = Self::join_exprs_for_side(on, pushdown_side); // Initialize with a placeholder expression (true) that will be updated when the hash table is built - Arc::new(DynamicFilterPhysicalExpr::new(right_keys, lit(true))) + Ok(Arc::new(DynamicFilterPhysicalExpr::new(keys, lit(true)))) } /// left (build) side which gets hashed @@ -780,6 +793,21 @@ impl DisplayAs for HashJoinExec { } } +fn find_filter_pushdown_sides(join_type: JoinType) -> JoinSide { + match join_type { + JoinType::Inner => JoinSide::Right, + JoinType::Left => JoinSide::Right, + JoinType::Right => JoinSide::Left, + JoinType::Full => JoinSide::None, + JoinType::LeftSemi => JoinSide::Right, + JoinType::RightSemi => JoinSide::Left, + JoinType::LeftAnti => JoinSide::Right, + JoinType::RightAnti => JoinSide::Left, + JoinType::LeftMark => JoinSide::Right, + JoinType::RightMark => JoinSide::Left, + } +} + impl ExecutionPlan for HashJoinExec { fn name(&self) -> &'static str { "HashJoinExec" @@ -929,8 +957,10 @@ impl ExecutionPlan for HashJoinExec { } let enable_dynamic_filter_pushdown = self.dynamic_filter.is_some(); + // let filter_pushdown_side = find_filter_pushdown_sides(self.join_type); let join_metrics = BuildProbeJoinMetrics::new(partition, &self.metrics); + let probe_side = find_filter_pushdown_sides(self.join_type); let left_fut = match self.mode { PartitionMode::CollectLeft => self.left_fut.try_once(|| { let left_stream = self.left.execute(0, Arc::clone(&context))?; @@ -946,7 +976,7 @@ impl ExecutionPlan for HashJoinExec { reservation, need_produce_result_in_final(self.join_type), self.right().output_partitioning().partition_count(), - enable_dynamic_filter_pushdown, + enable_dynamic_filter_pushdown && probe_side == JoinSide::Right, )) })?, PartitionMode::Partitioned => { @@ -964,7 +994,7 @@ impl ExecutionPlan for HashJoinExec { reservation, need_produce_result_in_final(self.join_type), 1, - enable_dynamic_filter_pushdown, + enable_dynamic_filter_pushdown && probe_side == JoinSide::Right, )) } PartitionMode::Auto => { @@ -982,18 +1012,22 @@ impl ExecutionPlan for HashJoinExec { .then(|| { self.dynamic_filter.as_ref().map(|df| { let filter = Arc::clone(&df.filter); - let on_right = self - .on - .iter() - .map(|(_, right_expr)| Arc::clone(right_expr)) - .collect::>(); + // Determine which side will receive the dynamic filter + let probe_side = find_filter_pushdown_sides(self.join_type); + // Bounds should be collected from the build side (opposite of probe side) + // let build_side = match probe_side { + // JoinSide::Left => JoinSide::Right, + // JoinSide::Right => JoinSide::Left, + // JoinSide::None => JoinSide::None, + // }; + let on_expressions = Self::join_exprs_for_side(&self.on, probe_side); Some(Arc::clone(df.bounds_accumulator.get_or_init(|| { Arc::new(SharedBoundsAccumulator::new_from_partition_mode( self.mode, self.left.as_ref(), self.right.as_ref(), filter, - on_right, + on_expressions, )) }))) }) @@ -1126,7 +1160,7 @@ impl ExecutionPlan for HashJoinExec { } // Get basic filter descriptions for both children - let left_child = crate::filter_pushdown::ChildFilterDescription::from_child( + let mut left_child = crate::filter_pushdown::ChildFilterDescription::from_child( &parent_filters, self.left(), )?; @@ -1139,9 +1173,24 @@ impl ExecutionPlan for HashJoinExec { if matches!(phase, FilterPushdownPhase::Post) && config.optimizer.enable_join_dynamic_filter_pushdown { - // Add actual dynamic filter to right side (probe side) - let dynamic_filter = Self::create_dynamic_filter(&self.on); - right_child = right_child.with_self_filter(dynamic_filter); + let pushdown_side = find_filter_pushdown_sides(self.join_type); + let dynamic_filter = Self::create_dynamic_filter(&self.on, pushdown_side)?; + match pushdown_side { + JoinSide::None => { + // A join type that preserves both sides (e.g. FULL) cannot + // leverage dynamic filters. Return early before attempting to + // create one. + return Ok(FilterDescription::new() + .with_child(left_child) + .with_child(right_child)); + } + JoinSide::Left => { + left_child = left_child.with_self_filter(dynamic_filter); + } + JoinSide::Right => { + right_child = right_child.with_self_filter(dynamic_filter); + } + } } Ok(FilterDescription::new() @@ -1159,7 +1208,8 @@ impl ExecutionPlan for HashJoinExec { // non-inner joins in `gather_filters_for_pushdown`. // However it's a cheap check and serves to inform future devs touching this function that they need to be really // careful pushing down filters through non-inner joins. - if self.join_type != JoinType::Inner { + let pushdown_side = find_filter_pushdown_sides(self.join_type); + if pushdown_side == JoinSide::None { // Other types of joins can support *some* filters, but restrictions are complex and error prone. // For now we don't support them. // See the logical optimizer rules for more details: datafusion/optimizer/src/push_down_filter.rs @@ -1170,9 +1220,13 @@ impl ExecutionPlan for HashJoinExec { let mut result = FilterPushdownPropagation::if_any(child_pushdown_result.clone()); assert_eq!(child_pushdown_result.self_filters.len(), 2); // Should always be 2, we have 2 children - let right_child_self_filters = &child_pushdown_result.self_filters[1]; // We only push down filters to the right child + let self_filters = match pushdown_side { + JoinSide::Left => &child_pushdown_result.self_filters[0], + JoinSide::Right => &child_pushdown_result.self_filters[1], + JoinSide::None => unreachable!(), + }; // We expect 0 or 1 self filters - if let Some(filter) = right_child_self_filters.first() { + if let Some(filter) = self_filters.first() { // Note that we don't check PushdDownPredicate::discrimnant because even if nothing said // "yes, I can fully evaluate this filter" things might still use it for statistics -> it's worth updating let predicate = Arc::clone(&filter.predicate); @@ -4518,4 +4572,112 @@ mod tests { fn columns(schema: &Schema) -> Vec { schema.fields().iter().map(|f| f.name().clone()).collect() } + + #[test] + fn create_dynamic_filter_none_side_returns_error() { + let on: JoinOn = vec![]; + let err = HashJoinExec::create_dynamic_filter(&on, JoinSide::None).unwrap_err(); + assert_contains!(err.to_string(), "dynamic filter side must be specified"); + } + + #[test] + fn full_join_skips_dynamic_filter_creation() -> Result<()> { + use arrow::array::Int32Array; + use arrow::datatypes::{DataType, Field, Schema}; + use datafusion_physical_expr::expressions::col; + + let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])); + let batch = RecordBatch::try_new( + Arc::clone(&schema), + vec![Arc::new(Int32Array::from(vec![1]))], + )?; + let left = + TestMemoryExec::try_new(&[vec![batch.clone()]], Arc::clone(&schema), None)?; + let right = TestMemoryExec::try_new(&[vec![batch]], Arc::clone(&schema), None)?; + + let on = vec![(col("a", &left.schema())?, col("a", &right.schema())?)]; + let join = HashJoinExec::try_new( + Arc::new(left), + Arc::new(right), + on, + None, + &JoinType::Full, + None, + PartitionMode::CollectLeft, + NullEquality::NullEqualsNull, + )?; + + let mut config = ConfigOptions::default(); + config.optimizer.enable_dynamic_filter_pushdown = true; + + let desc = + join.gather_filters_for_pushdown(FilterPushdownPhase::Post, vec![], &config)?; + assert!(desc.self_filters().iter().all(|f| f.is_empty())); + Ok(()) + } + + // This test verifies that when a HashJoinExec is created with a dynamic filter + // targeting the left side, the join build phase collects min/max bounds from + // the build-side input and reports them back into the dynamic filter for the + // other side. Concretely: + // - Left input has values [1, 3, 5] + // - Right (build) input has values [2, 4, 6] + // - JoinType::Right is used so that the right side acts as the build side + // and the dynamic filter is attached to the left side expression. + // - After fully executing the join, the dynamic filter should be updated + // with the observed bounds `a@0 >= 2 AND a@0 <= 6` (min=2, max=6). + // The test asserts that HashJoinExec correctly accumulates and reports these + // bounds so downstream consumers can use the dynamic predicate for pruning. + #[tokio::test] + async fn reports_bounds_when_dynamic_filter_side_left() -> Result<()> { + use datafusion_physical_expr::expressions::col; + + let task_ctx = Arc::new(TaskContext::default()); + + let left_schema = + Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])); + let left_batch = RecordBatch::try_new( + Arc::clone(&left_schema), + vec![Arc::new(Int32Array::from(vec![1, 3, 5]))], + )?; + let left = TestMemoryExec::try_new(&[vec![left_batch]], left_schema, None)?; + + let right_schema = + Arc::new(Schema::new(vec![Field::new("b", DataType::Int32, false)])); + let right_batch = RecordBatch::try_new( + Arc::clone(&right_schema), + vec![Arc::new(Int32Array::from(vec![2, 4, 6]))], + )?; + let right = TestMemoryExec::try_new(&[vec![right_batch]], right_schema, None)?; + + let on = vec![(col("a", &left.schema())?, col("b", &right.schema())?)]; + + let mut join = HashJoinExec::try_new( + Arc::new(left), + Arc::new(right), + on, + None, + &JoinType::Right, + None, + PartitionMode::CollectLeft, + NullEquality::NullEqualsNull, + )?; + + let dynamic_filter: Arc = + HashJoinExec::create_dynamic_filter(&join.on, JoinSide::Left)?; + join.dynamic_filter = Some(HashJoinExecDynamicFilter { + filter: Arc::clone(&dynamic_filter), + bounds_accumulator: OnceLock::new(), + }); + + let stream = join.execute(0, task_ctx)?; + let _batches: Vec = stream.try_collect().await?; + + assert_eq!( + format!("{}", dynamic_filter.current().unwrap()), + "a@0 >= 2 AND a@0 <= 6" + ); + + Ok(()) + } } diff --git a/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs b/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs index 25f7a0de31acd..04720128ca1fa 100644 --- a/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs +++ b/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs @@ -107,8 +107,8 @@ pub(crate) struct SharedBoundsAccumulator { barrier: Barrier, /// Dynamic filter for pushdown to probe side dynamic_filter: Arc, - /// Right side join expressions needed for creating filter bounds - on_right: Vec, + /// Join expressions for the side that will receive the dynamic filter + on_expressions: Vec, } /// State protected by SharedBoundsAccumulator's mutex @@ -149,7 +149,7 @@ impl SharedBoundsAccumulator { left_child: &dyn ExecutionPlan, right_child: &dyn ExecutionPlan, dynamic_filter: Arc, - on_right: Vec, + on_expressions: Vec, ) -> Self { // Troubleshooting: If partition counts are incorrect, verify this logic matches // the actual execution pattern in collect_build_side() @@ -171,7 +171,7 @@ impl SharedBoundsAccumulator { }), barrier: Barrier::new(expected_calls), dynamic_filter, - on_right, + on_expressions, } } @@ -199,16 +199,16 @@ impl SharedBoundsAccumulator { // Create range predicates for each join key in this partition let mut column_predicates = Vec::with_capacity(partition_bounds.len()); - for (col_idx, right_expr) in self.on_right.iter().enumerate() { + for (col_idx, expr) in self.on_expressions.iter().enumerate() { if let Some(column_bounds) = partition_bounds.get_column_bounds(col_idx) { // Create predicate: col >= min AND col <= max let min_expr = Arc::new(BinaryExpr::new( - Arc::clone(right_expr), + Arc::clone(expr), Operator::GtEq, lit(column_bounds.min.clone()), )) as Arc; let max_expr = Arc::new(BinaryExpr::new( - Arc::clone(right_expr), + Arc::clone(expr), Operator::LtEq, lit(column_bounds.max.clone()), )) as Arc; @@ -311,3 +311,68 @@ impl fmt::Debug for SharedBoundsAccumulator { write!(f, "SharedBoundsAccumulator") } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::empty::EmptyExec; + use arrow::datatypes::{DataType, Field, Schema}; + use datafusion_common_runtime::SpawnedTask; + use datafusion_physical_expr::expressions::{col, lit, DynamicFilterPhysicalExpr}; + use tokio::task; + + // This test verifies the synchronization behavior of `SharedBoundsAccumulator`. + // It ensures that the dynamic filter is not updated until all expected + // partitions have reported their build-side bounds. One partition reports + // in a spawned task while the test reports another; the dynamic filter + // should remain the default until the final partition arrives, at which + // point the accumulated bounds are combined and the dynamic filter is + // updated exactly once with range predicates (>= and <=) for the join key. + #[tokio::test] + async fn waits_for_all_partitions_before_updating() { + let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)])); + let left = EmptyExec::new(Arc::clone(&schema)).with_partitions(2); + let right = EmptyExec::new(Arc::clone(&schema)).with_partitions(2); + let col_expr = col("a", &schema).unwrap(); + let dynamic = Arc::new(DynamicFilterPhysicalExpr::new( + vec![Arc::clone(&col_expr)], + lit(true), + )); + let acc = Arc::new(SharedBoundsAccumulator::new_from_partition_mode( + PartitionMode::Partitioned, + &left, + &right, + Arc::clone(&dynamic), + vec![Arc::clone(&col_expr)], + )); + + assert_eq!(format!("{}", dynamic.current().unwrap()), "true"); + + let acc0 = Arc::clone(&acc); + let handle = SpawnedTask::spawn(async move { + acc0.report_partition_bounds( + 0, + Some(vec![ColumnBounds::new( + ScalarValue::from(1i32), + ScalarValue::from(2i32), + )]), + ) + .await + .unwrap(); + }); + task::yield_now().await; + assert_eq!(format!("{}", dynamic.current().unwrap()), "true"); + acc.report_partition_bounds( + 1, + Some(vec![ColumnBounds::new( + ScalarValue::from(3i32), + ScalarValue::from(4i32), + )]), + ) + .await + .unwrap(); + handle.await.unwrap(); + let updated = format!("{}", dynamic.current().unwrap()); + assert!(updated.contains(">=") && updated.contains("<=")); + } +} \ No newline at end of file From b13bf71bbd805b933d1685b48a9bad267bee3127 Mon Sep 17 00:00:00 2001 From: Crystal Zhou Date: Fri, 24 Oct 2025 00:39:41 -0400 Subject: [PATCH 2/4] Applied changes in #17518 with minor refactoring --- .../physical-plan/src/joins/hash_join/exec.rs | 105 +++++++----- .../src/joins/hash_join/shared_bounds.rs | 2 +- .../src/joins/hash_join/stream.rs | 150 +++++++++++++++--- 3 files changed, 189 insertions(+), 68 deletions(-) diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index 2a7bc201efd3f..478251969cd3a 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -29,6 +29,7 @@ use crate::filter_pushdown::{ use crate::joins::hash_join::shared_bounds::{ColumnBounds, SharedBoundsAccumulator}; use crate::joins::hash_join::stream::{ BuildSide, BuildSideInitialState, HashJoinStream, HashJoinStreamState, + ProbeSideBoundsAccumulator, }; use crate::joins::join_hash_map::{JoinHashMapU32, JoinHashMapU64}; use crate::joins::utils::{ @@ -540,12 +541,6 @@ impl HashJoinExec { ] } - /// Get probe side information for the hash join. - pub fn probe_side() -> JoinSide { - // In current implementation right side is always probe side. - JoinSide::Right - } - /// Return whether the join contains a projection pub fn contains_projection(&self) -> bool { self.projection.is_some() @@ -591,7 +586,7 @@ impl HashJoinExec { &join_type, Arc::clone(&schema), &Self::maintains_input_order(join_type), - Some(Self::probe_side()), + Some(find_filter_pushdown_sides(join_type)), on, )?; @@ -793,6 +788,7 @@ impl DisplayAs for HashJoinExec { } } +// TODO(crystal): double check if this is correct fn find_filter_pushdown_sides(join_type: JoinType) -> JoinSide { match join_type { JoinType::Inner => JoinSide::Right, @@ -957,10 +953,11 @@ impl ExecutionPlan for HashJoinExec { } let enable_dynamic_filter_pushdown = self.dynamic_filter.is_some(); - // let filter_pushdown_side = find_filter_pushdown_sides(self.join_type); + let probe_side = find_filter_pushdown_sides(self.join_type); + let report_build_bounds = + enable_dynamic_filter_pushdown && probe_side == JoinSide::Right; let join_metrics = BuildProbeJoinMetrics::new(partition, &self.metrics); - let probe_side = find_filter_pushdown_sides(self.join_type); let left_fut = match self.mode { PartitionMode::CollectLeft => self.left_fut.try_once(|| { let left_stream = self.left.execute(0, Arc::clone(&context))?; @@ -976,7 +973,9 @@ impl ExecutionPlan for HashJoinExec { reservation, need_produce_result_in_final(self.join_type), self.right().output_partitioning().partition_count(), - enable_dynamic_filter_pushdown && probe_side == JoinSide::Right, + report_build_bounds, + // TODO(crystal): why do we need to use this AND? this looks very sketchy + // Can we extend it to right as well? )) })?, PartitionMode::Partitioned => { @@ -994,7 +993,7 @@ impl ExecutionPlan for HashJoinExec { reservation, need_produce_result_in_final(self.join_type), 1, - enable_dynamic_filter_pushdown && probe_side == JoinSide::Right, + report_build_bounds, )) } PartitionMode::Auto => { @@ -1014,12 +1013,7 @@ impl ExecutionPlan for HashJoinExec { let filter = Arc::clone(&df.filter); // Determine which side will receive the dynamic filter let probe_side = find_filter_pushdown_sides(self.join_type); - // Bounds should be collected from the build side (opposite of probe side) - // let build_side = match probe_side { - // JoinSide::Left => JoinSide::Right, - // JoinSide::Right => JoinSide::Left, - // JoinSide::None => JoinSide::None, - // }; + // TODO(crystal): maybe rename on_expressions to something that makes more sense, such as probe_side_expressions? let on_expressions = Self::join_exprs_for_side(&self.on, probe_side); Some(Arc::clone(df.bounds_accumulator.get_or_init(|| { Arc::new(SharedBoundsAccumulator::new_from_partition_mode( @@ -1038,6 +1032,7 @@ impl ExecutionPlan for HashJoinExec { // we have the batches and the hash map with their keys. We can how create a stream // over the right that uses this information to issue new batches. let right_stream = self.right.execute(partition, context)?; + let right_schema = right_stream.schema(); // update column indices to reflect the projection let column_indices_after_projection = match &self.projection { @@ -1054,6 +1049,23 @@ impl ExecutionPlan for HashJoinExec { .map(|(_, right_expr)| Arc::clone(right_expr)) .collect::>(); + let probe_bounds_accumulators = + if enable_dynamic_filter_pushdown && probe_side == JoinSide::Left { + Some( + on_right + .iter() + .map(|expr| { + ProbeSideBoundsAccumulator::try_new( + Arc::clone(expr), + &right_schema, + ) + }) + .collect::>>()?, + ) + } else { + None + }; + Ok(Box::pin(HashJoinStream::new( partition, self.schema(), @@ -1071,6 +1083,8 @@ impl ExecutionPlan for HashJoinExec { vec![], self.right.output_ordering().is_some(), bounds_accumulator, + report_build_bounds, + probe_bounds_accumulators, self.mode, ))) } @@ -1225,7 +1239,7 @@ impl ExecutionPlan for HashJoinExec { JoinSide::Right => &child_pushdown_result.self_filters[1], JoinSide::None => unreachable!(), }; - // We expect 0 or 1 self filters + // We expect 0 or 1 self filters if let Some(filter) = self_filters.first() { // Note that we don't check PushdDownPredicate::discrimnant because even if nothing said // "yes, I can fully evaluate this filter" things might still use it for statistics -> it's worth updating @@ -4621,9 +4635,8 @@ mod tests { // the build-side input and reports them back into the dynamic filter for the // other side. Concretely: // - Left input has values [1, 3, 5] - // - Right (build) input has values [2, 4, 6] - // - JoinType::Right is used so that the right side acts as the build side - // and the dynamic filter is attached to the left side expression. + // - Right (probe) input has values [2, 4, 6] + // - JoinType::Right is used so that the dynamic filter is attached to the left side expression. // - After fully executing the join, the dynamic filter should be updated // with the observed bounds `a@0 >= 2 AND a@0 <= 6` (min=2, max=6). // The test asserts that HashJoinExec correctly accumulates and reports these @@ -4652,31 +4665,37 @@ mod tests { let on = vec![(col("a", &left.schema())?, col("b", &right.schema())?)]; - let mut join = HashJoinExec::try_new( - Arc::new(left), - Arc::new(right), - on, - None, - &JoinType::Right, - None, - PartitionMode::CollectLeft, - NullEquality::NullEqualsNull, - )?; + let test_cases = vec![ + (JoinType::Right, JoinSide::Left, "a@0 >= 2 AND a@0 <= 6"), + (JoinType::Left, JoinSide::Right, "b@0 >= 1 AND b@0 <= 5"), + ]; + for (join_type, probe_side, expected_filter) in test_cases { + let mut join_exec = HashJoinExec::try_new( + Arc::new(left.clone()), + Arc::new(right.clone()), + on.clone(), + None, + &join_type, + None, + PartitionMode::CollectLeft, + NullEquality::NullEqualsNull, + )?; - let dynamic_filter: Arc = - HashJoinExec::create_dynamic_filter(&join.on, JoinSide::Left)?; - join.dynamic_filter = Some(HashJoinExecDynamicFilter { - filter: Arc::clone(&dynamic_filter), - bounds_accumulator: OnceLock::new(), - }); + let dynamic_filter: Arc = + HashJoinExec::create_dynamic_filter(&join_exec.on, probe_side)?; + join_exec.dynamic_filter = Some(HashJoinExecDynamicFilter { + filter: Arc::clone(&dynamic_filter), + bounds_accumulator: OnceLock::new(), + }); - let stream = join.execute(0, task_ctx)?; - let _batches: Vec = stream.try_collect().await?; + let stream = join_exec.execute(0, task_ctx.clone())?; + let _batches: Vec = stream.try_collect().await?; - assert_eq!( - format!("{}", dynamic_filter.current().unwrap()), - "a@0 >= 2 AND a@0 <= 6" - ); + assert_eq!( + format!("{}", dynamic_filter.current().unwrap()), + expected_filter + ); + } Ok(()) } diff --git a/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs b/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs index 04720128ca1fa..b442fd7190779 100644 --- a/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs +++ b/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs @@ -375,4 +375,4 @@ mod tests { let updated = format!("{}", dynamic.current().unwrap()); assert!(updated.contains(">=") && updated.contains("<=")); } -} \ No newline at end of file +} diff --git a/datafusion/physical-plan/src/joins/hash_join/stream.rs b/datafusion/physical-plan/src/joins/hash_join/stream.rs index 88c50c2eb2cee..d895def06b812 100644 --- a/datafusion/physical-plan/src/joins/hash_join/stream.rs +++ b/datafusion/physical-plan/src/joins/hash_join/stream.rs @@ -24,7 +24,7 @@ use std::sync::Arc; use std::task::Poll; use crate::joins::hash_join::exec::JoinLeftData; -use crate::joins::hash_join::shared_bounds::SharedBoundsAccumulator; +use crate::joins::hash_join::shared_bounds::{ColumnBounds, SharedBoundsAccumulator}; use crate::joins::utils::{ equal_rows_arr, get_final_indices_from_shared_bitmap, OnceFut, }; @@ -43,11 +43,13 @@ use crate::{ }; use arrow::array::{ArrayRef, UInt32Array, UInt64Array}; -use arrow::datatypes::{Schema, SchemaRef}; +use arrow::datatypes::{DataType, Schema, SchemaRef}; use arrow::record_batch::RecordBatch; use datafusion_common::{ internal_datafusion_err, internal_err, JoinSide, JoinType, NullEquality, Result, }; +use datafusion_expr::Accumulator; +use datafusion_functions_aggregate_common::min_max::{MaxAccumulator, MinAccumulator}; use datafusion_physical_expr::PhysicalExprRef; use ahash::RandomState; @@ -102,6 +104,56 @@ impl BuildSide { } } +/// Accumulates probe-side column bounds for dynamic filter pushdown. +/// +/// This mirrors the build-side accumulator used when collecting bounds from +/// the left (build) side. Each accumulator tracks the minimum and maximum +/// values for a single join key expression. +pub(super) struct ProbeSideBoundsAccumulator { + expr: PhysicalExprRef, + min: MinAccumulator, + max: MaxAccumulator, +} + +impl ProbeSideBoundsAccumulator { + /// Creates a new accumulator for the given join key expression. + pub(super) fn try_new(expr: PhysicalExprRef, schema: &SchemaRef) -> Result { + fn dictionary_value_type(data_type: &DataType) -> DataType { + match data_type { + DataType::Dictionary(_, value_type) => { + dictionary_value_type(value_type.as_ref()) + } + _ => data_type.clone(), + } + } + + let data_type = expr + .data_type(schema) + .map(|dt| dictionary_value_type(&dt))?; + Ok(Self { + expr, + min: MinAccumulator::try_new(&data_type)?, + max: MaxAccumulator::try_new(&data_type)?, + }) + } + + /// Updates bounds using values from the provided batch. + fn update_batch(&mut self, batch: &RecordBatch) -> Result<()> { + let array = self.expr.evaluate(batch)?.into_array(batch.num_rows())?; + self.min.update_batch(std::slice::from_ref(&array))?; + self.max.update_batch(std::slice::from_ref(&array))?; + Ok(()) + } + + /// Returns the final column bounds. + fn evaluate(mut self) -> Result { + Ok(ColumnBounds::new( + self.min.evaluate()?, + self.max.evaluate()?, + )) + } +} + /// Represents state of HashJoinStream /// /// Expected state transitions performed by HashJoinStream are: @@ -182,9 +234,9 @@ pub(super) struct HashJoinStream { schema: Arc, /// equijoin columns from the right (probe side) on_right: Vec, - /// optional join filter + /// Optional join filter filter: Option, - /// type of the join (left, right, semi, etc) + /// Type of the join (left, right, semi, etc) join_type: JoinType, /// right (probe) input right: SendableRecordBatchStream, @@ -211,7 +263,12 @@ pub(super) struct HashJoinStream { /// Optional future to signal when bounds have been reported by all partitions /// and the dynamic filter has been updated bounds_waiter: Option>, - + /// Whether we should report bounds derived from the build (left) side + report_build_bounds: bool, + /// Accumulators for probe-side bounds when filtering the left side + probe_bounds_accumulators: Option>, + /// Total number of probe-side rows processed (for bounds reporting) + probe_side_row_count: usize, /// Partitioning mode to use mode: PartitionMode, } @@ -316,6 +373,8 @@ impl HashJoinStream { hashes_buffer: Vec, right_side_ordered: bool, bounds_accumulator: Option>, + report_build_bounds: bool, + probe_bounds_accumulators: Option>, mode: PartitionMode, ) -> Self { Self { @@ -336,6 +395,9 @@ impl HashJoinStream { right_side_ordered, bounds_accumulator, bounds_waiter: None, + report_build_bounds, + probe_bounds_accumulators, + probe_side_row_count: 0, mode, } } @@ -390,6 +452,18 @@ impl HashJoinStream { Poll::Ready(Ok(StatefulStreamResult::Continue)) } + fn create_bounds_waiter( + bounds_accumulator: Arc, + partition_id: usize, + bounds: Option>, + ) -> OnceFut<()> { + OnceFut::new(async move { + bounds_accumulator + .report_partition_bounds(partition_id, bounds) + .await + }) + } + /// Collects build-side data by polling `OnceFut` future from initialized build-side /// /// Updates build-side to `Ready`, and state to `FetchProbeSide` @@ -410,25 +484,26 @@ impl HashJoinStream { // // Dynamic filter coordination between partitions: // Report bounds to the accumulator which will handle synchronization and filter updates + let mut next_state = HashJoinStreamState::FetchProbeBatch; if let Some(ref bounds_accumulator) = self.bounds_accumulator { - let bounds_accumulator = Arc::clone(bounds_accumulator); - - let left_side_partition_id = match self.mode { - PartitionMode::Partitioned => self.partition, - PartitionMode::CollectLeft => 0, - PartitionMode::Auto => unreachable!("PartitionMode::Auto should not be present at execution time. This is a bug in DataFusion, please report it!"), - }; - - let left_data_bounds = left_data.bounds.clone(); - self.bounds_waiter = Some(OnceFut::new(async move { - bounds_accumulator - .report_partition_bounds(left_side_partition_id, left_data_bounds) - .await - })); - self.state = HashJoinStreamState::WaitPartitionBoundsReport; - } else { - self.state = HashJoinStreamState::FetchProbeBatch; + if self.report_build_bounds { + let partition_id = match self.mode { + PartitionMode::Partitioned => self.partition, + PartitionMode::CollectLeft => 0, + PartitionMode::Auto => unreachable!( + "PartitionMode::Auto should not be present at execution time. This is a bug in DataFusion, please report it!" + ), + }; + let bounds = left_data.bounds.clone(); + self.bounds_waiter = Some(Self::create_bounds_waiter( + Arc::clone(bounds_accumulator), + partition_id, + bounds, + )); + next_state = HashJoinStreamState::WaitPartitionBoundsReport; + } } + self.state = next_state; self.build_side = BuildSide::Ready(BuildSideReadyState { left_data }); Poll::Ready(Ok(StatefulStreamResult::Continue)) @@ -444,7 +519,27 @@ impl HashJoinStream { ) -> Poll>>> { match ready!(self.right.poll_next_unpin(cx)) { None => { - self.state = HashJoinStreamState::ExhaustedProbeSide; + let mut next_state = HashJoinStreamState::ExhaustedProbeSide; + if let Some(ref bounds_accumulator) = self.bounds_accumulator { + if let Some(accs) = self.probe_bounds_accumulators.take() { + let bounds = if self.probe_side_row_count > 0 { + Some( + accs.into_iter() + .map(|acc| acc.evaluate()) + .collect::>>()?, + ) + } else { + None + }; + self.bounds_waiter = Some(Self::create_bounds_waiter( + Arc::clone(bounds_accumulator), + self.partition, + bounds, + )); + next_state = HashJoinStreamState::WaitPartitionBoundsReport; + } + } + self.state = next_state; } Some(Ok(batch)) => { // Precalculate hash values for fetched batch @@ -454,6 +549,13 @@ impl HashJoinStream { .map(|c| c.evaluate(&batch)?.into_array(batch.num_rows())) .collect::>>()?; + if let Some(accumulators) = self.probe_bounds_accumulators.as_mut() { + for acc in accumulators.iter_mut() { + acc.update_batch(&batch)?; + } + self.probe_side_row_count += batch.num_rows(); + } + self.hashes_buffer.clear(); self.hashes_buffer.resize(batch.num_rows(), 0); create_hashes(&keys_values, &self.random_state, &mut self.hashes_buffer)?; @@ -637,7 +739,7 @@ impl HashJoinStream { let (left_side, right_side) = get_final_indices_from_shared_bitmap( build_side.left_data.visited_indices_bitmap(), self.join_type, - true, + false, // piecewise = false for regular hash join ); let empty_right_batch = RecordBatch::new_empty(self.right.schema()); // use the left and right indices to produce the batch result From 6d26c886d2c2684f80203a8eb22da5967a5640e8 Mon Sep 17 00:00:00 2001 From: Crystal Zhou Date: Fri, 24 Oct 2025 00:45:51 -0400 Subject: [PATCH 3/4] Fix style --- datafusion/physical-plan/src/joins/hash_join/exec.rs | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index 478251969cd3a..bfc38f4d1af4f 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -468,7 +468,7 @@ impl HashJoinExec { match pushdown_side { JoinSide::Left => on.iter().map(|(l, _)| Arc::clone(l)).collect(), JoinSide::Right => on.iter().map(|(_, r)| Arc::clone(r)).collect(), - JoinSide::None => return vec![], + JoinSide::None => vec![], } } @@ -974,8 +974,6 @@ impl ExecutionPlan for HashJoinExec { need_produce_result_in_final(self.join_type), self.right().output_partitioning().partition_count(), report_build_bounds, - // TODO(crystal): why do we need to use this AND? this looks very sketchy - // Can we extend it to right as well? )) })?, PartitionMode::Partitioned => { @@ -1013,7 +1011,6 @@ impl ExecutionPlan for HashJoinExec { let filter = Arc::clone(&df.filter); // Determine which side will receive the dynamic filter let probe_side = find_filter_pushdown_sides(self.join_type); - // TODO(crystal): maybe rename on_expressions to something that makes more sense, such as probe_side_expressions? let on_expressions = Self::join_exprs_for_side(&self.on, probe_side); Some(Arc::clone(df.bounds_accumulator.get_or_init(|| { Arc::new(SharedBoundsAccumulator::new_from_partition_mode( @@ -4688,7 +4685,7 @@ mod tests { bounds_accumulator: OnceLock::new(), }); - let stream = join_exec.execute(0, task_ctx.clone())?; + let stream = join_exec.execute(0, Arc::clone(&task_ctx))?; let _batches: Vec = stream.try_collect().await?; assert_eq!( From cbcc8b0834e4688bf2dda34a7628cd09b570271d Mon Sep 17 00:00:00 2001 From: Crystal Zhou Date: Thu, 6 Nov 2025 01:49:12 -0500 Subject: [PATCH 4/4] Check filter pushdown sides for join types --- datafusion/physical-plan/src/joins/hash_join/exec.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index bfc38f4d1af4f..7d6eb69323596 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -788,19 +788,23 @@ impl DisplayAs for HashJoinExec { } } -// TODO(crystal): double check if this is correct fn find_filter_pushdown_sides(join_type: JoinType) -> JoinSide { + // This represents the side that will receive the dynamic filter and apply the bounds. + // The other side will be the build side where we collect the bounds from. + // Bounds accumulator only collect join key range from ON clause. match join_type { JoinType::Inner => JoinSide::Right, JoinType::Left => JoinSide::Right, JoinType::Right => JoinSide::Left, - JoinType::Full => JoinSide::None, JoinType::LeftSemi => JoinSide::Right, JoinType::RightSemi => JoinSide::Left, JoinType::LeftAnti => JoinSide::Right, JoinType::RightAnti => JoinSide::Left, JoinType::LeftMark => JoinSide::Right, JoinType::RightMark => JoinSide::Left, + // Full outer join cannot have dynamic filter pushdown because all rows on both + // sides are preserved. + JoinType::Full => JoinSide::None, } }