Skip to content
Closed
Changes from 1 commit
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
7b3a0a2
Optimize topk with filter
Dandandan Apr 13, 2025
75acd07
add sort_tpch_limit bench
Dandandan Apr 13, 2025
32f11dc
early return
Dandandan Apr 13, 2025
52583a1
Merge
Dandandan Apr 13, 2025
351663b
Clippy
Dandandan Apr 13, 2025
eeb8ce4
Respect lexicographical ordering, only apply first filter
Dandandan Apr 13, 2025
f0290c4
Respect lexicographical ordering
Dandandan Apr 13, 2025
67aa03a
Respect lexicographical ordering, only apply first filter
Dandandan Apr 13, 2025
559b789
Respect lexicographical ordering, only apply first filter
Dandandan Apr 13, 2025
4a24e75
Simplify and add link
Dandandan Apr 13, 2025
5d42ee7
Still run early completion
Dandandan Apr 13, 2025
7003aed
Keep null values
Dandandan Apr 14, 2025
1610f78
Keep null values
Dandandan Apr 14, 2025
b046a73
Update datafusion/physical-plan/src/topk/mod.rs
Dandandan Apr 14, 2025
fe6fc48
Clippy
Dandandan Apr 14, 2025
f457ce8
Refactor
Dandandan Apr 14, 2025
40dc1d9
Ignore null threshold
Dandandan Apr 14, 2025
8d1bfe3
Update datafusion/physical-plan/src/topk/mod.rs
Dandandan Apr 15, 2025
f735f64
Fix
Dandandan Apr 15, 2025
923089f
Minor improvements
Dandandan Apr 15, 2025
6fb5b59
Merge remote-tracking branch 'upstream/main' into improve_topk
Dandandan Apr 15, 2025
9a99cce
Fix
Dandandan Apr 18, 2025
ea480b7
Fix
Dandandan Apr 18, 2025
3865960
Fix
Dandandan Apr 18, 2025
375679e
Merge remote-tracking branch 'upstream/main' into improve_topk
Dandandan Apr 18, 2025
2425b93
Add scalarvalue api
Dandandan Apr 18, 2025
63eca0b
Only update if heap updated
Dandandan Apr 18, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Refactor
  • Loading branch information
Dandandan committed Apr 14, 2025
commit f457ce8e76efe84224ddc4704285e36436b24be0
55 changes: 31 additions & 24 deletions datafusion/physical-plan/src/topk/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,32 +289,18 @@ impl TopK {

match selected_rows {
Some(filter) => {
for (index, row) in filter.values().set_indices().zip(rows.iter()) {
match self.heap.max() {
// heap has k items, and the new row is greater than the
// current max in the heap ==> it is not a new topk
Some(max_row) if row.as_ref() >= max_row.row() => {}
// don't yet have k items or new item is lower than the currently k low values
None | Some(_) => {
self.heap.add(&mut batch_entry, row, index);
replacements += 1;
}
}
}
self.find_new_topk_items(
filter.values().set_indices(),
&mut batch_entry,
&mut replacements,
);
}
None => {
for (index, row) in rows.iter().enumerate() {
match self.heap.max() {
// heap has k items, and the new row is greater than the
// current max in the heap ==> it is not a new topk
Some(max_row) if row.as_ref() >= max_row.row() => {}
// don't yet have k items or new item is lower than the currently k low values
None | Some(_) => {
self.heap.add(&mut batch_entry, row, index);
replacements += 1;
}
}
}
self.find_new_topk_items(
0..sort_keys[0].len(),
&mut batch_entry,
&mut replacements,
);
}
}

Expand All @@ -334,6 +320,27 @@ impl TopK {
Ok(())
}

fn find_new_topk_items(
&mut self,
items: impl Iterator<Item = usize>,
batch_entry: &mut RecordBatchEntry,
replacements: &mut usize,
) {
let rows = &mut self.scratch_rows;
for (index, row) in items.zip(rows.iter()) {
match self.heap.max() {
// heap has k items, and the new row is greater than the
// current max in the heap ==> it is not a new topk
Some(max_row) if row.as_ref() >= max_row.row() => {}
// don't yet have k items or new item is lower than the currently k low values
None | Some(_) => {
self.heap.add(batch_entry, row, index);
*replacements += 1;
}
}
}
}

/// If input ordering shares a common sort prefix with the TopK, and if the TopK's heap is full,
/// check if the computation can be finished early.
/// This is the case if the last row of the current batch is strictly greater than the max row in the heap,
Expand Down
Loading