diff --git a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs index 07d99c8e7129c..c1cfd91be0529 100644 --- a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs +++ b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs @@ -65,7 +65,7 @@ use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement}; use futures::stream::Stream; use futures::{ready, StreamExt}; -use hashbrown::raw::RawTable; +use hashbrown::hash_table::HashTable; use indexmap::IndexMap; use log::debug; @@ -442,16 +442,16 @@ pub struct LinearSearch { /// is ordered by a, b and the window expression contains a PARTITION BY b, a /// clause, this attribute stores [1, 0]. ordered_partition_by_indices: Vec, - /// We use this [`RawTable`] to calculate unique partitions for each new + /// We use this [`HashTable`] to calculate unique partitions for each new /// RecordBatch. First entry in the tuple is the hash value, the second /// entry is the unique ID for each partition (increments from 0 to n). - row_map_batch: RawTable<(u64, usize)>, - /// We use this [`RawTable`] to calculate the output columns that we can + row_map_batch: HashTable<(u64, usize)>, + /// We use this [`HashTable`] to calculate the output columns that we can /// produce at each cycle. First entry in the tuple is the hash value, the /// second entry is the unique ID for each partition (increments from 0 to n). /// The third entry stores how many new outputs are calculated for the /// corresponding partition. - row_map_out: RawTable<(u64, usize, usize)>, + row_map_out: HashTable<(u64, usize, usize)>, input_schema: SchemaRef, } @@ -610,8 +610,8 @@ impl LinearSearch { input_buffer_hashes: VecDeque::new(), random_state: Default::default(), ordered_partition_by_indices, - row_map_batch: RawTable::with_capacity(256), - row_map_out: RawTable::with_capacity(256), + row_map_batch: HashTable::with_capacity(256), + row_map_out: HashTable::with_capacity(256), input_schema, } } @@ -631,7 +631,7 @@ impl LinearSearch { // res stores PartitionKey and row indices (indices where these partition occurs in the `batch`) for each partition. let mut result: Vec<(PartitionKey, Vec)> = vec![]; for (hash, row_idx) in batch_hashes.into_iter().zip(0u32..) { - let entry = self.row_map_batch.get_mut(hash, |(_, group_idx)| { + let entry = self.row_map_batch.find_mut(hash, |(_, group_idx)| { // We can safely get the first index of the partition indices // since partition indices has one element during initialization. let row = get_row_at_idx(columns, row_idx as usize).unwrap(); @@ -641,8 +641,11 @@ impl LinearSearch { if let Some((_, group_idx)) = entry { result[*group_idx].1.push(row_idx) } else { - self.row_map_batch - .insert(hash, (hash, result.len()), |(hash, _)| *hash); + self.row_map_batch.insert_unique( + hash, + (hash, result.len()), + |(hash, _)| *hash, + ); let row = get_row_at_idx(columns, row_idx as usize)?; // This is a new partition its only index is row_idx for now. result.push((row, vec![row_idx])); @@ -667,7 +670,7 @@ impl LinearSearch { self.row_map_out.clear(); let mut partition_indices: Vec<(PartitionKey, Vec)> = vec![]; for (hash, row_idx) in self.input_buffer_hashes.iter().zip(0u32..) { - let entry = self.row_map_out.get_mut(*hash, |(_, group_idx, _)| { + let entry = self.row_map_out.find_mut(*hash, |(_, group_idx, _)| { let row = get_row_at_idx(&partition_by_columns, row_idx as usize).unwrap(); row == partition_indices[*group_idx].0 @@ -693,7 +696,7 @@ impl LinearSearch { if min_out == 0 { break; } - self.row_map_out.insert( + self.row_map_out.insert_unique( *hash, (*hash, partition_indices.len(), min_out), |(hash, _, _)| *hash,