Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions datafusion/physical-plan/src/sorts/merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,9 +217,8 @@ impl<C: CursorValues> SortPreservingMergeStream<C> {
// we skip the following block. Until then, this function may be called multiple
// times and can return Poll::Pending if any partition returns Poll::Pending.
if self.loser_tree.is_empty() {
let remaining_partitions = self.uninitiated_partitions.clone();
for i in remaining_partitions {
match self.maybe_poll_stream(cx, i) {
while let Some(&partition_idx) = self.uninitiated_partitions.front() {
match self.maybe_poll_stream(cx, partition_idx) {
Poll::Ready(Err(e)) => {
self.aborted = true;
return Poll::Ready(Some(Err(e)));
Expand All @@ -228,10 +227,8 @@ impl<C: CursorValues> SortPreservingMergeStream<C> {
// If a partition returns Poll::Pending, to avoid continuously polling it
// and potentially increasing upstream buffer sizes, we move it to the
// back of the polling queue.
if let Some(front) = self.uninitiated_partitions.pop_front() {
// This pop_front can never return `None`.
self.uninitiated_partitions.push_back(front);
}
self.uninitiated_partitions.rotate_left(1);

// This function could remain in a pending state, so we manually wake it here.
// However, this approach can be investigated further to find a more natural way
// to avoid disrupting the runtime scheduler.
Expand All @@ -241,10 +238,13 @@ impl<C: CursorValues> SortPreservingMergeStream<C> {
_ => {
// If the polling result is Poll::Ready(Some(batch)) or Poll::Ready(None),
// we remove this partition from the queue so it is not polled again.
self.uninitiated_partitions.retain(|idx| *idx != i);
self.uninitiated_partitions.pop_front();
}
}
}

// Claim the memory for the uninitiated partitions
self.uninitiated_partitions.shrink_to_fit();
self.init_loser_tree();
}

Expand Down