Refactor state management in HashJoinExec and use CASE expressions for more precise filters
#18451
```diff
@@ -278,7 +278,7 @@ async fn test_dynamic_filter_pushdown_through_hash_join_with_topk() {
     - SortExec: TopK(fetch=2), expr=[e@4 ASC], preserve_partitioning=[false], filter=[e@4 IS NULL OR e@4 < bb]
     - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, d@0)]
     - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
-    - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ d@0 >= aa AND d@0 <= ab ] AND DynamicFilter [ e@1 IS NULL OR e@1 < bb ]
+    - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ CASE hash_repartition % 1 WHEN 0 THEN d@0 >= aa AND d@0 <= ab ELSE false END ] AND DynamicFilter [ e@1 IS NULL OR e@1 < bb ]
     "
     );
 }
```
```diff
@@ -1305,7 +1305,7 @@ async fn test_hashjoin_dynamic_filter_pushdown_partitioned() {
     - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
     - CoalesceBatchesExec: target_batch_size=8192
     - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
-    - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ a@0 >= ab AND a@0 <= ab AND b@1 >= bb AND b@1 <= bb OR a@0 >= aa AND a@0 <= aa AND b@1 >= ba AND b@1 <= ba ]
+    - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ CASE hash_repartition % 12 WHEN 2 THEN a@0 >= ab AND a@0 <= ab AND b@1 >= bb AND b@1 <= bb WHEN 4 THEN a@0 >= aa AND a@0 <= aa AND b@1 >= ba AND b@1 <= ba ELSE false END ]
     "
     );
```
**Contributor:** The `CASE` statement just covers partitions 2 and 4 because those were the only ones that had data, right? Would it be worth adding a test for what's expected when all partitions are empty?

**Author:** Yep, will add a test.
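For reference, the new `DynamicFilter` predicate from the snapshot above, reformatted: with 12 partitions, only partitions 2 and 4 produced build-side rows, so probe rows hashing to any of the other ten partitions hit the `ELSE false` arm and are pruned before the scan.

```text
CASE hash_repartition % 12
  WHEN 2 THEN a@0 >= ab AND a@0 <= ab AND b@1 >= bb AND b@1 <= bb
  WHEN 4 THEN a@0 >= aa AND a@0 <= aa AND b@1 >= ba AND b@1 <= ba
  ELSE false
END
```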
```diff
@@ -1322,7 +1322,7 @@ async fn test_hashjoin_dynamic_filter_pushdown_partitioned() {
     - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
     - CoalesceBatchesExec: target_batch_size=8192
     - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
-    - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb ]
+    - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ CASE hash_repartition % 12 WHEN 0 THEN a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb ELSE false END ]
     "
     );
```
```diff
@@ -1667,8 +1667,8 @@ async fn test_nested_hashjoin_dynamic_filter_pushdown() {
     - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, b@0)]
     - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, x], file_type=test, pushdown_supported=true
     - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@1, d@0)]
-    - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[b, c, y], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ b@0 >= aa AND b@0 <= ab ]
-    - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, z], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ d@0 >= ca AND d@0 <= cb ]
+    - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[b, c, y], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ CASE hash_repartition % 1 WHEN 0 THEN b@0 >= aa AND b@0 <= ab ELSE false END ]
+    - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, z], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ CASE hash_repartition % 1 WHEN 0 THEN d@0 >= ca AND d@0 <= cb ELSE false END ]
     "
     );
 }
```
**Contributor:** Not sure if this is already tested, but can we add a test where the build side has NULLs? I guess we should get something like: […]

**Author:** I don't think nulls end up factoring in: nulls never get included in an inner join (the only case for which this optimization is active now). If we want to support e.g. left joins, I agree we'll have to modify the generated filter for those cases to be what you're suggesting.

**Contributor:** That's right!
The second file in the diff is the `HashJoinExec` implementation:
```diff
@@ -26,7 +26,9 @@ use crate::filter_pushdown::{
     ChildPushdownResult, FilterDescription, FilterPushdownPhase,
     FilterPushdownPropagation,
 };
-use crate::joins::hash_join::shared_bounds::{ColumnBounds, SharedBoundsAccumulator};
+use crate::joins::hash_join::shared_bounds::{
+    ColumnBounds, PartitionBounds, SharedBuildAccumulator,
+};
 use crate::joins::hash_join::stream::{
     BuildSide, BuildSideInitialState, HashJoinStream, HashJoinStreamState,
 };
```
```diff
@@ -40,6 +42,7 @@ use crate::projection::{
     try_embed_projection, try_pushdown_through_join, EmbeddedProjection, JoinData,
     ProjectionExec,
 };
+use crate::repartition::REPARTITION_HASH_SEED;
 use crate::spill::get_record_batch_memory_size;
 use crate::ExecutionPlanProperties;
 use crate::{
```
```diff
@@ -88,7 +91,8 @@ const HASH_JOIN_SEED: RandomState =
 /// HashTable and input data for the left (build side) of a join
 pub(super) struct JoinLeftData {
     /// The hash table with indices into `batch`
-    pub(super) hash_map: Box<dyn JoinHashMapType>,
+    /// Arc is used to allow sharing with SharedBuildAccumulator for hash map pushdown
+    pub(super) hash_map: Arc<dyn JoinHashMapType>,
     /// The input rows for the build side
     batch: RecordBatch,
     /// The build side on expressions values
```
```diff
@@ -103,32 +107,13 @@
     /// This could hide potential out-of-memory issues, especially when upstream operators increase their memory consumption.
     /// The MemoryReservation ensures proper tracking of memory resources throughout the join operation's lifecycle.
     _reservation: MemoryReservation,
-    /// Bounds computed from the build side for dynamic filter pushdown
-    pub(super) bounds: Option<Vec<ColumnBounds>>,
+    /// Bounds computed from the build side for dynamic filter pushdown.
+    /// If the partition is empty (no rows) this will be None.
+    /// If the partition has some rows this will be Some with the bounds for each join key column.
+    pub(super) bounds: Option<PartitionBounds>,
 }
 
 impl JoinLeftData {
-    /// Create a new `JoinLeftData` from its parts
-    pub(super) fn new(
-        hash_map: Box<dyn JoinHashMapType>,
-        batch: RecordBatch,
-        values: Vec<ArrayRef>,
-        visited_indices_bitmap: SharedBitmapBuilder,
-        probe_threads_counter: AtomicUsize,
-        reservation: MemoryReservation,
-        bounds: Option<Vec<ColumnBounds>>,
-    ) -> Self {
-        Self {
-            hash_map,
-            batch,
-            values,
-            visited_indices_bitmap,
-            probe_threads_counter,
-            _reservation: reservation,
-            bounds,
-        }
-    }
-
     /// return a reference to the hash map
     pub(super) fn hash_map(&self) -> &dyn JoinHashMapType {
         &*self.hash_map
```
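The `Box` → `Arc` change in `JoinLeftData` exists because the finished hash map now has two consumers: the stream that probes it and the `SharedBuildAccumulator` that uses it for pushdown. Below is a minimal sketch of that sharing pattern, with simplified stand-in types rather than the PR's real ones:

```rust
use std::sync::Arc;

// Stand-in for DataFusion's `JoinHashMapType` trait object.
trait JoinHashMapType: Send + Sync {}
struct DummyMap;
impl JoinHashMapType for DummyMap {}

struct JoinLeftData {
    // `Arc` (not `Box`) so ownership can be shared after the build completes.
    hash_map: Arc<dyn JoinHashMapType>,
}

// The accumulator holds its own handle to the same table without copying it;
// a `Box<dyn JoinHashMapType>` would force either a move or a deep clone.
fn share_with_accumulator(data: &JoinLeftData) -> Arc<dyn JoinHashMapType> {
    Arc::clone(&data.hash_map)
}

fn main() {
    let data = JoinLeftData { hash_map: Arc::new(DummyMap) };
    let shared = share_with_accumulator(&data);
    assert_eq!(Arc::strong_count(&shared), 2); // probe side + accumulator
}
```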
```diff
@@ -363,9 +348,9 @@ pub struct HashJoinExec {
 struct HashJoinExecDynamicFilter {
     /// Dynamic filter that we'll update with the results of the build side once that is done.
     filter: Arc<DynamicFilterPhysicalExpr>,
-    /// Bounds accumulator to keep track of the min/max bounds on the join keys for each partition.
+    /// Build accumulator to collect build-side information (hash maps and/or bounds) from each partition.
     /// It is lazily initialized during execution to make sure we use the actual execution time partition counts.
-    bounds_accumulator: OnceLock<Arc<SharedBoundsAccumulator>>,
+    build_accumulator: OnceLock<Arc<SharedBuildAccumulator>>,
 }
 
 impl fmt::Debug for HashJoinExec {
```
```diff
@@ -976,8 +961,15 @@
         let batch_size = context.session_config().batch_size();
 
-        // Initialize bounds_accumulator lazily with runtime partition counts (only if enabled)
-        let bounds_accumulator = enable_dynamic_filter_pushdown
+        // Initialize build_accumulator lazily with runtime partition counts (only if enabled)
+        // Use RepartitionExec's random state (seeds: 0,0,0,0) for partition routing
+        let repartition_random_state = RandomState::with_seeds(
+            REPARTITION_HASH_SEED[0],
+            REPARTITION_HASH_SEED[1],
+            REPARTITION_HASH_SEED[2],
+            REPARTITION_HASH_SEED[3],
+        );
+        let build_accumulator = enable_dynamic_filter_pushdown
             .then(|| {
                 self.dynamic_filter.as_ref().map(|df| {
                     let filter = Arc::clone(&df.filter);
```
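The seed reuse above is the linchpin: a `WHEN p THEN …` arm only prunes correctly if the accumulator hashes join keys exactly the way `RepartitionExec` routed them. Here is a minimal standalone sketch of that invariant, assuming the `ahash` crate (DataFusion's actual row hashing goes through its `create_hashes` utility, which this sketch does not reproduce):

```rust
use ahash::RandomState;

fn main() {
    // Both sides construct their RandomState from the same four seeds
    // (the diff above uses REPARTITION_HASH_SEED, i.e. 0, 0, 0, 0).
    let repartition_state = RandomState::with_seeds(0, 0, 0, 0);
    let accumulator_state = RandomState::with_seeds(0, 0, 0, 0);

    let num_partitions = 12u64;
    let join_key = "aa";

    // Identical seeds => identical hashes => identical partition routing,
    // so a WHEN arm built for partition p matches exactly the probe rows
    // that RepartitionExec sends to partition p.
    let p1 = repartition_state.hash_one(join_key) % num_partitions;
    let p2 = accumulator_state.hash_one(join_key) % num_partitions;
    assert_eq!(p1, p2);
}
```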
```diff
@@ -986,13 +978,14 @@
                         .iter()
                         .map(|(_, right_expr)| Arc::clone(right_expr))
                         .collect::<Vec<_>>();
-                    Some(Arc::clone(df.bounds_accumulator.get_or_init(|| {
-                        Arc::new(SharedBoundsAccumulator::new_from_partition_mode(
+                    Some(Arc::clone(df.build_accumulator.get_or_init(|| {
+                        Arc::new(SharedBuildAccumulator::new_from_partition_mode(
                             self.mode,
                             self.left.as_ref(),
                             self.right.as_ref(),
                             filter,
                             on_right,
+                            repartition_random_state,
                         ))
                     })))
                 })
```
```diff
@@ -1035,7 +1028,7 @@
             batch_size,
             vec![],
             self.right.output_ordering().is_some(),
-            bounds_accumulator,
+            build_accumulator,
             self.mode,
         )))
     }
```
```diff
@@ -1196,7 +1189,7 @@
             cache: self.cache.clone(),
             dynamic_filter: Some(HashJoinExecDynamicFilter {
                 filter: dynamic_filter,
-                bounds_accumulator: OnceLock::new(),
+                build_accumulator: OnceLock::new(),
             }),
         });
         result = result.with_updated_node(new_node as Arc<dyn ExecutionPlan>);
```
```diff
@@ -1345,7 +1338,7 @@ impl BuildSideState {
 /// When `should_compute_bounds` is true, this function computes the min/max bounds
 /// for each join key column but does NOT update the dynamic filter. Instead, the
 /// bounds are stored in the returned `JoinLeftData` and later coordinated by
-/// `SharedBoundsAccumulator` to ensure all partitions contribute their bounds
+/// `SharedBuildAccumulator` to ensure all partitions contribute their bounds
 /// before updating the filter exactly once.
 ///
 /// # Returns
```
```diff
@@ -1416,6 +1409,7 @@ async fn collect_left_input(
     // Use `u32` indices for the JoinHashMap when num_rows ≤ u32::MAX, otherwise use the
     // `u64` indice variant
+    // Arc is used instead of Box to allow sharing with SharedBuildAccumulator for hash map pushdown
     let mut hashmap: Box<dyn JoinHashMapType> = if num_rows > u32::MAX as usize {
         let estimated_hashtable_size =
             estimate_memory_size::<(u64, u64)>(num_rows, fixed_size_u64)?;
```
```diff
@@ -1451,22 +1445,22 @@
         offset += batch.num_rows();
     }
     // Merge all batches into a single batch, so we can directly index into the arrays
-    let single_batch = concat_batches(&schema, batches_iter)?;
+    let batch = concat_batches(&schema, batches_iter)?;
 
     // Reserve additional memory for visited indices bitmap and create shared builder
     let visited_indices_bitmap = if with_visited_indices_bitmap {
-        let bitmap_size = bit_util::ceil(single_batch.num_rows(), 8);
+        let bitmap_size = bit_util::ceil(batch.num_rows(), 8);
         reservation.try_grow(bitmap_size)?;
         metrics.build_mem_used.add(bitmap_size);
 
-        let mut bitmap_buffer = BooleanBufferBuilder::new(single_batch.num_rows());
+        let mut bitmap_buffer = BooleanBufferBuilder::new(batch.num_rows());
         bitmap_buffer.append_n(num_rows, false);
         bitmap_buffer
     } else {
         BooleanBufferBuilder::new(0)
     };
 
-    let left_values = evaluate_expressions_to_arrays(&on_left, &single_batch)?;
+    let left_values = evaluate_expressions_to_arrays(&on_left, &batch)?;
 
     // Compute bounds for dynamic filter if enabled
     let bounds = match bounds_accumulators {
```
```diff
@@ -1475,20 +1469,23 @@
                 .into_iter()
                 .map(CollectLeftAccumulator::evaluate)
                 .collect::<Result<Vec<_>>>()?;
-            Some(bounds)
+            Some(PartitionBounds::new(bounds))
         }
         _ => None,
     };
 
-    let data = JoinLeftData::new(
-        hashmap,
-        single_batch,
-        left_values.clone(),
-        Mutex::new(visited_indices_bitmap),
-        AtomicUsize::new(probe_threads_count),
-        reservation,
+    // Convert Box to Arc for sharing with SharedBuildAccumulator
+    let hash_map: Arc<dyn JoinHashMapType> = hashmap.into();
+
+    let data = JoinLeftData {
+        hash_map,
+        batch,
+        values: left_values,
+        visited_indices_bitmap: Mutex::new(visited_indices_bitmap),
+        probe_threads_counter: AtomicUsize::new(probe_threads_count),
+        _reservation: reservation,
         bounds,
-    );
+    };
 
     Ok(data)
 }
```
Finally, the hash join module's `mod.rs` registers the new `partitioned_hash_eval` module:

```diff
@@ -20,5 +20,6 @@
 pub use exec::HashJoinExec;
 
 mod exec;
+mod partitioned_hash_eval;
 mod shared_bounds;
 mod stream;
```
**Contributor:** Note that even though there is only one `WHEN` clause we keep the `CASE` statement, because there are 2 partitions: anything that had `hash_repartition % 1 = 1` has no data in the build side, so it can immediately be discarded by the `false` fall-through condition.
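To make the mechanism concrete, here is a hypothetical sketch of how such a filter could be assembled with DataFusion's logical `CaseBuilder` API. This is illustrative only: the PR builds a physical expression, and `hash_repartition` / `partition_case_filter` are stand-in names, not identifiers from this PR.

```rust
use datafusion_common::Result;
use datafusion_expr::{case, col, lit, Expr};

/// Build `CASE <hash> % <n> WHEN p THEN <bounds_p> ... ELSE false END`
/// from one (partition, bounds-predicate) pair per non-empty partition.
fn partition_case_filter(
    num_partitions: i64,
    arms: Vec<(i64, Expr)>,
) -> Result<Expr> {
    // `hash_repartition` stands in for the computed repartition hash of the join keys.
    let mut builder = case(col("hash_repartition") % lit(num_partitions));
    for (partition, bounds) in arms {
        builder = builder.when(lit(partition), bounds);
    }
    // Probe rows hashing to a partition with an empty build side
    // fall through to `false` and are pruned immediately.
    builder.otherwise(lit(false))
}
```

With `num_partitions = 12` and arms for partitions 2 and 4, this reproduces the shape of the snapshot predicates shown earlier.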