Skip to content
Closed
Changes from 1 commit
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
7e894be
Dynamic Filter and Join Handling Refactor Summary
kosiew Sep 10, 2025
37c95a4
Dynamic Filter Pushdown Enhancements
kosiew Sep 10, 2025
02f2d00
Refactor Dynamic Filter Handling in `HashJoinExec`
kosiew Sep 11, 2025
24d2fea
Refactor dynamic filter side determination in `JoinType` for clarity …
kosiew Sep 11, 2025
5116377
Enhance dynamic filter handling in HashJoinExec to support left and r…
kosiew Sep 11, 2025
1f4a413
Remove unnecessary JoinType imports from dynamic filter pushdown tests
kosiew Sep 11, 2025
b3b7a30
Refactor dynamic filter tests to use a helper function for scan creation
kosiew Sep 11, 2025
190270d
Refactor dynamic filter pushdown tests: remove unused CoalescePartiti…
kosiew Sep 11, 2025
459abf4
Enhance dynamic filter pushdown logic in HashJoinExec and SharedBound…
kosiew Sep 11, 2025
36d1a61
Refactor dynamic filter handling in HashJoinExec: enforce specified j…
kosiew Sep 11, 2025
1c40186
Implement `preserves` method in `JoinType` to enhance join side prese…
kosiew Sep 11, 2025
581cbcd
Add test for dynamic filter pushdown in partitioned HashJoinExec
kosiew Sep 11, 2025
12ac262
Add tests for dynamic filter pushdown in right and left mark joins in…
kosiew Sep 11, 2025
50e8e17
Revert "Add tests for dynamic filter pushdown in right and left mark …
kosiew Sep 11, 2025
7dcdfdc
Add LEFT_PRESERVING and RIGHT_PRESERVING constants for join type pres…
kosiew Sep 11, 2025
f744fc3
Add preservation checks for LeftAnti and RightAnti join types
kosiew Sep 11, 2025
5755a1a
Enhance documentation for dynamic_filter_side to clarify behavior wit…
kosiew Sep 11, 2025
500a32b
Refactor imports in filter_pushdown tests for clarity and consistency
kosiew Sep 11, 2025
9764539
Remove unused dynamic filter references from HashJoinExec and HashJoi…
kosiew Sep 11, 2025
d9384f8
Refactor dynamic filter expression handling in HashJoinExec for clari…
kosiew Sep 11, 2025
29c6d63
Enhance dynamic filter pushdown in HashJoinExec by adding CoalescePar…
kosiew Sep 11, 2025
d2722c1
Restore main test_hashjoin_dynamic_filter_pushdown_partitioned
kosiew Sep 11, 2025
2bf7dc4
Add test for nested hash join dynamic filter pushdown
kosiew Sep 11, 2025
4d14bc1
Add comments for tests
kosiew Sep 11, 2025
b4f9701
Implement dynamic filter pushdown support for HashJoinExec
kosiew Sep 11, 2025
411a459
Refactor filter pushdown tests to utilize helper functions for buildi…
kosiew Sep 11, 2025
81145aa
Enhance dynamic filter selection logic for JoinType to accurately ref…
kosiew Sep 11, 2025
434ad72
Refactor dynamic_filter_side logic to improve join type handling and …
kosiew Sep 11, 2025
688ba1f
Fix dynamic filter creation to return an error for unspecified join s…
kosiew Sep 11, 2025
b814c22
Clarify documentation for dynamic filter pushdown eligibility in Join…
kosiew Sep 11, 2025
5c499d8
Add probe-side bounds accumulation for dynamic filter pushdown in Has…
kosiew Sep 11, 2025
c353d5e
Add test for dynamic filter bounds accumulation in HashJoinExec
kosiew Sep 11, 2025
5a51266
Add comment to explain waits_for_all_partitions_before_updating
kosiew Sep 11, 2025
ea03b6d
Implement dynamic filter handling for FULL joins and add test for ski…
kosiew Sep 11, 2025
b6dc5ea
Fix import path for MaxAccumulator and MinAccumulator in HashJoinStream
kosiew Sep 11, 2025
0782214
Merge branch 'main' into df-join-metadata-16973
kosiew Sep 11, 2025
61c33b5
Fix clippy error
kosiew Sep 11, 2025
fcca244
Clarify comment on dynamic filter pushdown eligibility in JoinType im…
kosiew Sep 11, 2025
f5f1fc3
Fix clippy error
kosiew Sep 11, 2025
39390be
Refactor TestMemoryExec initialization to use schema.clone() for clarity
kosiew Sep 12, 2025
6cf11d5
Fix clippy error
kosiew Sep 13, 2025
5cfb95c
Merge branch 'main' into df-join-metadata-16973
kosiew Sep 18, 2025
d67f5db
Add dynamic filter side preference tests for join types
kosiew Sep 18, 2025
238a57e
Refactor dynamic filter side logic in join types and update related t…
kosiew Sep 18, 2025
990d940
Clarify dynamic filter behavior for semi/anti joins in pushdown logic
kosiew Sep 18, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Fix dynamic filter creation to return an error for unspecified join s…
…ide and update related logic
  • Loading branch information
