From 18e8ad8c09fe00e9af31697a93bb804c5874aa79 Mon Sep 17 00:00:00 2001 From: Jonathan Date: Tue, 17 Jun 2025 21:06:49 -0400 Subject: [PATCH 1/3] feat: Support `u32` indices for `HashJoinExec` --- .../physical-plan/src/joins/hash_join.rs | 123 ++++- .../physical-plan/src/joins/join_hash_map.rs | 513 ++++++++++-------- .../src/joins/stream_join_utils.rs | 45 +- .../src/joins/symmetric_hash_join.rs | 6 +- datafusion/physical-plan/src/joins/utils.rs | 2 +- 5 files changed, 432 insertions(+), 257 deletions(-) diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index 5034a199e2364..5e5e950286eb4 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -34,6 +34,7 @@ use super::{ }; use super::{JoinOn, JoinOnRef}; use crate::execution_plan::{boundedness_from_children, EmissionType}; +use crate::joins::join_hash_map::{JoinHashMapU32, JoinHashMapU64}; use crate::projection::{ try_embed_projection, try_pushdown_through_join, EmbeddedProjection, JoinData, ProjectionExec, @@ -50,7 +51,7 @@ use crate::{ build_batch_from_indices, build_join_schema, check_join_is_valid, estimate_join_statistics, need_produce_result_in_final, symmetric_join_output_partitioning, BuildProbeJoinMetrics, ColumnIndex, - JoinFilter, JoinHashMap, JoinHashMapType, StatefulStreamResult, + JoinFilter, JoinHashMapType, StatefulStreamResult, }, metrics::{ExecutionPlanMetricsSet, MetricsSet}, DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, Partitioning, @@ -93,7 +94,7 @@ const HASH_JOIN_SEED: RandomState = /// HashTable and input data for the left (build side) of a join struct JoinLeftData { /// The hash table with indices into `batch` - hash_map: JoinHashMap, + hash_map: Box, /// The input rows for the build side batch: RecordBatch, /// The build side on expressions values @@ -113,7 +114,7 @@ struct JoinLeftData { impl JoinLeftData { /// Create a new `JoinLeftData` from its parts 
fn new( - hash_map: JoinHashMap, + hash_map: Box, batch: RecordBatch, values: Vec, visited_indices_bitmap: SharedBitmapBuilder, @@ -131,8 +132,8 @@ impl JoinLeftData { } /// return a reference to the hash map - fn hash_map(&self) -> &JoinHashMap { - &self.hash_map + fn hash_map(&self) -> &dyn JoinHashMapType { + &*self.hash_map } /// returns a reference to the build side batch @@ -983,15 +984,26 @@ async fn collect_left_input( .await?; // Estimation of memory size, required for hashtable, prior to allocation. - // Final result can be verified using `RawTable.allocation_info()` - let fixed_size = size_of::(); - let estimated_hashtable_size = - estimate_memory_size::<(u64, u64)>(num_rows, fixed_size)?; - - reservation.try_grow(estimated_hashtable_size)?; - metrics.build_mem_used.add(estimated_hashtable_size); + // Final result can be verifiedJoinHashMapTypele.allocation_info()` + let fixed_size_u32 = size_of::(); + let fixed_size_u64 = size_of::(); + + // Use `u32` indices for the JoinHashMap when num_rows ≤ u32::MAX, otherwise use the + // `u64` indice variant + let mut hashmap: Box = if num_rows > u32::MAX as usize { + let estimated_hashtable_size = + estimate_memory_size::<(u64, u64)>(num_rows, fixed_size_u64)?; + reservation.try_grow(estimated_hashtable_size)?; + metrics.build_mem_used.add(estimated_hashtable_size); + Box::new(JoinHashMapU64::with_capacity(num_rows)) + } else { + let estimated_hashtable_size = + estimate_memory_size::<(u32, u64)>(num_rows, fixed_size_u32)?; + reservation.try_grow(estimated_hashtable_size)?; + metrics.build_mem_used.add(estimated_hashtable_size); + Box::new(JoinHashMapU32::with_capacity(num_rows)) + }; - let mut hashmap = JoinHashMap::with_capacity(num_rows); let mut hashes_buffer = Vec::new(); let mut offset = 0; @@ -1003,7 +1015,7 @@ async fn collect_left_input( update_hash( &on_left, batch, - &mut hashmap, + &mut *hashmap, offset, &random_state, &mut hashes_buffer, @@ -1055,19 +1067,16 @@ async fn collect_left_input( /// which 
allows to keep either first (if set to true) or last (if set to false) row index /// as a chain head for rows with equal hash values. #[allow(clippy::too_many_arguments)] -pub fn update_hash( +pub fn update_hash( on: &[PhysicalExprRef], batch: &RecordBatch, - hash_map: &mut T, + hash_map: &mut dyn JoinHashMapType, offset: usize, random_state: &RandomState, hashes_buffer: &mut Vec, deleted_offset: usize, fifo_hashmap: bool, -) -> Result<()> -where - T: JoinHashMapType, -{ +) -> Result<()> { // evaluate the keys let keys_values = on .iter() @@ -1087,9 +1096,9 @@ where .map(|(i, val)| (i + offset, val)); if fifo_hashmap { - hash_map.update_from_iter(hash_values_iter.rev(), deleted_offset); + hash_map.update_from_iter(Box::new(hash_values_iter.rev()), deleted_offset); } else { - hash_map.update_from_iter(hash_values_iter, deleted_offset); + hash_map.update_from_iter(Box::new(hash_values_iter), deleted_offset); } Ok(()) @@ -1301,7 +1310,7 @@ impl RecordBatchStream for HashJoinStream { /// ``` #[allow(clippy::too_many_arguments)] fn lookup_join_hashmap( - build_hashmap: &JoinHashMap, + build_hashmap: &dyn JoinHashMapType, build_side_values: &[ArrayRef], probe_side_values: &[ArrayRef], null_equals_null: bool, @@ -3435,7 +3444,7 @@ mod tests { } #[test] - fn join_with_hash_collision() -> Result<()> { + fn join_with_hash_collisions_u64() -> Result<()> { let mut hashmap_left = HashTable::with_capacity(4); let left = build_table_i32( ("a", &vec![10, 20]), @@ -3472,7 +3481,7 @@ mod tests { // Join key column for both join sides let key_column: PhysicalExprRef = Arc::new(Column::new("a", 0)) as _; - let join_hash_map = JoinHashMap::new(hashmap_left, next); + let join_hash_map = JoinHashMapU64::new(hashmap_left, next); let left_keys_values = key_column.evaluate(&left)?.into_array(left.num_rows())?; let right_keys_values = @@ -3505,6 +3514,70 @@ mod tests { Ok(()) } + #[test] + fn join_with_hash_collision_u32() -> Result<()> { + let mut hashmap_left = HashTable::with_capacity(4); 
+ let left = build_table_i32( + ("a", &vec![10, 20]), + ("x", &vec![100, 200]), + ("y", &vec![200, 300]), + ); + + let random_state = RandomState::with_seeds(0, 0, 0, 0); + let hashes_buff = &mut vec![0; left.num_rows()]; + let hashes = create_hashes( + &[Arc::clone(&left.columns()[0])], + &random_state, + hashes_buff, + )?; + + hashmap_left.insert_unique(hashes[0], (hashes[0], 1u32), |(h, _)| *h); + hashmap_left.insert_unique(hashes[0], (hashes[0], 2u32), |(h, _)| *h); + hashmap_left.insert_unique(hashes[1], (hashes[1], 1u32), |(h, _)| *h); + hashmap_left.insert_unique(hashes[1], (hashes[1], 2u32), |(h, _)| *h); + + let next: Vec = vec![2, 0]; + + let right = build_table_i32( + ("a", &vec![10, 20]), + ("b", &vec![0, 0]), + ("c", &vec![30, 40]), + ); + + let key_column: PhysicalExprRef = Arc::new(Column::new("a", 0)) as _; + + let join_hash_map = JoinHashMapU32::new(hashmap_left, next); + + let left_keys_values = key_column.evaluate(&left)?.into_array(left.num_rows())?; + let right_keys_values = + key_column.evaluate(&right)?.into_array(right.num_rows())?; + let mut hashes_buffer = vec![0; right.num_rows()]; + create_hashes( + &[Arc::clone(&right_keys_values)], + &random_state, + &mut hashes_buffer, + )?; + + let (l, r, _) = lookup_join_hashmap( + &join_hash_map, + &[left_keys_values], + &[right_keys_values], + false, + &hashes_buffer, + 8192, + (0, None), + )?; + + // We still expect to match rows 0 and 1 on both sides + let left_ids: UInt64Array = vec![0, 1].into(); + let right_ids: UInt32Array = vec![0, 1].into(); + + assert_eq!(left_ids, l); + assert_eq!(right_ids, r); + + Ok(()) + } + #[tokio::test] async fn join_with_duplicated_column_names() -> Result<()> { let task_ctx = Arc::new(TaskContext::default()); diff --git a/datafusion/physical-plan/src/joins/join_hash_map.rs b/datafusion/physical-plan/src/joins/join_hash_map.rs index 521e19d7bf444..b3efaed83bdd9 100644 --- a/datafusion/physical-plan/src/joins/join_hash_map.rs +++ 
b/datafusion/physical-plan/src/joins/join_hash_map.rs @@ -20,7 +20,7 @@ //! ["on" values] to a list of indices with this key's value. use std::fmt::{self, Debug}; -use std::ops::IndexMut; +use std::ops::Sub; use hashbrown::hash_table::Entry::{Occupied, Vacant}; use hashbrown::HashTable; @@ -35,7 +35,7 @@ use hashbrown::HashTable; /// During this stage it might be the case that a row is contained the same hashmap value, /// but the values don't match. Those are checked in the `equal_rows_arr` method. /// -/// The indices (values) are stored in a separate chained list stored in the `Vec`. +/// The indices (values) are stored in a separate chained list stored as `Vec` `Vec`. /// /// The first value (+1) is stored in the hashmap, whereas the next value is stored in array at the position value. /// @@ -87,27 +87,156 @@ use hashbrown::HashTable; /// | 0 | 0 | 0 | 2 | 4 | <--- hash value 10 maps to 5,4,2 (which means indices values 4,3,1) /// --------------------- /// ``` -pub struct JoinHashMap { +/// +/// Here we have an option between creating a `JoinHashMapType` using `u32` or `u64` indices +/// based on how many rows were being used for indices. 
+pub trait JoinHashMapType: Send + Sync { + fn extend_zero(&mut self, len: usize); + + fn update_from_iter<'a>( + &mut self, + iter: Box + Send + 'a>, + deleted_offset: usize, + ); + + fn get_matched_indices<'a>( + &self, + iter: Box + 'a>, + deleted_offset: Option, + ) -> (Vec, Vec); + + fn get_matched_indices_with_limit_offset( + &self, + hash_values: &[u64], + limit: usize, + offset: JoinHashMapOffset, + ) -> (Vec, Vec, Option); +} + +pub struct JoinHashMapU32 { + // Stores hash value to last row index + map: HashTable<(u64, u32)>, + // Stores indices in chained list data structure + next: Vec, +} + +impl JoinHashMapU32 { + #[cfg(test)] + pub(crate) fn new(map: HashTable<(u64, u32)>, next: Vec) -> Self { + Self { map, next } + } + + pub fn with_capacity(cap: usize) -> Self { + Self { + map: HashTable::with_capacity(cap), + next: vec![0; cap], + } + } +} + +impl Debug for JoinHashMapU32 { + fn fmt(&self, _f: &mut fmt::Formatter) -> fmt::Result { + Ok(()) + } +} + +impl JoinHashMapType for JoinHashMapU32 { + fn extend_zero(&mut self, _: usize) {} + + fn update_from_iter<'a>( + &mut self, + iter: Box + Send + 'a>, + deleted_offset: usize, + ) { + update_from_iter::(&mut self.map, &mut self.next, iter, deleted_offset); + } + + fn get_matched_indices<'a>( + &self, + iter: Box + 'a>, + deleted_offset: Option, + ) -> (Vec, Vec) { + get_matched_indices::(&self.map, &self.next, iter, deleted_offset) + } + + fn get_matched_indices_with_limit_offset( + &self, + hash_values: &[u64], + limit: usize, + offset: JoinHashMapOffset, + ) -> (Vec, Vec, Option) { + get_matched_indices_with_limit_offset::( + &self.map, + &self.next, + hash_values, + limit, + offset, + ) + } +} + +pub struct JoinHashMapU64 { // Stores hash value to last row index map: HashTable<(u64, u64)>, // Stores indices in chained list data structure next: Vec, } -impl JoinHashMap { +impl JoinHashMapU64 { #[cfg(test)] pub(crate) fn new(map: HashTable<(u64, u64)>, next: Vec) -> Self { Self { map, next } } - 
pub(crate) fn with_capacity(capacity: usize) -> Self { - JoinHashMap { - map: HashTable::with_capacity(capacity), - next: vec![0; capacity], + pub fn with_capacity(cap: usize) -> Self { + Self { + map: HashTable::with_capacity(cap), + next: vec![0; cap], } } } +impl Debug for JoinHashMapU64 { + fn fmt(&self, _f: &mut fmt::Formatter) -> fmt::Result { + Ok(()) + } +} + +impl JoinHashMapType for JoinHashMapU64 { + fn extend_zero(&mut self, _: usize) {} + + fn update_from_iter<'a>( + &mut self, + iter: Box + Send + 'a>, + deleted_offset: usize, + ) { + update_from_iter::(&mut self.map, &mut self.next, iter, deleted_offset); + } + + fn get_matched_indices<'a>( + &self, + iter: Box + 'a>, + deleted_offset: Option, + ) -> (Vec, Vec) { + get_matched_indices::(&self.map, &self.next, iter, deleted_offset) + } + + fn get_matched_indices_with_limit_offset( + &self, + hash_values: &[u64], + limit: usize, + offset: JoinHashMapOffset, + ) -> (Vec, Vec, Option) { + get_matched_indices_with_limit_offset::( + &self.map, + &self.next, + hash_values, + limit, + offset, + ) + } +} + // Type of offsets for obtaining indices from JoinHashMap. pub(crate) type JoinHashMapOffset = (usize, Option); @@ -115,250 +244,198 @@ pub(crate) type JoinHashMapOffset = (usize, Option); // Early returns in case of reaching output tuples limit. macro_rules! 
chain_traverse { ( - $input_indices:ident, $match_indices:ident, $hash_values:ident, $next_chain:ident, - $input_idx:ident, $chain_idx:ident, $remaining_output:ident - ) => { - let mut match_row_idx = $chain_idx - 1; + $input_indices:ident, $match_indices:ident, + $hash_values:ident, $next_chain:ident, + $input_idx:ident, $chain_idx:ident, $remaining_output:ident, $one:ident, $zero:ident + ) => {{ + // now `one` and `zero` are in scope from the outer function + let mut match_row_idx = $chain_idx - $one; loop { - $match_indices.push(match_row_idx); + $match_indices.push(match_row_idx.into()); $input_indices.push($input_idx as u32); $remaining_output -= 1; - // Follow the chain to get the next index value - let next = $next_chain[match_row_idx as usize]; + + let next = $next_chain[match_row_idx.into() as usize]; if $remaining_output == 0 { - // In case current input index is the last, and no more chain values left - // returning None as whole input has been scanned - let next_offset = if $input_idx == $hash_values.len() - 1 && next == 0 { + // we compare against `zero` (of type T) here too + let next_offset = if $input_idx == $hash_values.len() - 1 && next == $zero + { None } else { - Some(($input_idx, Some(next))) + Some(($input_idx, Some(next.into()))) }; return ($input_indices, $match_indices, next_offset); } - if next == 0 { - // end of list + if next == $zero { break; } - match_row_idx = next - 1; + match_row_idx = next - $one; } - }; + }}; } -// Trait defining methods that must be implemented by a hash map type to be used for joins. -pub trait JoinHashMapType { - /// The type of list used to store the next list - type NextType: IndexMut; - /// Extend with zero - fn extend_zero(&mut self, len: usize); - /// Returns mutable references to the hash map and the next. - fn get_mut(&mut self) -> (&mut HashTable<(u64, u64)>, &mut Self::NextType); - /// Returns a reference to the hash map. 
- fn get_map(&self) -> &HashTable<(u64, u64)>; - /// Returns a reference to the next. - fn get_list(&self) -> &Self::NextType; - - // Whether values in the hashmap are distinct (no duplicate keys) - fn is_distinct(&self) -> bool { - false - } - - /// Updates hashmap from iterator of row indices & row hashes pairs. - fn update_from_iter<'a>( - &mut self, - iter: impl Iterator, - deleted_offset: usize, - ) { - let (mut_map, mut_list) = self.get_mut(); - for (row, &hash_value) in iter { - let entry = mut_map.entry( - hash_value, - |&(hash, _)| hash_value == hash, - |&(hash, _)| hash, - ); +pub fn update_from_iter<'a, T>( + map: &mut HashTable<(u64, T)>, + next: &mut [T], + iter: Box + Send + 'a>, + deleted_offset: usize, +) where + T: Copy + TryFrom + PartialOrd, + >::Error: Debug, +{ + for (row, &hash_value) in iter { + let entry = map.entry( + hash_value, + |&(hash, _)| hash_value == hash, + |&(hash, _)| hash, + ); - match entry { - Occupied(mut occupied_entry) => { - // Already exists: add index to next array - let (_, index) = occupied_entry.get_mut(); - let prev_index = *index; - // Store new value inside hashmap - *index = (row + 1) as u64; - // Update chained Vec at `row` with previous value - mut_list[row - deleted_offset] = prev_index; - } - Vacant(vacant_entry) => { - vacant_entry.insert((hash_value, (row + 1) as u64)); - // chained list at `row` is already initialized with 0 - // meaning end of list - } + match entry { + Occupied(mut occupied_entry) => { + // Already exists: add index to next array + let (_, index) = occupied_entry.get_mut(); + let prev_index = *index; + // Store new value inside hashmap + *index = T::try_from(row + 1).unwrap(); + // Update chained Vec at `row` with previous value + next[row - deleted_offset] = prev_index; + } + Vacant(vacant_entry) => { + vacant_entry.insert((hash_value, T::try_from(row + 1).unwrap())); } } } +} - /// Returns all pairs of row indices matched by hash. 
- /// - /// This method only compares hashes, so additional further check for actual values - /// equality may be required. - fn get_matched_indices<'a>( - &self, - iter: impl Iterator, - deleted_offset: Option, - ) -> (Vec, Vec) { - let mut input_indices = vec![]; - let mut match_indices = vec![]; - - let hash_map = self.get_map(); - let next_chain = self.get_list(); - for (row_idx, hash_value) in iter { - // Get the hash and find it in the index - if let Some((_, index)) = - hash_map.find(*hash_value, |(hash, _)| *hash_value == *hash) - { - let mut i = *index - 1; - loop { - let match_row_idx = if let Some(offset) = deleted_offset { - // This arguments means that we prune the next index way before here. - if i < offset as u64 { - // End of the list due to pruning - break; - } - i - offset as u64 - } else { - i - }; - match_indices.push(match_row_idx); - input_indices.push(row_idx as u32); - // Follow the chain to get the next index value - let next = next_chain[match_row_idx as usize]; - if next == 0 { - // end of list +pub fn get_matched_indices<'a, T>( + map: &HashTable<(u64, T)>, + next: &[T], + iter: Box + 'a>, + deleted_offset: Option, +) -> (Vec, Vec) +where + T: Copy + TryFrom + PartialOrd + Into + Sub, + >::Error: Debug, +{ + let mut input_indices = vec![]; + let mut match_indices = vec![]; + let zero = T::try_from(0).unwrap(); + let one = T::try_from(1).unwrap(); + + for (row_idx, hash_value) in iter { + // Get the hash and find it in the index + if let Some((_, index)) = map.find(*hash_value, |(hash, _)| *hash_value == *hash) + { + let mut i = *index - one; + loop { + let match_row_idx = if let Some(offset) = deleted_offset { + let offset = T::try_from(offset).unwrap(); + // This arguments means that we prune the next index way before here. 
+ if i < offset { + // End of the list due to pruning break; } - i = next - 1; + i - offset + } else { + i + }; + match_indices.push(match_row_idx.into()); + input_indices.push(row_idx as u32); + // Follow the chain to get the next index value + let next_chain = next[match_row_idx.into() as usize]; + if next_chain == zero { + // end of list + break; } + i = next_chain - one; } } - - (input_indices, match_indices) } - /// Matches hashes with taking limit and offset into account. - /// Returns pairs of matched indices along with the starting point for next - /// matching iteration (`None` if limit has not been reached). - /// - /// This method only compares hashes, so additional further check for actual values - /// equality may be required. - fn get_matched_indices_with_limit_offset( - &self, - hash_values: &[u64], - limit: usize, - offset: JoinHashMapOffset, - ) -> (Vec, Vec, Option) { - let mut input_indices = Vec::with_capacity(limit); - let mut match_indices = Vec::with_capacity(limit); - - let hash_map: &HashTable<(u64, u64)> = self.get_map(); - let next_chain = self.get_list(); - // Check if hashmap consists of unique values - // If so, we can skip the chain traversal - if self.is_distinct() { - let start = offset.0; - let end = (start + limit).min(hash_values.len()); - for (row_idx, &hash_value) in hash_values[start..end].iter().enumerate() { - if let Some((_, index)) = - hash_map.find(hash_value, |(hash, _)| hash_value == *hash) - { - input_indices.push(start as u32 + row_idx as u32); - match_indices.push(*index - 1); - } - } - if end == hash_values.len() { - // No more values to process - return (input_indices, match_indices, None); - } - return (input_indices, match_indices, Some((end, None))); - } + (input_indices, match_indices) +} - let mut remaining_output = limit; - - // Calculate initial `hash_values` index before iterating - let to_skip = match offset { - // None `initial_next_idx` indicates that `initial_idx` processing has'n been started - 
(initial_idx, None) => initial_idx, - // Zero `initial_next_idx` indicates that `initial_idx` has been processed during - // previous iteration, and it should be skipped - (initial_idx, Some(0)) => initial_idx + 1, - // Otherwise, process remaining `initial_idx` matches by traversing `next_chain`, - // to start with the next index - (initial_idx, Some(initial_next_idx)) => { - chain_traverse!( - input_indices, - match_indices, - hash_values, - next_chain, - initial_idx, - initial_next_idx, - remaining_output - ); - - initial_idx + 1 - } - }; +pub fn get_matched_indices_with_limit_offset( + map: &HashTable<(u64, T)>, + next_chain: &[T], + hash_values: &[u64], + limit: usize, + offset: JoinHashMapOffset, +) -> (Vec, Vec, Option) +where + T: Copy + TryFrom + PartialOrd + Into + Sub, + >::Error: Debug, +{ + let mut input_indices = Vec::with_capacity(limit); + let mut match_indices = Vec::with_capacity(limit); + let zero = T::try_from(0).unwrap(); + let one = T::try_from(1).unwrap(); - let mut row_idx = to_skip; - - for hash_value in &hash_values[to_skip..] { - if let Some((_, index)) = - hash_map.find(*hash_value, |(hash, _)| *hash_value == *hash) - { - chain_traverse!( - input_indices, - match_indices, - hash_values, - next_chain, - row_idx, - index, - remaining_output - ); + // Check if hashmap consists of unique values + // If so, we can skip the chain traversal + if map.len() == next_chain.len() { + let start = offset.0; + let end = (start + limit).min(hash_values.len()); + for (i, &hash) in hash_values[start..end].iter().enumerate() { + if let Some((_, idx)) = map.find(hash, |(h, _)| hash == *h) { + input_indices.push(start as u32 + i as u32); + match_indices.push((*idx - one).into()); } - row_idx += 1; } - - (input_indices, match_indices, None) - } -} - -/// Implementation of `JoinHashMapType` for `JoinHashMap`. 
-impl JoinHashMapType for JoinHashMap { - type NextType = Vec; - - // Void implementation - fn extend_zero(&mut self, _: usize) {} - - /// Get mutable references to the hash map and the next. - fn get_mut(&mut self) -> (&mut HashTable<(u64, u64)>, &mut Self::NextType) { - (&mut self.map, &mut self.next) - } - - /// Get a reference to the hash map. - fn get_map(&self) -> &HashTable<(u64, u64)> { - &self.map + let next_off = if end == hash_values.len() { + None + } else { + Some((end, None)) + }; + return (input_indices, match_indices, next_off); } - /// Get a reference to the next. - fn get_list(&self) -> &Self::NextType { - &self.next - } + let mut remaining_output = limit; - /// Check if the values in the hashmap are distinct. - fn is_distinct(&self) -> bool { - self.map.len() == self.next.len() - } -} + // Calculate initial `hash_values` index before iterating + let to_skip = match offset { + // None `initial_next_idx` indicates that `initial_idx` processing has'n been started + (idx, None) => idx, + // Zero `initial_next_idx` indicates that `initial_idx` has been processed during + // previous iteration, and it should be skipped + (idx, Some(0)) => idx + 1, + // Otherwise, process remaining `initial_idx` matches by traversing `next_chain`, + // to start with the next index + (idx, Some(next_idx)) => { + let next_idx: T = T::try_from(next_idx as usize).unwrap(); + chain_traverse!( + input_indices, + match_indices, + hash_values, + next_chain, + idx, + next_idx, + remaining_output, + one, + zero + ); + idx + 1 + } + }; -impl Debug for JoinHashMap { - fn fmt(&self, _f: &mut fmt::Formatter) -> fmt::Result { - Ok(()) + let mut row_idx = to_skip; + for &hash in &hash_values[to_skip..] 
{ + if let Some((_, idx)) = map.find(hash, |(h, _)| hash == *h) { + let idx: T = *idx; + chain_traverse!( + input_indices, + match_indices, + hash_values, + next_chain, + row_idx, + idx, + remaining_output, + one, + zero + ); + } + row_idx += 1; } + (input_indices, match_indices, None) } diff --git a/datafusion/physical-plan/src/joins/stream_join_utils.rs b/datafusion/physical-plan/src/joins/stream_join_utils.rs index 677601a12845f..8b5e8824ddbb7 100644 --- a/datafusion/physical-plan/src/joins/stream_join_utils.rs +++ b/datafusion/physical-plan/src/joins/stream_join_utils.rs @@ -22,6 +22,10 @@ use std::collections::{HashMap, VecDeque}; use std::mem::size_of; use std::sync::Arc; +use crate::joins::join_hash_map::{ + get_matched_indices, get_matched_indices_with_limit_offset, update_from_iter, + JoinHashMapOffset, +}; use crate::joins::utils::{JoinFilter, JoinHashMapType}; use crate::metrics::{ExecutionPlanMetricsSet, MetricBuilder}; use crate::{metrics, ExecutionPlan}; @@ -47,26 +51,45 @@ use hashbrown::HashTable; /// Implementation of `JoinHashMapType` for `PruningJoinHashMap`. impl JoinHashMapType for PruningJoinHashMap { - type NextType = VecDeque; - // Extend with zero fn extend_zero(&mut self, len: usize) { self.next.resize(self.next.len() + len, 0) } - /// Get mutable references to the hash map and the next. - fn get_mut(&mut self) -> (&mut HashTable<(u64, u64)>, &mut Self::NextType) { - (&mut self.map, &mut self.next) + fn update_from_iter<'a>( + &mut self, + iter: Box + Send + 'a>, + deleted_offset: usize, + ) { + let slice: &mut [u64] = self.next.make_contiguous(); + update_from_iter::(&mut self.map, slice, iter, deleted_offset); } - /// Get a reference to the hash map. 
- fn get_map(&self) -> &HashTable<(u64, u64)> { - &self.map + fn get_matched_indices<'a>( + &self, + iter: Box + 'a>, + deleted_offset: Option, + ) -> (Vec, Vec) { + // Flatten the deque + let next: Vec = self.next.iter().copied().collect(); + get_matched_indices::(&self.map, &next, iter, deleted_offset) } - /// Get a reference to the next. - fn get_list(&self) -> &Self::NextType { - &self.next + fn get_matched_indices_with_limit_offset( + &self, + hash_values: &[u64], + limit: usize, + offset: JoinHashMapOffset, + ) -> (Vec, Vec, Option) { + // Flatten the deque + let next: Vec = self.next.iter().copied().collect(); + get_matched_indices_with_limit_offset::( + &self.map, + &next, + hash_values, + limit, + offset, + ) } } diff --git a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs index 84575acea50fc..d4e6af5cfed97 100644 --- a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs +++ b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs @@ -1091,8 +1091,10 @@ fn lookup_join_hashmap( // (5,1) // // With this approach, the lexicographic order on both the probe side and the build side is preserved. 
- let (mut matched_probe, mut matched_build) = build_hashmap - .get_matched_indices(hash_values.iter().enumerate().rev(), deleted_offset); + let (mut matched_probe, mut matched_build) = build_hashmap.get_matched_indices( + Box::new(hash_values.iter().enumerate().rev()), + deleted_offset, + ); matched_probe.reverse(); matched_build.reverse(); diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs index c5f7087ac195f..cc0d3d845d5b0 100644 --- a/datafusion/physical-plan/src/joins/utils.rs +++ b/datafusion/physical-plan/src/joins/utils.rs @@ -33,7 +33,7 @@ use crate::{ }; // compatibility pub use super::join_filter::JoinFilter; -pub use super::join_hash_map::{JoinHashMap, JoinHashMapType}; +pub use super::join_hash_map::JoinHashMapType; pub use crate::joins::{JoinOn, JoinOnRef}; use arrow::array::{ From 732fa2187396749b4abc40b034b4d60db396b403 Mon Sep 17 00:00:00 2001 From: Jonathan Date: Tue, 17 Jun 2025 21:35:38 -0400 Subject: [PATCH 2/3] fix: Fix `NullEquality` conflicts --- datafusion/physical-plan/src/joins/hash_join.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index 1864b1d8a2250..abeb4532b677d 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -3512,7 +3512,7 @@ mod tests { Arc::clone(&right), on.clone(), &JoinType::RightMark, - false, + NullEquality::NullEqualsNothing, task_ctx, ) .await?; @@ -3556,7 +3556,7 @@ mod tests { Arc::clone(&right), on.clone(), &JoinType::RightMark, - false, + NullEquality::NullEqualsNothing, task_ctx, ) .await?; @@ -3696,7 +3696,7 @@ mod tests { &join_hash_map, &[left_keys_values], &[right_keys_values], - false, + NullEquality::NullEqualsNothing, &hashes_buffer, 8192, (0, None), From 5e24b78f5a4c74ba6ba8df3ff5097a4ee06e03ce Mon Sep 17 00:00:00 2001 From: Jonathan Date: Thu, 19 Jun 2025 09:42:31 
-0400 Subject: [PATCH 3/3] fixes --- datafusion/physical-plan/src/joins/hash_join.rs | 6 +++--- datafusion/physical-plan/src/joins/join_hash_map.rs | 5 ++++- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index abeb4532b677d..a825a19a011e5 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -981,7 +981,7 @@ async fn collect_left_input( .await?; // Estimation of memory size, required for hashtable, prior to allocation. - // Final result can be verifiedJoinHashMapTypele.allocation_info()` + // Final result can be verified using `RawTable.allocation_info()` let fixed_size_u32 = size_of::(); let fixed_size_u64 = size_of::(); @@ -3578,7 +3578,7 @@ mod tests { } #[test] - fn join_with_hash_collisions_u64() -> Result<()> { + fn join_with_hash_collisions_64() -> Result<()> { let mut hashmap_left = HashTable::with_capacity(4); let left = build_table_i32( ("a", &vec![10, 20]), @@ -3649,7 +3649,7 @@ mod tests { } #[test] - fn join_with_hash_collision_u32() -> Result<()> { + fn join_with_hash_collisions_u32() -> Result<()> { let mut hashmap_left = HashTable::with_capacity(4); let left = build_table_i32( ("a", &vec![10, 20]), diff --git a/datafusion/physical-plan/src/joins/join_hash_map.rs b/datafusion/physical-plan/src/joins/join_hash_map.rs index b3efaed83bdd9..b60f09dbef340 100644 --- a/datafusion/physical-plan/src/joins/join_hash_map.rs +++ b/datafusion/physical-plan/src/joins/join_hash_map.rs @@ -35,7 +35,7 @@ use hashbrown::HashTable; /// During this stage it might be the case that a row is contained the same hashmap value, /// but the values don't match. Those are checked in the `equal_rows_arr` method. /// -/// The indices (values) are stored in a separate chained list stored as `Vec` `Vec`. +/// The indices (values) are stored in a separate chained list stored as `Vec` or `Vec`. 
/// /// The first value (+1) is stored in the hashmap, whereas the next value is stored in array at the position value. /// @@ -90,6 +90,9 @@ use hashbrown::HashTable; /// /// Here we have an option between creating a `JoinHashMapType` using `u32` or `u64` indices /// based on how many rows were being used for indices. +/// +/// At runtime we choose between using `JoinHashMapU32` and `JoinHashMapU64` which both implement +/// `JoinHashMapType`. pub trait JoinHashMapType: Send + Sync { fn extend_zero(&mut self, len: usize);