kosiew committed Sep 11, 2025
commit 688ba1f7b79fd2f95e6663243b16d6527d123bea
77 changes: 42 additions & 35 deletions datafusion/physical-plan/src/joins/hash_join/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ use arrow_schema::DataType;
use datafusion_common::config::ConfigOptions;
use datafusion_common::utils::memory::estimate_memory_size;
use datafusion_common::{
internal_err, plan_err, project_schema, JoinSide, JoinType, NullEquality, Result,
internal_datafusion_err, internal_err, plan_err, project_schema, JoinSide, JoinType,
NullEquality, Result,
};
use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
use datafusion_execution::TaskContext;
Expand Down Expand Up @@ -474,15 +475,14 @@ impl HashJoinExec {
fn create_dynamic_filter(
on: &JoinOn,
side: JoinSide,
) -> Arc<DynamicFilterPhysicalExpr> {
assert!(
side != JoinSide::None,
"dynamic filter side must be specified"
);
) -> Result<Arc<DynamicFilterPhysicalExpr>> {
if side == JoinSide::None {
return internal_err!("dynamic filter side must be specified");
}
// Extract the join key expressions from the side that will receive the dynamic filter
let keys = Self::join_exprs_for_side(on, side);
// Initialize with a placeholder expression (true) that will be updated when the hash table is built
Arc::new(DynamicFilterPhysicalExpr::new(keys, lit(true)))
Ok(Arc::new(DynamicFilterPhysicalExpr::new(keys, lit(true))))
}

/// left (build) side which gets hashed
Expand Down Expand Up @@ -1166,12 +1166,12 @@ impl HashJoinExec {
match df_side {
JoinSide::Left => {
let dynamic_filter =
Self::create_dynamic_filter(&self.on, JoinSide::Left);
Self::create_dynamic_filter(&self.on, JoinSide::Left)?;
left_child = left_child.with_self_filter(dynamic_filter);
}
JoinSide::Right => {
let dynamic_filter =
Self::create_dynamic_filter(&self.on, JoinSide::Right);
Self::create_dynamic_filter(&self.on, JoinSide::Right)?;
right_child = right_child.with_self_filter(dynamic_filter);
}
JoinSide::None => {}
Expand Down Expand Up @@ -1214,32 +1214,32 @@ impl HashJoinExec {
// Note that we don't check PushedDownPredicate::discriminant because even if nothing said
// "yes, I can fully evaluate this filter" things might still use it for statistics -> it's worth updating
let predicate = Arc::clone(&filter.predicate);
if let Ok(dynamic_filter) =
Arc::downcast::<DynamicFilterPhysicalExpr>(predicate)
{
// We successfully pushed down our self filter - we need to make a new node with the dynamic filter
let new_node = Arc::new(HashJoinExec {
left: Arc::clone(&self.left),
right: Arc::clone(&self.right),
on: self.on.clone(),
filter: self.filter.clone(),
join_type: self.join_type,
join_schema: Arc::clone(&self.join_schema),
left_fut: Arc::clone(&self.left_fut),
random_state: self.random_state.clone(),
mode: self.mode,
metrics: ExecutionPlanMetricsSet::new(),
projection: self.projection.clone(),
column_indices: self.column_indices.clone(),
null_equality: self.null_equality,
cache: self.cache.clone(),
dynamic_filter: Some(HashJoinExecDynamicFilter {
filter: dynamic_filter,
bounds_accumulator: OnceLock::new(),
}),
});
result = result.with_updated_node(new_node as Arc<dyn ExecutionPlan>);
}
let dynamic_filter = Arc::downcast::<DynamicFilterPhysicalExpr>(predicate)
.map_err(|_| {
internal_datafusion_err!("expected DynamicFilterPhysicalExpr")
})?;
// We successfully pushed down our self filter - we need to make a new node with the dynamic filter
let new_node = Arc::new(HashJoinExec {
left: Arc::clone(&self.left),
right: Arc::clone(&self.right),
on: self.on.clone(),
filter: self.filter.clone(),
join_type: self.join_type,
join_schema: Arc::clone(&self.join_schema),
left_fut: Arc::clone(&self.left_fut),
random_state: self.random_state.clone(),
mode: self.mode,
metrics: ExecutionPlanMetricsSet::new(),
projection: self.projection.clone(),
column_indices: self.column_indices.clone(),
null_equality: self.null_equality,
cache: self.cache.clone(),
dynamic_filter: Some(HashJoinExecDynamicFilter {
filter: dynamic_filter,
bounds_accumulator: OnceLock::new(),
}),
});
result = result.with_updated_node(new_node as Arc<dyn ExecutionPlan>);
}
Ok(result)
}
Expand Down Expand Up @@ -1631,6 +1631,13 @@ mod tests {
)
}

/// Requesting a dynamic filter without naming a join side must surface an
/// error (carrying the "side must be specified" message) rather than panic.
#[test]
fn create_dynamic_filter_none_side_returns_error() {
    // The join keys are irrelevant here: the side check runs before they are read.
    let empty_on: JoinOn = Vec::new();
    let outcome = HashJoinExec::create_dynamic_filter(&empty_on, JoinSide::None);
    let message = outcome.unwrap_err().to_string();
    assert_contains!(message, "dynamic filter side must be specified");
}

async fn join_collect(
left: Arc<dyn ExecutionPlan>,
right: Arc<dyn ExecutionPlan>,
Expand Down