From 3b4073b88ad7c1cf772a0ec8523f147716882ea2 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Wed, 17 Apr 2024 16:15:05 -0400 Subject: [PATCH 1/5] Split default merge batcher This change splits the default merge batcher implementation into a type that maintains the outer part of its algorithm, specifically knows how to maintain chains, and an inner part that knows how to maintain the individual batches in chains. The benefit is that the outer part does not need to know about the contents of the containers it holds on to because that's encapsulated in the inner trait's implementation. Signed-off-by: Moritz Hoffmann --- src/operators/arrange/arrangement.rs | 4 +- src/operators/arrange/upsert.rs | 10 +- src/operators/reduce.rs | 23 +- src/trace/implementations/merge_batcher.rs | 452 +++++++++--------- .../implementations/merge_batcher_col.rs | 243 +++++++++- src/trace/implementations/mod.rs | 3 - src/trace/implementations/ord_neu.rs | 166 ++++--- src/trace/implementations/rhh.rs | 106 ++-- src/trace/mod.rs | 37 +- 9 files changed, 664 insertions(+), 380 deletions(-) diff --git a/src/operators/arrange/arrangement.rs b/src/operators/arrange/arrangement.rs index c22749a27..3b14646e3 100644 --- a/src/operators/arrange/arrangement.rs +++ b/src/operators/arrange/arrangement.rs @@ -284,7 +284,7 @@ where F: Fn(T2::Val<'_>) -> V + 'static, T2::Diff: Abelian, T2::Batch: Batch, - T2::Builder: Builder, + T2::Builder: Builder>, L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(V, T2::Diff)>)+'static, { self.reduce_core::<_,V,F,T2>(name, from, move |key, input, output, change| { @@ -303,7 +303,7 @@ where V: Data, F: Fn(T2::Val<'_>) -> V + 'static, T2::Batch: Batch, - T2::Builder: Builder, + T2::Builder: Builder>, L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(V, T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static, { use crate::operators::reduce::reduce_trace; diff --git a/src/operators/arrange/upsert.rs b/src/operators/arrange/upsert.rs index 758ec8df3..43c0e08f2 100644 --- a/src/operators/arrange/upsert.rs +++ b/src/operators/arrange/upsert.rs @@ -138,7 +138,7 @@ where F: Fn(Tr::Val<'_>) -> V + 'static, Tr::Time: TotalOrder+ExchangeData, Tr::Batch: Batch, - Tr::Builder: Builder, + Tr::Builder: Builder>, { let mut reader: Option> = None; @@ -241,7 +241,6 @@ where // Prepare a cursor to the existing arrangement, and a batch builder for // new stuff that we add. let (mut trace_cursor, trace_storage) = reader_local.cursor(); - let mut builder = Tr::Builder::new(); for (key, mut list) in to_process.drain(..) { use trace::cursor::MyTrait; @@ -282,11 +281,10 @@ where } // Must insert updates in (key, val, time) order. updates.sort(); - for update in updates.drain(..) { - builder.push(update); - } } - let batch = builder.done(prev_frontier.clone(), upper.clone(), Antichain::from_elem(G::Timestamp::minimum())); + let mut batches = vec![std::mem::take(&mut updates)]; + let batch = Tr::Builder::from_batches(&mut batches, prev_frontier.borrow(), upper.borrow(), Antichain::from_elem(G::Timestamp::minimum()).borrow()); + updates = batches.into_iter().next().unwrap_or_default(); prev_frontier.clone_from(&upper); // Communicate `batch` to the arrangement and the stream. diff --git a/src/operators/reduce.rs b/src/operators/reduce.rs index db80a5d44..94254c569 100644 --- a/src/operators/reduce.rs +++ b/src/operators/reduce.rs @@ -240,7 +240,7 @@ pub trait ReduceCore where /// .map(|x| (x, x)) /// .reduce_abelian::<_,_,ValSpine<_,_,_,_>>( /// "Example", - /// Clone::clone, + /// Clone::clone, /// move |_key, src, dst| dst.push((*src[0].0, 1)) /// ) /// .trace; @@ -252,7 +252,7 @@ pub trait ReduceCore where F: Fn(T2::Val<'_>) -> V + 'static, T2::Diff: Abelian, T2::Batch: Batch, - T2::Builder: Builder, + T2::Builder: Builder>, L: FnMut(&K, &[(&V, R)], &mut Vec<(V, T2::Diff)>)+'static, { self.reduce_core::<_,_,T2>(name, from, move |key, input, output, change| { @@ -274,7 +274,7 @@ pub trait ReduceCore where T2: for<'a> Trace=&'a K, Time=G::Timestamp>+'static, F: Fn(T2::Val<'_>) -> V + 'static, T2::Batch: Batch, - T2::Builder: Builder, + T2::Builder: Builder>, L: FnMut(&K, &[(&V, R)], &mut Vec<(V,T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static, ; } @@ -293,7 +293,7 @@ where F: Fn(T2::Val<'_>) -> V + 'static, T2: for<'a> Trace=&'a K, Time=G::Timestamp>+'static, T2::Batch: Batch, - T2::Builder: Builder, + T2::Builder: Builder>, L: FnMut(&K, &[(&V, R)], &mut Vec<(V,T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static, { self.arrange_by_key_named(&format!("Arrange: {}", name)) @@ -312,7 +312,7 @@ where V: Data, F: Fn(T2::Val<'_>) -> V + 'static, T2::Batch: Batch, - T2::Builder: Builder, + T2::Builder: Builder>, L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(V,T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static, { let mut result_trace = None; @@ -529,9 +529,20 @@ where // (ii) that the buffers are time-ordered, and (iii) that the builders accept // arbitrarily ordered times. for index in 0 .. buffers.len() { + // TODO: This doesn't reuse allocations for `update`. + let mut update = Vec::with_capacity(1024); buffers[index].1.sort_by(|x,y| x.0.cmp(&y.0)); for (val, time, diff) in buffers[index].1.drain(..) { - builders[index].push(((key.into_owned(), val), time, diff)); + update.push(((key.into_owned(), val), time, diff)); + if update.len() == update.capacity() { + let mut chain = vec![update]; + builders[index].push_batches(&mut chain); + update = chain.pop().unwrap_or_else(|| Vec::with_capacity(1024)); + update.clear(); + } + } + if !update.is_empty() { + builders[index].push_batches(&mut vec![update]); } } } diff --git a/src/trace/implementations/merge_batcher.rs b/src/trace/implementations/merge_batcher.rs index 2cf27ed1b..e0e55a2f4 100644 --- a/src/trace/implementations/merge_batcher.rs +++ b/src/trace/implementations/merge_batcher.rs @@ -1,169 +1,220 @@ //! A general purpose `Batcher` implementation based on radix sort. use std::collections::VecDeque; +use std::marker::PhantomData; use timely::communication::message::RefOrMut; +use timely::{Container, PartialOrder}; use timely::logging::WorkerIdentifier; use timely::logging_core::Logger; use timely::progress::{frontier::Antichain, Timestamp}; +use timely::progress::frontier::AntichainRef; +use crate::consolidation::consolidate_updates; use crate::difference::Semigroup; use crate::logging::{BatcherEvent, DifferentialEvent}; use crate::trace::{Batcher, Builder}; /// Creates batches from unordered tuples. -pub struct MergeBatcher { - sorter: MergeSorter<(K, V), T, D>, +pub struct MergeBatcher +where + M: Merger, +{ + /// each power-of-two length list of allocations. Do not push/pop directly but use the corresponding functions. + queue: Vec>, + /// Stash of empty batches + stash: Vec, + /// Thing to accept data, merge chains, and talk to the builder. + merger: M, + /// Logger for size accounting. + logger: Option>, + /// Timely operator ID. + operator_id: usize, + /// Current lower frontier, we sealed up to here. lower: Antichain, + /// The lower-bound frontier of the data, after the last call to seal. frontier: Antichain, } -impl Batcher for MergeBatcher +impl Batcher for MergeBatcher where - K: Ord + Clone, - V: Ord + Clone, + M: Merger, T: Timestamp, - D: Semigroup, { - type Input = Vec<((K,V),T,D)>; - type Output = ((K,V),T,D); + type Input = M::Input; + type Output = M::Batch; type Time = T; fn new(logger: Option>, operator_id: usize) -> Self { - MergeBatcher { - sorter: MergeSorter::new(logger, operator_id), + Self { + logger, + operator_id, + merger: M::default(), + queue: Vec::new(), + stash: Vec::new(), frontier: Antichain::new(), lower: Antichain::from_elem(T::minimum()), } } - #[inline(never)] - fn push_batch(&mut self, batch: RefOrMut) { - // `batch` is either a shared reference or an owned allocations. - match batch { - RefOrMut::Ref(reference) => { - // This is a moment at which we could capture the allocations backing - // `batch` into a different form of region, rather than just cloning. - let mut owned: Vec<_> = self.sorter.empty(); - owned.clone_from(reference); - self.sorter.push(&mut owned); - }, - RefOrMut::Mut(reference) => { - self.sorter.push(reference); - } - } + fn push_batch(&mut self, batch: RefOrMut) { + let batch = self.merger.accept(batch, &mut self.stash); + self.insert_chain(batch); } // Sealing a batch means finding those updates with times not greater or equal to any time // in `upper`. All updates must have time greater or equal to the previously used `upper`, // which we call `lower`, by assumption that after sealing a batcher we receive no more // updates with times not greater or equal to `upper`. - #[inline(never)] fn seal>(&mut self, upper: Antichain) -> B::Output { + // Finish + let batch = self.merger.finish(&mut self.stash); + if !batch.is_empty() { + self.queue_push(batch); + } - let mut merged = Vec::new(); - self.sorter.finish_into(&mut merged); - - // Determine the number of distinct keys, values, and updates, - // and form a builder pre-sized for these numbers. - let mut builder = { - let mut keys = 0; - let mut vals = 0; - let mut upds = 0; - let mut prev_keyval = None; - for buffer in merged.iter() { - for ((key, val), time, _) in buffer.iter() { - if !upper.less_equal(time) { - if let Some((p_key, p_val)) = prev_keyval { - if p_key != key { - keys += 1; - vals += 1; - } - else if p_val != val { - vals += 1; - } - upds += 1; - } else { - keys += 1; - vals += 1; - upds += 1; - } - prev_keyval = Some((key, val)); - } - } - } - B::with_capacity(keys, vals, upds) - }; + // Merge all remaining chains into a single chain. + while self.queue.len() > 1 { + let list1 = self.queue_pop().unwrap(); + let list2 = self.queue_pop().unwrap(); + let merged = self.merge_by(list1, list2); + self.queue_push(merged); + } + let merged = self.queue_pop().unwrap_or_default(); + // Extract readied data. let mut kept = Vec::new(); - let mut keep = Vec::new(); - + let mut readied = Vec::new(); self.frontier.clear(); - // TODO: Re-use buffer, rather than dropping. - for mut buffer in merged.drain(..) { - for ((key, val), time, diff) in buffer.drain(..) { - if upper.less_equal(&time) { - self.frontier.insert(time.clone()); - if keep.len() == keep.capacity() && !keep.is_empty() { - kept.push(keep); - keep = self.sorter.empty(); - } - keep.push(((key, val), time, diff)); - } - else { - builder.push(((key, val), time, diff)); - } - } - // Recycling buffer. - self.sorter.push(&mut buffer); - } + self.merger.extract(merged, upper.borrow(), &mut self.frontier, &mut readied, &mut kept, &mut self.stash); - // Finish the kept data. - if !keep.is_empty() { - kept.push(keep); - } if !kept.is_empty() { - self.sorter.push_list(kept); + self.queue_push(kept); } - // Drain buffers (fast reclaimation). - // TODO : This isn't obviously the best policy, but "safe" wrt footprint. - // In particular, if we are reading serialized input data, we may - // prefer to keep these buffers around to re-fill, if possible. - let mut buffer = Vec::new(); - self.sorter.push(&mut buffer); - // We recycle buffers with allocations (capacity, and not zero-sized). - while buffer.capacity() > 0 && std::mem::size_of::<((K,V),T,D)>() > 0 { - buffer = Vec::new(); - self.sorter.push(&mut buffer); - } + self.stash.clear(); - let seal = builder.done(self.lower.clone(), upper.clone(), Antichain::from_elem(T::minimum())); + let seal = B::from_batches(&mut readied, self.lower.borrow(), upper.borrow(), Antichain::from_elem(T::minimum()).borrow()); self.lower = upper; seal } /// The frontier of elements remaining after the most recent call to `self.seal`. - fn frontier(&mut self) -> timely::progress::frontier::AntichainRef { + #[inline] + fn frontier(&mut self) -> AntichainRef { self.frontier.borrow() } } +impl MergeBatcher +where + M: Merger +{ + fn insert_chain(&mut self, chain: Vec<::Batch>) { + if !chain.is_empty() { + self.queue_push(chain); + while self.queue.len() > 1 && (self.queue[self.queue.len() - 1].len() >= self.queue[self.queue.len() - 2].len() / 2) { + let list1 = self.queue_pop().unwrap(); + let list2 = self.queue_pop().unwrap(); + let merged = self.merge_by(list1, list2); + self.queue_push(merged); + } + } + } -struct MergeSorter { - /// each power-of-two length list of allocations. Do not push/pop directly but use the corresponding functions. - queue: Vec>>, - stash: Vec>, - logger: Option>, - operator_id: usize, + // merges two sorted input lists into one sorted output list. + fn merge_by(&mut self, list1: Vec, list2: Vec) -> Vec { + self.account(list1.iter().chain(list2.iter()).map(M::account), -1); + + // TODO: `list1` and `list2` get dropped; would be better to reuse? + let mut output = Vec::with_capacity(list1.len() + list2.len()); + self.merger.merge(list1, list2, &mut output, &mut self.stash); + + output + } + + /// Pop a batch from `self.queue` and account size changes. + #[inline] + fn queue_pop(&mut self) -> Option> { + let batch = self.queue.pop(); + self.account(batch.iter().flatten().map(M::account), -1); + batch + } + + /// Push a batch to `self.queue` and account size changes. + #[inline] + fn queue_push(&mut self, batch: Vec) { + self.account(batch.iter().map(M::account), 1); + self.queue.push(batch); + } + + /// Account size changes. Only performs work if a logger exists. + /// + /// Calculate the size based on the [`TimelyStack`]s passed along, with each attribute + /// multiplied by `diff`. Usually, one wants to pass 1 or -1 as the diff. + #[inline] + fn account>(&self, items: I, diff: isize) { + if let Some(logger) = &self.logger { + let (mut records, mut size, mut capacity, mut allocations) = (0isize, 0isize, 0isize, 0isize); + for (records_, size_, capacity_, allocations_) in items { + records = records.saturating_add_unsigned(records_); + size = size.saturating_add_unsigned(size_); + capacity = capacity.saturating_add_unsigned(capacity_); + allocations = allocations.saturating_add_unsigned(allocations_); + } + logger.log(BatcherEvent { + operator: self.operator_id, + records_diff: records * diff, + size_diff: size * diff, + capacity_diff: capacity * diff, + allocations_diff: allocations * diff, + }) + } + } } -impl MergeSorter { +impl Drop for MergeBatcher { + fn drop(&mut self) { + while self.queue_pop().is_some() { } + } +} - const BUFFER_SIZE_BYTES: usize = 1 << 13; +/// A trait to describe interesting moments in a merge batcher. +pub trait Merger: Default { + /// The type of update containers received from inputs. + type Input; + /// The internal representation of batches of data. + type Batch: Container; + /// The type of time in frontiers to extract updates. + type Time; + /// Accept a fresh batch of input data. + fn accept(&mut self, batch: RefOrMut, stash: &mut Vec) -> Vec; + /// Finish processing any stashed data. + fn finish(&mut self, stash: &mut Vec) -> Vec; + /// Merge chains into an output chain. + fn merge(&mut self, list1: Vec, list2: Vec, output: &mut Vec, stash: &mut Vec); + /// Extract ready updates based on the `upper` frontier. + fn extract(&mut self, merged: Vec, upper: AntichainRef, frontier: &mut Antichain, readied: &mut Vec, keep: &mut Vec, stash: &mut Vec); + + /// Account size and allocation changes. Returns a tuple of (records, size, capacity, allocations). + fn account(batch: &Self::Batch) -> (usize, usize, usize, usize); +} + +/// A merger that knows how to accept and maintain chains of vectors. +pub struct VecMerger { + _marker: PhantomData, +} + +impl Default for VecMerger { + fn default() -> Self { + Self {_marker: PhantomData} + } +} - fn buffer_size() -> usize { - let size = ::std::mem::size_of::<(D, T, R)>(); +impl VecMerger { + const BUFFER_SIZE_BYTES: usize = 1 << 13; + fn preferred_buffer_size(&self) -> usize { + let size = ::std::mem::size_of::(); if size == 0 { Self::BUFFER_SIZE_BYTES } else if size <= Self::BUFFER_SIZE_BYTES { @@ -173,88 +224,65 @@ impl MergeSorter { } } + /// Helper to get pre-sized vector from the stash. #[inline] - fn new(logger: Option>, operator_id: usize) -> Self { - Self { - logger, - operator_id, - queue: Vec::new(), - stash: Vec::new(), - } + fn empty(&self, stash: &mut Vec>) -> Vec { + stash.pop().unwrap_or_else(|| Vec::with_capacity(self.preferred_buffer_size())) } + /// Helper to return a batch to the stash. #[inline] - pub fn empty(&mut self) -> Vec<(D, T, R)> { - self.stash.pop().unwrap_or_else(|| Vec::with_capacity(Self::buffer_size())) + fn recycle(&self, mut batch: Vec, stash: &mut Vec>) { + // TODO: Should we limit the size of `stash`? + if batch.capacity() == self.preferred_buffer_size() /*&& stash.len() < 2*/ { + batch.clear(); + stash.push(batch); + } } +} - #[inline] - pub fn push(&mut self, batch: &mut Vec<(D, T, R)>) { - // TODO: Reason about possible unbounded stash growth. How to / should we return them? - // TODO: Reason about mis-sized vectors, from deserialized data; should probably drop. - let mut batch = if self.stash.len() > 2 { - ::std::mem::replace(batch, self.stash.pop().unwrap()) - } - else { - ::std::mem::take(batch) - }; +impl Merger for VecMerger<(D,T,R)> { + type Time = T; + type Input = Vec<(D,T,R)>; + type Batch = Vec<(D,T,R)>; - if !batch.is_empty() { - crate::consolidation::consolidate_updates(&mut batch); - self.account([batch.len()], 1); - self.queue_push(vec![batch]); - while self.queue.len() > 1 && (self.queue[self.queue.len()-1].len() >= self.queue[self.queue.len()-2].len() / 2) { - let list1 = self.queue_pop().unwrap(); - let list2 = self.queue_pop().unwrap(); - let merged = self.merge_by(list1, list2); - self.queue_push(merged); + fn accept(&mut self, batch: RefOrMut>, stash: &mut Vec) -> Vec> { + // `batch` is either a shared reference or an owned allocations. + let mut owned = match batch { + RefOrMut::Ref(vec) => { + let mut owned = self.empty(stash); + owned.clone_from(vec); + owned } + RefOrMut::Mut(vec) => std::mem::take(vec) + }; + consolidate_updates(&mut owned); + if owned.capacity() == self.preferred_buffer_size() { + vec![owned] + } else { + let mut chain = Vec::with_capacity((owned.len() + self.preferred_buffer_size() - 1) / self.preferred_buffer_size()); + let mut iter = owned.drain(..).peekable(); + while iter.peek().is_some() { + let mut batch = self.empty(stash); + batch.extend((&mut iter).take(self.preferred_buffer_size())); + chain.push(batch); + } + chain } } - // This is awkward, because it isn't a power-of-two length any more, and we don't want - // to break it down to be so. - pub fn push_list(&mut self, list: Vec>) { - while self.queue.len() > 1 && self.queue[self.queue.len()-1].len() < list.len() { - let list1 = self.queue_pop().unwrap(); - let list2 = self.queue_pop().unwrap(); - let merged = self.merge_by(list1, list2); - self.queue_push(merged); - } - self.queue_push(list); - } - - #[inline(never)] - pub fn finish_into(&mut self, target: &mut Vec>) { - while self.queue.len() > 1 { - let list1 = self.queue_pop().unwrap(); - let list2 = self.queue_pop().unwrap(); - let merged = self.merge_by(list1, list2); - self.queue_push(merged); - } - - if let Some(mut last) = self.queue_pop() { - ::std::mem::swap(&mut last, target); - } + fn finish(&mut self, _stash: &mut Vec>) -> Vec> { + vec![] } - // merges two sorted input lists into one sorted output list. - #[inline(never)] - fn merge_by(&mut self, list1: Vec>, list2: Vec>) -> Vec> { - self.account(list1.iter().chain(list2.iter()).map(Vec::len), -1); - - use std::cmp::Ordering; - - // TODO: `list1` and `list2` get dropped; would be better to reuse? - let mut output = Vec::with_capacity(list1.len() + list2.len()); - let mut result = self.empty(); - + fn merge(&mut self, list1: Vec>, list2: Vec>, output: &mut Vec>, stash: &mut Vec>) { let mut list1 = list1.into_iter(); let mut list2 = list2.into_iter(); - let mut head1 = VecDeque::from(list1.next().unwrap_or_default()); let mut head2 = VecDeque::from(list2.next().unwrap_or_default()); + let mut result = self.empty(stash); + // while we have valid data in each input, merge. while !head1.is_empty() && !head2.is_empty() { @@ -265,6 +293,7 @@ impl MergeSorter { let y = head2.front().unwrap(); (&x.0, &x.1).cmp(&(&y.0, &y.1)) }; + use std::cmp::Ordering; match cmp { Ordering::Less => result.push(head1.pop_front().unwrap()), Ordering::Greater => result.push(head2.pop_front().unwrap()), @@ -281,81 +310,74 @@ impl MergeSorter { if result.capacity() == result.len() { output.push(result); - result = self.empty(); + result = self.empty(stash); } if head1.is_empty() { let done1 = Vec::from(head1); - if done1.capacity() == Self::buffer_size() { self.stash.push(done1); } + self.recycle(done1, stash); head1 = VecDeque::from(list1.next().unwrap_or_default()); } if head2.is_empty() { let done2 = Vec::from(head2); - if done2.capacity() == Self::buffer_size() { self.stash.push(done2); } + self.recycle(done2, stash); head2 = VecDeque::from(list2.next().unwrap_or_default()); } } if !result.is_empty() { output.push(result); } - else if result.capacity() > 0 { self.stash.push(result); } + else { self.recycle(result, stash); } if !head1.is_empty() { - let mut result = self.empty(); + let mut result = self.empty(stash); for item1 in head1 { result.push(item1); } output.push(result); } output.extend(list1); if !head2.is_empty() { - let mut result = self.empty(); + let mut result = self.empty(stash); for item2 in head2 { result.push(item2); } output.push(result); } output.extend(list2); - - output } -} -impl MergeSorter { - /// Pop a batch from `self.queue` and account size changes. - #[inline] - fn queue_pop(&mut self) -> Option>> { - let batch = self.queue.pop(); - self.account(batch.iter().flatten().map(Vec::len), -1); - batch - } + fn extract(&mut self, merged: Vec>, upper: AntichainRef, frontier: &mut Antichain, readied: &mut Vec>, kept: &mut Vec>, stash: &mut Vec>) { + let mut keep = self.empty(stash); + let mut ready = self.empty(stash); - /// Push a batch to `self.queue` and account size changes. - #[inline] - fn queue_push(&mut self, batch: Vec>) { - self.account(batch.iter().map(Vec::len), 1); - self.queue.push(batch); - } - - /// Account size changes. Only performs work if a logger exists. - /// - /// Calculate the size based on the [`TimelyStack`]s passed along, with each attribute - /// multiplied by `diff`. Usually, one wants to pass 1 or -1 as the diff. - fn account>(&self, items: I, diff: isize) { - if let Some(logger) = &self.logger { - let mut records= 0isize; - for len in items { - records = records.saturating_add_unsigned(len); + for mut buffer in merged { + for (data, time, diff) in buffer.drain(..) { + if upper.less_equal(&time) { + frontier.insert(time.clone()); + if keep.len() == keep.capacity() && !keep.is_empty() { + kept.push(keep); + keep = self.empty(stash); + } + keep.push((data, time, diff)); + } + else { + if ready.len() == ready.capacity() && !ready.is_empty() { + readied.push(ready); + ready = self.empty(stash); + } + ready.push((data, time, diff)); + } } - logger.log(BatcherEvent { - operator: self.operator_id, - records_diff: records * diff, - size_diff: 0, - capacity_diff: 0, - allocations_diff: 0, - }) + // Recycling buffer. + self.recycle(buffer, stash); + } + // Finish the kept data. + if !keep.is_empty() { + kept.push(keep); + } + if !ready.is_empty() { + readied.push(ready); } } -} -impl Drop for MergeSorter { - fn drop(&mut self) { - while self.queue_pop().is_some() { } + fn account(batch: &Self::Batch) -> (usize, usize, usize, usize) { + (batch.len(), 0, 0, 0) } } diff --git a/src/trace/implementations/merge_batcher_col.rs b/src/trace/implementations/merge_batcher_col.rs index 21416170d..bc86e0b60 100644 --- a/src/trace/implementations/merge_batcher_col.rs +++ b/src/trace/implementations/merge_batcher_col.rs @@ -1,15 +1,234 @@ //! A general purpose `Batcher` implementation based on radix sort for TimelyStack. -use timely::Container; +use std::cmp::Ordering; +use timely::{Container, Data, PartialOrder}; use timely::communication::message::RefOrMut; use timely::container::columnation::{Columnation, TimelyStack}; use timely::logging::WorkerIdentifier; use timely::logging_core::Logger; use timely::progress::{frontier::Antichain, Timestamp}; +use timely::progress::frontier::AntichainRef; +use crate::consolidation::consolidate_updates; use crate::difference::Semigroup; use crate::logging::{BatcherEvent, DifferentialEvent}; use crate::trace::{Batcher, Builder}; +use crate::trace::implementations::merge_batcher::Merger; + +/// TODO +pub struct ColumnationMerger { + pending: Vec, +} + +impl Default for ColumnationMerger { + fn default() -> Self { + Self { pending: Vec::default() } + } +} + +impl ColumnationMerger { + const BUFFER_SIZE_BYTES: usize = 1 << 13; + fn preferred_buffer_size(&self) -> usize { + let size = ::std::mem::size_of::(); + if size == 0 { + Self::BUFFER_SIZE_BYTES + } else if size <= Self::BUFFER_SIZE_BYTES { + Self::BUFFER_SIZE_BYTES / size + } else { + 1 + } + } + + /// Buffer size for pending updates, currently 4 * [`Self::buffer_size`]. + fn pending_buffer_size(&self) -> usize { + self.preferred_buffer_size() * 1 + } + + /// Helper to get pre-sized vector from the stash. + #[inline] + fn empty(&self, stash: &mut Vec>) -> TimelyStack { + stash.pop().unwrap_or_else(|| TimelyStack::with_capacity(self.preferred_buffer_size())) + } + + /// Helper to return a batch to the stash. + #[inline] + fn recycle(&self, mut batch: TimelyStack, stash: &mut Vec>) { + // TODO: Should we limit the size of `stash`? + if batch.capacity() == self.preferred_buffer_size() /*&& stash.len() < 2*/ { + batch.clear(); + stash.push(batch); + } + } +} + +impl Merger for ColumnationMerger<(D, T, R)> +where + D: Columnation + Ord + Data, + T: Columnation + Ord + PartialOrder + Data, + R: Columnation + Semigroup + 'static, +{ + type Time = T; + type Input = Vec<(D,T,R)>; + type Batch = TimelyStack<(D,T,R)>; + + fn accept(&mut self, batch: RefOrMut, stash: &mut Vec) -> Vec { + // `batch` is either a shared reference or an owned allocations. + let mut batch: Vec<_> = match batch { + RefOrMut::Ref(vec) => { + let mut owned = Vec::default(); + owned.clone_from(vec); + owned + } + RefOrMut::Mut(vec) => std::mem::take(vec) + }; + // Ensure `self.pending` has a capacity of `Self::pending_buffer_size`. + if self.pending.capacity() < self.pending_buffer_size() { + self.pending.reserve(self.pending_buffer_size() - self.pending.capacity()); + } + + let mut output = Vec::default(); + + while !batch.is_empty() { + self.pending.extend(batch.drain(..std::cmp::min(batch.len(), self.pending.capacity() - self.pending.len()))); + if self.pending.len() == self.pending.capacity() { + consolidate_updates(&mut self.pending); + if self.pending.len() > self.pending.capacity() / 2 { + // Flush if `self.pending` is more than half full after consolidation. + let mut stack = self.empty(stash); + stack.reserve_items(self.pending.iter()); + for tuple in self.pending.drain(..) { + stack.copy(&tuple); + } + output.push(stack); + } + } + } + + output + } + + fn finish(&mut self, _stash: &mut Vec) -> Vec { + vec![] + } + + fn merge(&mut self, list1: Vec, list2: Vec, output: &mut Vec, stash: &mut Vec) { + let mut list1 = list1.into_iter(); + let mut list2 = list2.into_iter(); + + let mut head1 = TimelyStackQueue::from(list1.next().unwrap_or_default()); + let mut head2 = TimelyStackQueue::from(list2.next().unwrap_or_default()); + + let mut result = self.empty(stash); + + // while we have valid data in each input, merge. + while !head1.is_empty() && !head2.is_empty() { + + while (result.capacity() - result.len()) > 0 && !head1.is_empty() && !head2.is_empty() { + + let cmp = { + let x = head1.peek(); + let y = head2.peek(); + (&x.0, &x.1).cmp(&(&y.0, &y.1)) + }; + match cmp { + Ordering::Less => { result.copy(head1.pop()); } + Ordering::Greater => { result.copy(head2.pop()); } + Ordering::Equal => { + let (data1, time1, diff1) = head1.pop(); + let (_data2, _time2, diff2) = head2.pop(); + let mut diff1 = diff1.clone(); + diff1.plus_equals(diff2); + if !diff1.is_zero() { + result.copy_destructured(data1, time1, &diff1); + } + } + } + } + + if result.capacity() == result.len() { + output.push(result); + result = self.empty(stash); + } + + if head1.is_empty() { + self.recycle(head1.done(), stash); + head1 = TimelyStackQueue::from(list1.next().unwrap_or_default()); + } + if head2.is_empty() { + self.recycle(head2.done(), stash); + head2 = TimelyStackQueue::from(list2.next().unwrap_or_default()); + } + } + + if result.len() > 0 { + output.push(result); + } else { + self.recycle(result, stash); + } + + if !head1.is_empty() { + let mut result = self.empty(stash); + result.reserve_items(head1.iter()); + for item in head1.iter() { result.copy(item); } + output.push(result); + } + output.extend(list1); + + if !head2.is_empty() { + let mut result = self.empty(stash); + result.reserve_items(head2.iter()); + for item in head2.iter() { result.copy(item); } + output.push(result); + } + output.extend(list2); + } + + fn extract(&mut self, merged: Vec, upper: AntichainRef, frontier: &mut Antichain, readied: &mut Vec, kept: &mut Vec, stash: &mut Vec) { + let mut keep = self.empty(stash); + let mut ready = self.empty(stash); + + for buffer in merged { + for d @ (_data, time, _diff) in buffer.iter() { + if upper.less_equal(time) { + frontier.insert(time.clone()); + if keep.len() == keep.capacity() && !keep.is_empty() { + kept.push(keep); + keep = self.empty(stash); + } + keep.copy(d); + } + else { + if ready.len() == ready.capacity() && !ready.is_empty() { + readied.push(ready); + ready = self.empty(stash); + } + ready.copy(d); + } + } + // Recycling buffer. + self.recycle(buffer, stash); + } + // Finish the kept data. + if !keep.is_empty() { + kept.push(keep); + } + if !ready.is_empty() { + readied.push(ready); + } + } + + fn account(batch: &Self::Batch) -> (usize, usize, usize, usize) { + let (mut size, mut capacity, mut allocations) = (0, 0, 0); + let cb = |siz, cap| { + size += siz; + capacity += cap; + allocations += 1; + }; + batch.heap_size(cb); + (batch.len(), size, capacity, allocations) + } +} + /// Creates batches from unordered tuples. pub struct ColumnatedMergeBatcher @@ -32,7 +251,7 @@ where D: Columnation + Semigroup + 'static, { type Input = Vec<((K,V),T,D)>; - type Output = ((K,V),T,D); + type Output = Vec<((K,V),T,D)>; type Time = T; fn new(logger: Option>, operator_id: usize) -> Self { @@ -101,6 +320,8 @@ where let mut kept = Vec::new(); let mut keep = TimelyStack::default(); + let mut readied = Vec::new(); + let mut ready = Vec::default(); self.frontier.clear(); @@ -119,7 +340,15 @@ where keep.copy(datum); } else { - builder.copy(datum); + if ready.is_empty() { + if ready.capacity() != MergeSorterColumnation::<(K, V), T, D>::buffer_size() { + ready = Vec::with_capacity(MergeSorterColumnation::<(K, V), T, D>::buffer_size()); + } + } else if ready.len() == ready.capacity() { + readied.push(ready); + ready = Vec::with_capacity(MergeSorterColumnation::<(K, V), T, D>::buffer_size()); + } + ready.push(datum.clone()); } } // Recycling buffer. @@ -134,6 +363,13 @@ where self.sorter.push_list(kept); } + if !ready.is_empty() { + readied.push(ready); + } + if !readied.is_empty() { + builder.push_batches(&mut readied); + } + // Drain buffers (fast reclamation). self.sorter.clear_stash(); @@ -312,7 +548,6 @@ impl>, list2: Vec>) -> Vec> { - use std::cmp::Ordering; // TODO: `list1` and `list2` get dropped; would be better to reuse? let mut output = Vec::with_capacity(list1.len() + list2.len()); diff --git a/src/trace/implementations/mod.rs b/src/trace/implementations/mod.rs index d700f2b6b..cd0b8fd9a 100644 --- a/src/trace/implementations/mod.rs +++ b/src/trace/implementations/mod.rs @@ -42,9 +42,6 @@ pub mod spine_fueled; pub mod merge_batcher; pub mod merge_batcher_col; - -pub use self::merge_batcher::MergeBatcher as Batcher; - pub mod ord_neu; pub mod rhh; pub mod huffman_container; diff --git a/src/trace/implementations/ord_neu.rs b/src/trace/implementations/ord_neu.rs index 42e45aefe..0ece74bc1 100644 --- a/src/trace/implementations/ord_neu.rs +++ b/src/trace/implementations/ord_neu.rs @@ -11,7 +11,7 @@ use std::rc::Rc; use crate::trace::implementations::spine_fueled::Spine; -use crate::trace::implementations::merge_batcher::MergeBatcher; +use crate::trace::implementations::merge_batcher::{MergeBatcher, VecMerger}; use crate::trace::implementations::merge_batcher_col::ColumnatedMergeBatcher; use crate::trace::rc_blanket_impls::RcBuilder; @@ -23,7 +23,7 @@ pub use self::key_batch::{OrdKeyBatch, OrdKeyBuilder}; /// A trace implementation using a spine of ordered lists. pub type OrdValSpine = Spine< Rc>>, - MergeBatcher, + MergeBatcher, T>, RcBuilder>>, >; // /// A trace implementation for empty values using a spine of ordered lists. @@ -32,6 +32,7 @@ pub type OrdValSpine = Spine< /// A trace implementation backed by columnar storage. pub type ColValSpine = Spine< Rc>>, + // MergeBatcher, T>, ColumnatedMergeBatcher, RcBuilder>>, >; @@ -39,7 +40,7 @@ pub type ColValSpine = Spine< /// A trace implementation using a spine of ordered lists. pub type OrdKeySpine = Spine< Rc>>, - MergeBatcher, + MergeBatcher, T>, RcBuilder>>, >; // /// A trace implementation for empty values using a spine of ordered lists. @@ -48,6 +49,7 @@ pub type OrdKeySpine = Spine< /// A trace implementation backed by columnar storage. pub type ColKeySpine = Spine< Rc>>, + // MergeBatcher, T>, ColumnatedMergeBatcher, RcBuilder>>, >; @@ -55,6 +57,7 @@ pub type ColKeySpine = Spine< /// A trace implementation backed by columnar storage. pub type PreferredSpine = Spine< Rc>>, + // MergeBatcher::Owned,::Owned),T,R)>,T>, ColumnatedMergeBatcher<::Owned,::Owned,T,R>, RcBuilder>>, >; @@ -538,7 +541,7 @@ mod val_batch { impl Builder for OrdValBuilder { - type Input = ((::Key, ::Val), ::Time, ::Diff); + type Input = Vec<((::Key, ::Val), ::Time, ::Diff)>; type Time = ::Time; type Output = OrdValBatch; @@ -557,59 +560,60 @@ mod val_batch { } } - #[inline] - fn push(&mut self, ((key, val), time, diff): Self::Input) { - - // Perhaps this is a continuation of an already received key. - if self.result.keys.last().map(|k| k.equals(&key)).unwrap_or(false) { - // Perhaps this is a continuation of an already received value. - if self.result.vals.last().map(|v| v.equals(&val)).unwrap_or(false) { - self.push_update(time, diff); - } else { - // New value; complete representation of prior value. - self.result.vals_offs.push(self.result.updates.len()); - if self.singleton.take().is_some() { self.singletons += 1; } - self.push_update(time, diff); - self.result.vals.push(val); + fn from_batches(batches: &mut Vec, lower: AntichainRef, upper: AntichainRef, since: AntichainRef) -> Self::Output { + let mut keys = 0; + let mut vals = 0; + let mut upds = 0; + let mut prev_keyval = None; + for buffer in batches.iter() { + for ((key, val), time, _) in buffer.iter() { + if !upper.less_equal(time) { + if let Some((p_key, p_val)) = prev_keyval { + if p_key != key { + keys += 1; + vals += 1; + } + else if p_val != val { + vals += 1; + } + upds += 1; + } else { + keys += 1; + vals += 1; + upds += 1; + } + prev_keyval = Some((key, val)); + } } - } else { - // New key; complete representation of prior key. - self.result.vals_offs.push(self.result.updates.len()); - if self.singleton.take().is_some() { self.singletons += 1; } - self.result.keys_offs.push(self.result.vals.len()); - self.push_update(time, diff); - self.result.vals.push(val); - self.result.keys.push(key); } + let mut new = Self::with_capacity(keys, vals, upds); + new.push_batches(batches); + new.done(lower.to_owned(), upper.to_owned(), since.to_owned()) } - #[inline] - fn copy(&mut self, ((key, val), time, diff): &Self::Input) { - - // Perhaps this is a continuation of an already received key. - if self.result.keys.last().map(|k| k.equals(key)).unwrap_or(false) { - // Perhaps this is a continuation of an already received value. - if self.result.vals.last().map(|v| v.equals(val)).unwrap_or(false) { - // TODO: here we could look for repetition, and not push the update in that case. - // More logic (and state) would be required to correctly wrangle this. - self.push_update(time.clone(), diff.clone()); + fn push_batches(&mut self, batches: &mut Vec) { + for ((key, val), time, diff) in batches.iter_mut().map(|batch| batch.drain(..)).flatten() { + // Perhaps this is a continuation of an already received key. + if self.result.keys.last().map(|k| k.equals(&key)).unwrap_or(false) { + // Perhaps this is a continuation of an already received value. + if self.result.vals.last().map(|v| v.equals(&val)).unwrap_or(false) { + self.push_update(time, diff); + } else { + // New value; complete representation of prior value. + self.result.vals_offs.push(self.result.updates.len()); + if self.singleton.take().is_some() { self.singletons += 1; } + self.push_update(time, diff); + self.result.vals.push(val); + } } else { - // New value; complete representation of prior value. + // New key; complete representation of prior key. self.result.vals_offs.push(self.result.updates.len()); - // Remove any pending singleton, and if it was set increment our count. if self.singleton.take().is_some() { self.singletons += 1; } - self.push_update(time.clone(), diff.clone()); - self.result.vals.copy_push(val); + self.result.keys_offs.push(self.result.vals.len()); + self.push_update(time, diff); + self.result.vals.push(val); + self.result.keys.push(key); } - } else { - // New key; complete representation of prior key. - self.result.vals_offs.push(self.result.updates.len()); - // Remove any pending singleton, and if it was set increment our count. - if self.singleton.take().is_some() { self.singletons += 1; } - self.result.keys_offs.push(self.result.vals.len()); - self.push_update(time.clone(), diff.clone()); - self.result.vals.copy_push(val); - self.result.keys.copy_push(key); } } @@ -1002,7 +1006,7 @@ mod key_batch { impl Builder for OrdKeyBuilder { - type Input = ((::Key, ()), ::Time, ::Diff); + type Input = Vec<((::Key, ()), ::Time, ::Diff)>; type Time = ::Time; type Output = OrdKeyBatch; @@ -1019,35 +1023,45 @@ mod key_batch { } } - #[inline] - fn push(&mut self, ((key, ()), time, diff): Self::Input) { - - // Perhaps this is a continuation of an already received key. - if self.result.keys.last().map(|k| k.equals(&key)).unwrap_or(false) { - self.push_update(time, diff); - } else { - // New key; complete representation of prior key. - self.result.keys_offs.push(self.result.updates.len()); - // Remove any pending singleton, and if it was set increment our count. - if self.singleton.take().is_some() { self.singletons += 1; } - self.push_update(time, diff); - self.result.keys.push(key); + /// Build from batches + fn from_batches(batches: &mut Vec, lower: AntichainRef, upper: AntichainRef, since: AntichainRef) -> Self::Output { + let mut keys = 0; + let mut upds = 0; + let mut prev_key = None; + for buffer in batches.iter() { + for ((key, ()), time, _) in buffer.iter() { + if !upper.less_equal(time) { + if let Some(p_key) = prev_key { + if p_key != key { + keys += 1; + } + upds += 1; + } else { + keys += 1; + upds += 1; + } + prev_key = Some(key); + } + } } + let mut new = Self::with_capacity(keys, 0, upds); + new.push_batches(batches); + new.done(lower.to_owned(), upper.to_owned(), since.to_owned()) } - #[inline] - fn copy(&mut self, ((key, ()), time, diff): &Self::Input) { - - // Perhaps this is a continuation of an already received key. - if self.result.keys.last().map(|k| k.equals(key)).unwrap_or(false) { - self.push_update(time.clone(), diff.clone()); - } else { - // New key; complete representation of prior key. - self.result.keys_offs.push(self.result.updates.len()); - // Remove any pending singleton, and if it was set increment our count. - if self.singleton.take().is_some() { self.singletons += 1; } - self.push_update(time.clone(), diff.clone()); - self.result.keys.copy_push(key); + fn push_batches(&mut self, batches: &mut Vec) { + for ((key, ()), time, diff) in batches.iter_mut().map(|batch| batch.drain(..)).flatten() { + // Perhaps this is a continuation of an already received key. + if self.result.keys.last().map(|k| k.equals(&key)).unwrap_or(false) { + self.push_update(time, diff); + } else { + // New key; complete representation of prior key. + self.result.keys_offs.push(self.result.updates.len()); + // Remove any pending singleton, and if it was set increment our count. + if self.singleton.take().is_some() { self.singletons += 1; } + self.push_update(time, diff); + self.result.keys.push(key); + } } } diff --git a/src/trace/implementations/rhh.rs b/src/trace/implementations/rhh.rs index b01d1dae3..bceebde29 100644 --- a/src/trace/implementations/rhh.rs +++ b/src/trace/implementations/rhh.rs @@ -9,7 +9,7 @@ use std::rc::Rc; use crate::Hashable; use crate::trace::implementations::spine_fueled::Spine; -use crate::trace::implementations::merge_batcher::MergeBatcher; +use crate::trace::implementations::merge_batcher::{MergeBatcher, VecMerger}; use crate::trace::implementations::merge_batcher_col::ColumnatedMergeBatcher; use crate::trace::rc_blanket_impls::RcBuilder; @@ -20,7 +20,7 @@ use self::val_batch::{RhhValBatch, RhhValBuilder}; /// A trace implementation using a spine of ordered lists. pub type VecSpine = Spine< Rc>>, - MergeBatcher, + MergeBatcher, T>, RcBuilder>>, >; // /// A trace implementation for empty values using a spine of ordered lists. @@ -729,10 +729,41 @@ mod val_batch { ::Key: Default + HashOrdered, // RhhValBatch: Batch::Key, Val=::Val, Time=::Time, Diff=::Diff>, { - type Input = ((::Key, ::Val), ::Time, ::Diff); + type Input = Vec<((::Key, ::Val), ::Time, ::Diff)>; type Time = ::Time; type Output = RhhValBatch; + fn from_batches(batches: &mut Vec, lower: AntichainRef, upper: AntichainRef, since: AntichainRef) -> Self::Output { + let mut keys = 0; + let mut vals = 0; + let mut upds = 0; + let mut prev_keyval = None; + for buffer in batches.iter() { + for ((key, val), time, _) in buffer.iter() { + if !upper.less_equal(time) { + if let Some((p_key, p_val)) = prev_keyval { + if p_key != key { + keys += 1; + vals += 1; + } + else if p_val != val { + vals += 1; + } + upds += 1; + } else { + keys += 1; + vals += 1; + upds += 1; + } + prev_keyval = Some((key, val)); + } + } + } + let mut new = Self::with_capacity(keys, vals, upds); + new.push_batches(batches); + new.done(lower.to_owned(), upper.to_owned(), since.to_owned()) + } + fn with_capacity(keys: usize, vals: usize, upds: usize) -> Self { // Double the capacity for RHH; probably excessive. @@ -760,61 +791,30 @@ mod val_batch { } } - #[inline] - fn push(&mut self, ((key, val), time, diff): Self::Input) { - - // Perhaps this is a continuation of an already received key. - if self.result.keys.last().map(|k| k.equals(&key)).unwrap_or(false) { - // Perhaps this is a continuation of an already received value. - if self.result.vals.last().map(|v| v.equals(&val)).unwrap_or(false) { - self.push_update(time, diff); + fn push_batches(&mut self, batches: &mut Vec) { + for ((key, val), time, diff) in batches.iter_mut().map(|batch| batch.drain(..)).flatten() { + // Perhaps this is a continuation of an already received key. + if self.result.keys.last().map(|k| k.equals(&key)).unwrap_or(false) { + // Perhaps this is a continuation of an already received value. + if self.result.vals.last().map(|v| v.equals(&val)).unwrap_or(false) { + self.push_update(time, diff); + } else { + // New value; complete representation of prior value. + self.result.vals_offs.push(self.result.updates.len()); + if self.singleton.take().is_some() { self.singletons += 1; } + self.push_update(time, diff); + self.result.vals.push(val); + } } else { - // New value; complete representation of prior value. + // New key; complete representation of prior key. self.result.vals_offs.push(self.result.updates.len()); if self.singleton.take().is_some() { self.singletons += 1; } + self.result.keys_offs.push(self.result.vals.len()); self.push_update(time, diff); self.result.vals.push(val); + // Insert the key, but with no specified offset. + self.result.insert_key(key.borrow(), None); } - } else { - // New key; complete representation of prior key. - self.result.vals_offs.push(self.result.updates.len()); - if self.singleton.take().is_some() { self.singletons += 1; } - self.result.keys_offs.push(self.result.vals.len()); - self.push_update(time, diff); - self.result.vals.push(val); - // Insert the key, but with no specified offset. - self.result.insert_key(key.borrow(), None); - } - } - - #[inline] - fn copy(&mut self, ((key, val), time, diff): &Self::Input) { - - // Perhaps this is a continuation of an already received key. - if self.result.keys.last().map(|k| k.equals(key)).unwrap_or(false) { - // Perhaps this is a continuation of an already received value. - if self.result.vals.last().map(|v| v.equals(val)).unwrap_or(false) { - // TODO: here we could look for repetition, and not push the update in that case. - // More logic (and state) would be required to correctly wrangle this. - self.push_update(time.clone(), diff.clone()); - } else { - // New value; complete representation of prior value. - self.result.vals_offs.push(self.result.updates.len()); - // Remove any pending singleton, and if it was set increment our count. - if self.singleton.take().is_some() { self.singletons += 1; } - self.push_update(time.clone(), diff.clone()); - self.result.vals.copy_push(val); - } - } else { - // New key; complete representation of prior key. - self.result.vals_offs.push(self.result.updates.len()); - // Remove any pending singleton, and if it was set increment our count. - if self.singleton.take().is_some() { self.singletons += 1; } - self.result.keys_offs.push(self.result.vals.len()); - self.push_update(time.clone(), diff.clone()); - self.result.vals.copy_push(val); - // Insert the key, but with no specified offset. - self.result.insert_key(key, None); } } @@ -839,4 +839,4 @@ mod key_batch { // Copy the above, once it works! -} \ No newline at end of file +} diff --git a/src/trace/mod.rs b/src/trace/mod.rs index d1f878342..095e3fc84 100644 --- a/src/trace/mod.rs +++ b/src/trace/mod.rs @@ -320,13 +320,16 @@ pub trait Batcher { /// Functionality for building batches from ordered update sequences. pub trait Builder: Sized { - /// Input item type. + /// Input type. type Input; /// Timestamp type. type Time: Timestamp; /// Output batch type. type Output; - + + /// Build from batches + fn from_batches(batches: &mut Vec, lower: AntichainRef, upper: AntichainRef, since: AntichainRef) -> Self::Output; + /// Allocates an empty builder. /// /// Ideally we deprecate this and insist all non-trivial building happens via `with_capacity()`. @@ -335,17 +338,14 @@ pub trait Builder: Sized { /// Allocates an empty builder with capacity for the specified keys, values, and updates. /// /// They represent respectively the number of distinct `key`, `(key, val)`, and total updates. + // #[deprecated] fn with_capacity(keys: usize, vals: usize, upds: usize) -> Self; - /// Adds an element to the batch. - /// - /// The default implementation uses `self.copy` with references to the owned arguments. - /// One should override it if the builder can take advantage of owned arguments. - fn push(&mut self, element: Self::Input) { - self.copy(&element); - } - /// Adds an element to the batch. - fn copy(&mut self, element: &Self::Input); + /// Adds elements in sorted order to the batch. + /// TODO: Refine the `batches` parameter to allow allocation reuse. + // #[deprecated] + fn push_batches(&mut self, batches: &mut Vec); /// Completes building and returns the batch. + // #[deprecated] fn done(self, lower: Antichain, upper: Antichain, since: Antichain) -> Self::Output; } @@ -453,9 +453,11 @@ pub mod rc_blanket_impls { type Input = B::Input; type Time = B::Time; type Output = Rc; + fn from_batches(batches: &mut Vec, lower: AntichainRef, upper: AntichainRef, since: AntichainRef) -> Self::Output { + Rc::new(B::from_batches(batches, lower, upper, since)) + } fn with_capacity(keys: usize, vals: usize, upds: usize) -> Self { RcBuilder { builder: B::with_capacity(keys, vals, upds) } } - fn push(&mut self, element: Self::Input) { self.builder.push(element) } - fn copy(&mut self, element: &Self::Input) { self.builder.copy(element) } + fn push_batches(&mut self, batches: &mut Vec) { self.builder.push_batches(batches) } fn done(self, lower: Antichain, upper: Antichain, since: Antichain) -> Rc { Rc::new(self.builder.done(lower, upper, since)) } } @@ -560,9 +562,14 @@ pub mod abomonated_blanket_impls { type Input = B::Input; type Time = B::Time; type Output = Abomonated>; + fn from_batches(batches: &mut Vec, lower: AntichainRef, upper: AntichainRef, since: AntichainRef) -> Self::Output { + let batch = B::from_batches(batches, lower, upper, since); + let mut bytes = Vec::with_capacity(measure(&batch)); + unsafe { abomonation::encode(&batch, &mut bytes).unwrap() }; + unsafe { Abomonated::::new(bytes).unwrap() } + } fn with_capacity(keys: usize, vals: usize, upds: usize) -> Self { AbomonatedBuilder { builder: B::with_capacity(keys, vals, upds) } } - fn push(&mut self, element: Self::Input) { self.builder.push(element) } - fn copy(&mut self, element: &Self::Input) { self.builder.copy(element) } + fn push_batches(&mut self, batches: &mut Vec) { self.builder.push_batches(batches) } fn done(self, lower: Antichain, upper: Antichain, since: Antichain) -> Self::Output { let batch = self.builder.done(lower, upper, since); let mut bytes = Vec::with_capacity(measure(&batch)); From b07aa9eb050b0b70b4ac95791252c471fc73e053 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Wed, 24 Apr 2024 15:54:07 -0400 Subject: [PATCH 2/5] Undo some changes, rip out old columnated batcher Signed-off-by: Moritz Hoffmann --- src/operators/arrange/arrangement.rs | 4 +- src/operators/arrange/upsert.rs | 10 +- src/operators/reduce.rs | 21 +- src/trace/implementations/merge_batcher.rs | 62 ++- .../implementations/merge_batcher_col.rs | 416 +----------------- src/trace/implementations/ord_neu.rs | 168 ++++--- src/trace/implementations/rhh.rs | 112 ++--- src/trace/mod.rs | 35 +- 8 files changed, 237 insertions(+), 591 deletions(-) diff --git a/src/operators/arrange/arrangement.rs b/src/operators/arrange/arrangement.rs index 3b14646e3..c22749a27 100644 --- a/src/operators/arrange/arrangement.rs +++ b/src/operators/arrange/arrangement.rs @@ -284,7 +284,7 @@ where F: Fn(T2::Val<'_>) -> V + 'static, T2::Diff: Abelian, T2::Batch: Batch, - T2::Builder: Builder>, + T2::Builder: Builder, L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(V, T2::Diff)>)+'static, { self.reduce_core::<_,V,F,T2>(name, from, move |key, input, output, change| { @@ -303,7 +303,7 @@ where V: Data, F: Fn(T2::Val<'_>) -> V + 'static, T2::Batch: Batch, - T2::Builder: Builder>, + T2::Builder: Builder, L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(V, T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static, { use crate::operators::reduce::reduce_trace; diff --git a/src/operators/arrange/upsert.rs b/src/operators/arrange/upsert.rs index 43c0e08f2..758ec8df3 100644 --- a/src/operators/arrange/upsert.rs +++ b/src/operators/arrange/upsert.rs @@ -138,7 +138,7 @@ where F: Fn(Tr::Val<'_>) -> V + 'static, Tr::Time: TotalOrder+ExchangeData, Tr::Batch: Batch, - Tr::Builder: Builder>, + Tr::Builder: Builder, { let mut reader: Option> = None; @@ -241,6 +241,7 @@ where // Prepare a cursor to the existing arrangement, and a batch builder for // new stuff that we add. let (mut trace_cursor, trace_storage) = reader_local.cursor(); + let mut builder = Tr::Builder::new(); for (key, mut list) in to_process.drain(..) { use trace::cursor::MyTrait; @@ -281,10 +282,11 @@ where } // Must insert updates in (key, val, time) order. updates.sort(); + for update in updates.drain(..) { + builder.push(update); + } } - let mut batches = vec![std::mem::take(&mut updates)]; - let batch = Tr::Builder::from_batches(&mut batches, prev_frontier.borrow(), upper.borrow(), Antichain::from_elem(G::Timestamp::minimum()).borrow()); - updates = batches.into_iter().next().unwrap_or_default(); + let batch = builder.done(prev_frontier.clone(), upper.clone(), Antichain::from_elem(G::Timestamp::minimum())); prev_frontier.clone_from(&upper); // Communicate `batch` to the arrangement and the stream. diff --git a/src/operators/reduce.rs b/src/operators/reduce.rs index 94254c569..dda549bca 100644 --- a/src/operators/reduce.rs +++ b/src/operators/reduce.rs @@ -252,7 +252,7 @@ pub trait ReduceCore where F: Fn(T2::Val<'_>) -> V + 'static, T2::Diff: Abelian, T2::Batch: Batch, - T2::Builder: Builder>, + T2::Builder: Builder, L: FnMut(&K, &[(&V, R)], &mut Vec<(V, T2::Diff)>)+'static, { self.reduce_core::<_,_,T2>(name, from, move |key, input, output, change| { @@ -274,7 +274,7 @@ pub trait ReduceCore where T2: for<'a> Trace=&'a K, Time=G::Timestamp>+'static, F: Fn(T2::Val<'_>) -> V + 'static, T2::Batch: Batch, - T2::Builder: Builder>, + T2::Builder: Builder, L: FnMut(&K, &[(&V, R)], &mut Vec<(V,T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static, ; } @@ -293,7 +293,7 @@ where F: Fn(T2::Val<'_>) -> V + 'static, T2: for<'a> Trace=&'a K, Time=G::Timestamp>+'static, T2::Batch: Batch, - T2::Builder: Builder>, + T2::Builder: Builder, L: FnMut(&K, &[(&V, R)], &mut Vec<(V,T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static, { self.arrange_by_key_named(&format!("Arrange: {}", name)) @@ -312,7 +312,7 @@ where V: Data, F: Fn(T2::Val<'_>) -> V + 'static, T2::Batch: Batch, - T2::Builder: Builder>, + T2::Builder: Builder, L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(V,T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static, { let mut result_trace = None; @@ -529,20 +529,9 @@ where // (ii) that the buffers are time-ordered, and (iii) that the builders accept // arbitrarily ordered times. for index in 0 .. buffers.len() { - // TODO: This doesn't reuse allocations for `update`. - let mut update = Vec::with_capacity(1024); buffers[index].1.sort_by(|x,y| x.0.cmp(&y.0)); for (val, time, diff) in buffers[index].1.drain(..) { - update.push(((key.into_owned(), val), time, diff)); - if update.len() == update.capacity() { - let mut chain = vec![update]; - builders[index].push_batches(&mut chain); - update = chain.pop().unwrap_or_else(|| Vec::with_capacity(1024)); - update.clear(); - } - } - if !update.is_empty() { - builders[index].push_batches(&mut vec![update]); + builders[index].push(((key.into_owned(), val), time, diff)); } } } diff --git a/src/trace/implementations/merge_batcher.rs b/src/trace/implementations/merge_batcher.rs index e0e55a2f4..874f9dfd3 100644 --- a/src/trace/implementations/merge_batcher.rs +++ b/src/trace/implementations/merge_batcher.rs @@ -10,6 +10,7 @@ use timely::logging_core::Logger; use timely::progress::{frontier::Antichain, Timestamp}; use timely::progress::frontier::AntichainRef; +use crate::Data; use crate::consolidation::consolidate_updates; use crate::difference::Semigroup; use crate::logging::{BatcherEvent, DifferentialEvent}; @@ -42,7 +43,7 @@ where T: Timestamp, { type Input = M::Input; - type Output = M::Batch; + type Output = M::Output; type Time = T; fn new(logger: Option>, operator_id: usize) -> Self { @@ -95,7 +96,7 @@ where self.stash.clear(); - let seal = B::from_batches(&mut readied, self.lower.borrow(), upper.borrow(), Antichain::from_elem(T::minimum()).borrow()); + let seal = M::seal::(&mut readied, self.lower.borrow(), upper.borrow(), Antichain::from_elem(T::minimum()).borrow()); self.lower = upper; seal } @@ -185,6 +186,8 @@ pub trait Merger: Default { type Input; /// The internal representation of batches of data. type Batch: Container; + /// The output type + type Output; /// The type of time in frontiers to extract updates. type Time; /// Accept a fresh batch of input data. @@ -196,6 +199,9 @@ pub trait Merger: Default { /// Extract ready updates based on the `upper` frontier. fn extract(&mut self, merged: Vec, upper: AntichainRef, frontier: &mut Antichain, readied: &mut Vec, keep: &mut Vec, stash: &mut Vec); + /// Build from a chain + fn seal>(chain: &mut Vec, lower: AntichainRef, upper: AntichainRef, since: AntichainRef) -> B::Output; + /// Account size and allocation changes. Returns a tuple of (records, size, capacity, allocations). fn account(batch: &Self::Batch) -> (usize, usize, usize, usize); } @@ -241,12 +247,13 @@ impl VecMerger { } } -impl Merger for VecMerger<(D,T,R)> { +impl Merger for VecMerger<((K,V),T,R)> { type Time = T; - type Input = Vec<(D,T,R)>; - type Batch = Vec<(D,T,R)>; + type Input = Vec<((K,V),T,R)>; + type Batch = Vec<((K,V),T,R)>; + type Output = ((K,V), T, R); - fn accept(&mut self, batch: RefOrMut>, stash: &mut Vec) -> Vec> { + fn accept(&mut self, batch: RefOrMut, stash: &mut Vec) -> Vec { // `batch` is either a shared reference or an owned allocations. let mut owned = match batch { RefOrMut::Ref(vec) => { @@ -271,11 +278,11 @@ impl>) -> Vec> { + fn finish(&mut self, _stash: &mut Vec) -> Vec { vec![] } - fn merge(&mut self, list1: Vec>, list2: Vec>, output: &mut Vec>, stash: &mut Vec>) { + fn merge(&mut self, list1: Vec, list2: Vec, output: &mut Vec, stash: &mut Vec) { let mut list1 = list1.into_iter(); let mut list2 = list2.into_iter(); let mut head1 = VecDeque::from(list1.next().unwrap_or_default()); @@ -343,7 +350,7 @@ impl>, upper: AntichainRef, frontier: &mut Antichain, readied: &mut Vec>, kept: &mut Vec>, stash: &mut Vec>) { + fn extract(&mut self, merged: Vec, upper: AntichainRef, frontier: &mut Antichain, readied: &mut Vec, kept: &mut Vec, stash: &mut Vec) { let mut keep = self.empty(stash); let mut ready = self.empty(stash); @@ -377,6 +384,43 @@ impl>(chain: &mut Vec, lower: AntichainRef, upper: AntichainRef, since: AntichainRef) -> B::Output { + let mut builder = { + let mut keys = 0; + let mut vals = 0; + let mut upds = 0; + let mut prev_keyval = None; + for buffer in chain.iter() { + for ((key, val), time, _) in buffer.iter() { + if !upper.less_equal(time) { + if let Some((p_key, p_val)) = prev_keyval { + if p_key != key { + keys += 1; + vals += 1; + } + else if p_val != val { + vals += 1; + } + upds += 1; + } else { + keys += 1; + vals += 1; + upds += 1; + } + prev_keyval = Some((key, val)); + } + } + } + B::with_capacity(keys, vals, upds) + }; + + for datum in chain.drain(..).flatten() { + builder.push(datum); + } + + builder.done(lower.to_owned(), upper.to_owned(), since.to_owned()) + } + fn account(batch: &Self::Batch) -> (usize, usize, usize, usize) { (batch.len(), 0, 0, 0) } diff --git a/src/trace/implementations/merge_batcher_col.rs b/src/trace/implementations/merge_batcher_col.rs index bc86e0b60..b0d8b2e1f 100644 --- a/src/trace/implementations/merge_batcher_col.rs +++ b/src/trace/implementations/merge_batcher_col.rs @@ -4,18 +4,14 @@ use std::cmp::Ordering; use timely::{Container, Data, PartialOrder}; use timely::communication::message::RefOrMut; use timely::container::columnation::{Columnation, TimelyStack}; -use timely::logging::WorkerIdentifier; -use timely::logging_core::Logger; -use timely::progress::{frontier::Antichain, Timestamp}; -use timely::progress::frontier::AntichainRef; +use timely::progress::frontier::{Antichain, AntichainRef}; use crate::consolidation::consolidate_updates; use crate::difference::Semigroup; -use crate::logging::{BatcherEvent, DifferentialEvent}; -use crate::trace::{Batcher, Builder}; +use crate::trace::Builder; use crate::trace::implementations::merge_batcher::Merger; -/// TODO +/// A merger for timely stacks pub struct ColumnationMerger { pending: Vec, } @@ -61,15 +57,17 @@ impl ColumnationMerger { } } -impl Merger for ColumnationMerger<(D, T, R)> +impl Merger for ColumnationMerger<((K,V), T, R)> where - D: Columnation + Ord + Data, + K: Columnation + Ord + Data, + V: Columnation + Ord + Data, T: Columnation + Ord + PartialOrder + Data, R: Columnation + Semigroup + 'static, { type Time = T; - type Input = Vec<(D,T,R)>; - type Batch = TimelyStack<(D,T,R)>; + type Input = Vec<((K,V),T,R)>; + type Batch = TimelyStack<((K,V),T,R)>; + type Output = ((K,V),T,R); fn accept(&mut self, batch: RefOrMut, stash: &mut Vec) -> Vec { // `batch` is either a shared reference or an owned allocations. @@ -217,84 +215,13 @@ where } } - fn account(batch: &Self::Batch) -> (usize, usize, usize, usize) { - let (mut size, mut capacity, mut allocations) = (0, 0, 0); - let cb = |siz, cap| { - size += siz; - capacity += cap; - allocations += 1; - }; - batch.heap_size(cb); - (batch.len(), size, capacity, allocations) - } -} - - -/// Creates batches from unordered tuples. -pub struct ColumnatedMergeBatcher -where - K: Columnation + 'static, - V: Columnation + 'static, - T: Columnation + 'static, - D: Columnation + 'static, -{ - sorter: MergeSorterColumnation<(K, V), T, D>, - lower: Antichain, - frontier: Antichain, -} - -impl Batcher for ColumnatedMergeBatcher -where - K: Columnation + Ord + Clone + 'static, - V: Columnation + Ord + Clone + 'static, - T: Columnation + Timestamp + 'static, - D: Columnation + Semigroup + 'static, -{ - type Input = Vec<((K,V),T,D)>; - type Output = Vec<((K,V),T,D)>; - type Time = T; - - fn new(logger: Option>, operator_id: usize) -> Self { - ColumnatedMergeBatcher { - sorter: MergeSorterColumnation::new(logger, operator_id), - frontier: Antichain::new(), - lower: Antichain::from_elem(::minimum()), - } - } - - #[inline] - fn push_batch(&mut self, batch: RefOrMut) { - // `batch` is either a shared reference or an owned allocations. - match batch { - RefOrMut::Ref(reference) => { - // This is a moment at which we could capture the allocations backing - // `batch` into a different form of region, rather than just cloning. - self.sorter.push(&mut reference.clone()); - }, - RefOrMut::Mut(reference) => { - self.sorter.push(reference); - } - } - } - - // Sealing a batch means finding those updates with times not greater or equal to any time - // in `upper`. All updates must have time greater or equal to the previously used `upper`, - // which we call `lower`, by assumption that after sealing a batcher we receive no more - // updates with times not greater or equal to `upper`. - #[inline] - fn seal>(&mut self, upper: Antichain) -> B::Output { - - let mut merged = Default::default(); - self.sorter.finish_into(&mut merged); - - // Determine the number of distinct keys, values, and updates, - // and form a builder pre-sized for these numbers. + fn seal>(chain: &mut Vec, lower: AntichainRef, upper: AntichainRef, since: AntichainRef) -> B::Output { let mut builder = { let mut keys = 0; let mut vals = 0; let mut upds = 0; let mut prev_keyval = None; - for buffer in merged.iter() { + for buffer in chain.iter() { for ((key, val), time, _) in buffer.iter() { if !upper.less_equal(time) { if let Some((p_key, p_val)) = prev_keyval { @@ -318,72 +245,26 @@ where B::with_capacity(keys, vals, upds) }; - let mut kept = Vec::new(); - let mut keep = TimelyStack::default(); - let mut readied = Vec::new(); - let mut ready = Vec::default(); - - self.frontier.clear(); - - for buffer in merged.drain(..) { - for datum @ ((_key, _val), time, _diff) in &buffer[..] { - if upper.less_equal(time) { - self.frontier.insert(time.clone()); - if keep.is_empty() { - if keep.capacity() != MergeSorterColumnation::<(K, V), T, D>::buffer_size() { - keep = self.sorter.empty(); - } - } else if keep.len() == keep.capacity() { - kept.push(keep); - keep = self.sorter.empty(); - } - keep.copy(datum); - } - else { - if ready.is_empty() { - if ready.capacity() != MergeSorterColumnation::<(K, V), T, D>::buffer_size() { - ready = Vec::with_capacity(MergeSorterColumnation::<(K, V), T, D>::buffer_size()); - } - } else if ready.len() == ready.capacity() { - readied.push(ready); - ready = Vec::with_capacity(MergeSorterColumnation::<(K, V), T, D>::buffer_size()); - } - ready.push(datum.clone()); - } - } - // Recycling buffer. - self.sorter.recycle(buffer); - } - - // Finish the kept data. - if !keep.is_empty() { - kept.push(keep); - } - if !kept.is_empty() { - self.sorter.push_list(kept); - } - - if !ready.is_empty() { - readied.push(ready); - } - if !readied.is_empty() { - builder.push_batches(&mut readied); + for datum in chain.iter().map(|ts| ts.iter()).flatten() { + builder.copy(datum); } - // Drain buffers (fast reclamation). - self.sorter.clear_stash(); - - let seal = builder.done(self.lower.clone(), upper.clone(), Antichain::from_elem(T::minimum())); - self.lower = upper; - seal + builder.done(lower.to_owned(), upper.to_owned(), since.to_owned()) } - /// The frontier of elements remaining after the most recent call to `self.seal`. - fn frontier(&mut self) -> timely::progress::frontier::AntichainRef { - self.frontier.borrow() + fn account(batch: &Self::Batch) -> (usize, usize, usize, usize) { + let (mut size, mut capacity, mut allocations) = (0, 0, 0); + let cb = |siz, cap| { + size += siz; + capacity += cap; + allocations += 1; + }; + batch.heap_size(cb); + (batch.len(), size, capacity, allocations) } } + struct TimelyStackQueue { list: TimelyStack, head: usize, @@ -424,252 +305,3 @@ impl TimelyStackQueue { self.list[self.head..].iter() } } - -struct MergeSorterColumnation { - /// each power-of-two length list of allocations. Do not push/pop directly but use the corresponding functions. - queue: Vec>>, - stash: Vec>, - pending: Vec<(D, T, R)>, - logger: Option>, - operator_id: usize, -} - -impl MergeSorterColumnation { - - const BUFFER_SIZE_BYTES: usize = 64 << 10; - - /// Buffer size (number of elements) to use for new/empty buffers. - const fn buffer_size() -> usize { - let size = std::mem::size_of::<(D, T, R)>(); - if size == 0 { - Self::BUFFER_SIZE_BYTES - } else if size <= Self::BUFFER_SIZE_BYTES { - Self::BUFFER_SIZE_BYTES / size - } else { - 1 - } - } - - /// Buffer size for pending updates, currently 2 * [`Self::buffer_size`]. - const fn pending_buffer_size() -> usize { - Self::buffer_size() * 2 - } - - fn new(logger: Option>, operator_id: usize) -> Self { - Self { - logger, - operator_id, - queue: Vec::new(), - stash: Vec::new(), - pending: Vec::new(), - } - } - - fn empty(&mut self) -> TimelyStack<(D, T, R)> { - self.stash.pop().unwrap_or_else(|| TimelyStack::with_capacity(Self::buffer_size())) - } - - /// Remove all elements from the stash. - fn clear_stash(&mut self) { - self.stash.clear(); - } - - /// Insert an empty buffer into the stash. Panics if the buffer is not empty. - fn recycle(&mut self, mut buffer: TimelyStack<(D, T, R)>) { - if buffer.capacity() == Self::buffer_size() && self.stash.len() < 2 { - buffer.clear(); - self.stash.push(buffer); - } - } - - fn push(&mut self, batch: &mut Vec<(D, T, R)>) { - // Ensure `self.pending` has a capacity of `Self::pending_buffer_size`. - if self.pending.capacity() < Self::pending_buffer_size() { - self.pending.reserve(Self::pending_buffer_size() - self.pending.capacity()); - } - - while !batch.is_empty() { - self.pending.extend(batch.drain(..std::cmp::min(batch.len(), self.pending.capacity() - self.pending.len()))); - if self.pending.len() == self.pending.capacity() { - crate::consolidation::consolidate_updates(&mut self.pending); - if self.pending.len() > self.pending.capacity() / 2 { - // Flush if `self.pending` is more than half full after consolidation. - self.flush_pending(); - } - } - } - } - - /// Move all elements in `pending` into `queue`. The data in `pending` must be compacted and - /// sorted. After this function returns, `self.pending` is empty. - fn flush_pending(&mut self) { - if !self.pending.is_empty() { - let mut stack = self.empty(); - stack.reserve_items(self.pending.iter()); - for tuple in self.pending.drain(..) { - stack.copy(&tuple); - } - self.queue_push(vec![stack]); - while self.queue.len() > 1 && (self.queue[self.queue.len()-1].len() >= self.queue[self.queue.len()-2].len() / 2) { - let list1 = self.queue_pop().unwrap(); - let list2 = self.queue_pop().unwrap(); - let merged = self.merge_by(list1, list2); - self.queue_push(merged); - } - } - } - - // This is awkward, because it isn't a power-of-two length any more, and we don't want - // to break it down to be so. - fn push_list(&mut self, list: Vec>) { - while self.queue.len() > 1 && self.queue[self.queue.len()-1].len() < list.len() { - let list1 = self.queue_pop().unwrap(); - let list2 = self.queue_pop().unwrap(); - let merged = self.merge_by(list1, list2); - self.queue_push(merged); - } - self.queue_push(list); - } - - fn finish_into(&mut self, target: &mut Vec>) { - crate::consolidation::consolidate_updates(&mut self.pending); - self.flush_pending(); - while self.queue.len() > 1 { - let list1 = self.queue_pop().unwrap(); - let list2 = self.queue_pop().unwrap(); - let merged = self.merge_by(list1, list2); - self.queue_push(merged); - } - - if let Some(mut last) = self.queue_pop() { - std::mem::swap(&mut last, target); - } - } - - // merges two sorted input lists into one sorted output list. - fn merge_by(&mut self, list1: Vec>, list2: Vec>) -> Vec> { - - // TODO: `list1` and `list2` get dropped; would be better to reuse? - let mut output = Vec::with_capacity(list1.len() + list2.len()); - let mut result = self.empty(); - - let mut list1 = list1.into_iter(); - let mut list2 = list2.into_iter(); - - let mut head1 = TimelyStackQueue::from(list1.next().unwrap_or_default()); - let mut head2 = TimelyStackQueue::from(list2.next().unwrap_or_default()); - - // while we have valid data in each input, merge. - while !head1.is_empty() && !head2.is_empty() { - - while (result.capacity() - result.len()) > 0 && !head1.is_empty() && !head2.is_empty() { - - let cmp = { - let x = head1.peek(); - let y = head2.peek(); - (&x.0, &x.1).cmp(&(&y.0, &y.1)) - }; - match cmp { - Ordering::Less => { result.copy(head1.pop()); } - Ordering::Greater => { result.copy(head2.pop()); } - Ordering::Equal => { - let (data1, time1, diff1) = head1.pop(); - let (_data2, _time2, diff2) = head2.pop(); - let mut diff1 = diff1.clone(); - diff1.plus_equals(diff2); - if !diff1.is_zero() { - result.copy_destructured(data1, time1, &diff1); - } - } - } - } - - if result.capacity() == result.len() { - output.push(result); - result = self.empty(); - } - - if head1.is_empty() { - self.recycle(head1.done()); - head1 = TimelyStackQueue::from(list1.next().unwrap_or_default()); - } - if head2.is_empty() { - self.recycle(head2.done()); - head2 = TimelyStackQueue::from(list2.next().unwrap_or_default()); - } - } - - if result.len() > 0 { - output.push(result); - } else { - self.recycle(result); - } - - if !head1.is_empty() { - let mut result = self.empty(); - result.reserve_items(head1.iter()); - for item in head1.iter() { result.copy(item); } - output.push(result); - } - output.extend(list1); - - if !head2.is_empty() { - let mut result = self.empty(); - result.reserve_items(head2.iter()); - for item in head2.iter() { result.copy(item); } - output.push(result); - } - output.extend(list2); - - output - } -} - -impl MergeSorterColumnation { - /// Pop a batch from `self.queue` and account size changes. - #[inline] - fn queue_pop(&mut self) -> Option>> { - let batch = self.queue.pop(); - self.account(batch.iter().flatten(), -1); - batch - } - - /// Push a batch to `self.queue` and account size changes. - #[inline] - fn queue_push(&mut self, batch: Vec>) { - self.account(&batch, 1); - self.queue.push(batch); - } - - /// Account size changes. Only performs work if a logger exists. - /// - /// Calculate the size based on the [`TimelyStack`]s passed along, with each attribute - /// multiplied by `diff`. Usually, one wants to pass 1 or -1 as the diff. - fn account<'a, I: IntoIterator>>(&self, items: I, diff: isize) { - if let Some(logger) = &self.logger { - let (mut records, mut siz, mut capacity, mut allocations) = (0isize, 0isize, 0isize, 0isize); - for stack in items { - records = records.saturating_add_unsigned(stack.len()); - stack.heap_size(|s, c| { - siz = siz.saturating_add_unsigned(s); - capacity = capacity.saturating_add_unsigned(c); - allocations += isize::from(c > 0); - }); - } - logger.log(BatcherEvent { - operator: self.operator_id, - records_diff: records * diff, - size_diff: siz * diff, - capacity_diff: capacity * diff, - allocations_diff: allocations * diff, - }) - } - } - -} - -impl Drop for MergeSorterColumnation { - fn drop(&mut self) { - while self.queue_pop().is_some() { } - } -} diff --git a/src/trace/implementations/ord_neu.rs b/src/trace/implementations/ord_neu.rs index 0ece74bc1..a5afee109 100644 --- a/src/trace/implementations/ord_neu.rs +++ b/src/trace/implementations/ord_neu.rs @@ -12,7 +12,7 @@ use std::rc::Rc; use crate::trace::implementations::spine_fueled::Spine; use crate::trace::implementations::merge_batcher::{MergeBatcher, VecMerger}; -use crate::trace::implementations::merge_batcher_col::ColumnatedMergeBatcher; +use crate::trace::implementations::merge_batcher_col::ColumnationMerger; use crate::trace::rc_blanket_impls::RcBuilder; use super::{Update, Layout, Vector, TStack, Preferred}; @@ -32,8 +32,7 @@ pub type OrdValSpine = Spine< /// A trace implementation backed by columnar storage. pub type ColValSpine = Spine< Rc>>, - // MergeBatcher, T>, - ColumnatedMergeBatcher, + MergeBatcher, T>, RcBuilder>>, >; @@ -49,16 +48,14 @@ pub type OrdKeySpine = Spine< /// A trace implementation backed by columnar storage. pub type ColKeySpine = Spine< Rc>>, - // MergeBatcher, T>, - ColumnatedMergeBatcher, + MergeBatcher, T>, RcBuilder>>, >; /// A trace implementation backed by columnar storage. pub type PreferredSpine = Spine< Rc>>, - // MergeBatcher::Owned,::Owned),T,R)>,T>, - ColumnatedMergeBatcher<::Owned,::Owned,T,R>, + MergeBatcher::Owned,::Owned),T,R)>,T>, RcBuilder>>, >; @@ -541,7 +538,7 @@ mod val_batch { impl Builder for OrdValBuilder { - type Input = Vec<((::Key, ::Val), ::Time, ::Diff)>; + type Input = ((::Key, ::Val), ::Time, ::Diff); type Time = ::Time; type Output = OrdValBatch; @@ -560,60 +557,59 @@ mod val_batch { } } - fn from_batches(batches: &mut Vec, lower: AntichainRef, upper: AntichainRef, since: AntichainRef) -> Self::Output { - let mut keys = 0; - let mut vals = 0; - let mut upds = 0; - let mut prev_keyval = None; - for buffer in batches.iter() { - for ((key, val), time, _) in buffer.iter() { - if !upper.less_equal(time) { - if let Some((p_key, p_val)) = prev_keyval { - if p_key != key { - keys += 1; - vals += 1; - } - else if p_val != val { - vals += 1; - } - upds += 1; - } else { - keys += 1; - vals += 1; - upds += 1; - } - prev_keyval = Some((key, val)); - } + #[inline] + fn push(&mut self, ((key, val), time, diff): Self::Input) { + + // Perhaps this is a continuation of an already received key. + if self.result.keys.last().map(|k| k.equals(&key)).unwrap_or(false) { + // Perhaps this is a continuation of an already received value. + if self.result.vals.last().map(|v| v.equals(&val)).unwrap_or(false) { + self.push_update(time, diff); + } else { + // New value; complete representation of prior value. + self.result.vals_offs.push(self.result.updates.len()); + if self.singleton.take().is_some() { self.singletons += 1; } + self.push_update(time, diff); + self.result.vals.push(val); } + } else { + // New key; complete representation of prior key. + self.result.vals_offs.push(self.result.updates.len()); + if self.singleton.take().is_some() { self.singletons += 1; } + self.result.keys_offs.push(self.result.vals.len()); + self.push_update(time, diff); + self.result.vals.push(val); + self.result.keys.push(key); } - let mut new = Self::with_capacity(keys, vals, upds); - new.push_batches(batches); - new.done(lower.to_owned(), upper.to_owned(), since.to_owned()) } - fn push_batches(&mut self, batches: &mut Vec) { - for ((key, val), time, diff) in batches.iter_mut().map(|batch| batch.drain(..)).flatten() { - // Perhaps this is a continuation of an already received key. - if self.result.keys.last().map(|k| k.equals(&key)).unwrap_or(false) { - // Perhaps this is a continuation of an already received value. - if self.result.vals.last().map(|v| v.equals(&val)).unwrap_or(false) { - self.push_update(time, diff); - } else { - // New value; complete representation of prior value. - self.result.vals_offs.push(self.result.updates.len()); - if self.singleton.take().is_some() { self.singletons += 1; } - self.push_update(time, diff); - self.result.vals.push(val); - } + #[inline] + fn copy(&mut self, ((key, val), time, diff): &Self::Input) { + + // Perhaps this is a continuation of an already received key. + if self.result.keys.last().map(|k| k.equals(key)).unwrap_or(false) { + // Perhaps this is a continuation of an already received value. + if self.result.vals.last().map(|v| v.equals(val)).unwrap_or(false) { + // TODO: here we could look for repetition, and not push the update in that case. + // More logic (and state) would be required to correctly wrangle this. + self.push_update(time.clone(), diff.clone()); } else { - // New key; complete representation of prior key. + // New value; complete representation of prior value. self.result.vals_offs.push(self.result.updates.len()); + // Remove any pending singleton, and if it was set increment our count. if self.singleton.take().is_some() { self.singletons += 1; } - self.result.keys_offs.push(self.result.vals.len()); - self.push_update(time, diff); - self.result.vals.push(val); - self.result.keys.push(key); + self.push_update(time.clone(), diff.clone()); + self.result.vals.copy_push(val); } + } else { + // New key; complete representation of prior key. + self.result.vals_offs.push(self.result.updates.len()); + // Remove any pending singleton, and if it was set increment our count. + if self.singleton.take().is_some() { self.singletons += 1; } + self.result.keys_offs.push(self.result.vals.len()); + self.push_update(time.clone(), diff.clone()); + self.result.vals.copy_push(val); + self.result.keys.copy_push(key); } } @@ -1006,7 +1002,7 @@ mod key_batch { impl Builder for OrdKeyBuilder { - type Input = Vec<((::Key, ()), ::Time, ::Diff)>; + type Input = ((::Key, ()), ::Time, ::Diff); type Time = ::Time; type Output = OrdKeyBatch; @@ -1023,45 +1019,35 @@ mod key_batch { } } - /// Build from batches - fn from_batches(batches: &mut Vec, lower: AntichainRef, upper: AntichainRef, since: AntichainRef) -> Self::Output { - let mut keys = 0; - let mut upds = 0; - let mut prev_key = None; - for buffer in batches.iter() { - for ((key, ()), time, _) in buffer.iter() { - if !upper.less_equal(time) { - if let Some(p_key) = prev_key { - if p_key != key { - keys += 1; - } - upds += 1; - } else { - keys += 1; - upds += 1; - } - prev_key = Some(key); - } - } + #[inline] + fn push(&mut self, ((key, ()), time, diff): Self::Input) { + + // Perhaps this is a continuation of an already received key. + if self.result.keys.last().map(|k| k.equals(&key)).unwrap_or(false) { + self.push_update(time, diff); + } else { + // New key; complete representation of prior key. + self.result.keys_offs.push(self.result.updates.len()); + // Remove any pending singleton, and if it was set increment our count. + if self.singleton.take().is_some() { self.singletons += 1; } + self.push_update(time, diff); + self.result.keys.push(key); } - let mut new = Self::with_capacity(keys, 0, upds); - new.push_batches(batches); - new.done(lower.to_owned(), upper.to_owned(), since.to_owned()) } - fn push_batches(&mut self, batches: &mut Vec) { - for ((key, ()), time, diff) in batches.iter_mut().map(|batch| batch.drain(..)).flatten() { - // Perhaps this is a continuation of an already received key. - if self.result.keys.last().map(|k| k.equals(&key)).unwrap_or(false) { - self.push_update(time, diff); - } else { - // New key; complete representation of prior key. - self.result.keys_offs.push(self.result.updates.len()); - // Remove any pending singleton, and if it was set increment our count. - if self.singleton.take().is_some() { self.singletons += 1; } - self.push_update(time, diff); - self.result.keys.push(key); - } + #[inline] + fn copy(&mut self, ((key, ()), time, diff): &Self::Input) { + + // Perhaps this is a continuation of an already received key. + if self.result.keys.last().map(|k| k.equals(key)).unwrap_or(false) { + self.push_update(time.clone(), diff.clone()); + } else { + // New key; complete representation of prior key. + self.result.keys_offs.push(self.result.updates.len()); + // Remove any pending singleton, and if it was set increment our count. + if self.singleton.take().is_some() { self.singletons += 1; } + self.push_update(time.clone(), diff.clone()); + self.result.keys.copy_push(key); } } diff --git a/src/trace/implementations/rhh.rs b/src/trace/implementations/rhh.rs index bceebde29..64cac268b 100644 --- a/src/trace/implementations/rhh.rs +++ b/src/trace/implementations/rhh.rs @@ -6,11 +6,14 @@ //! for example wrapped types that implement `Ord` that way. use std::rc::Rc; +use std::cmp::Ordering; + +use abomonation_derive::Abomonation; use crate::Hashable; -use crate::trace::implementations::spine_fueled::Spine; use crate::trace::implementations::merge_batcher::{MergeBatcher, VecMerger}; -use crate::trace::implementations::merge_batcher_col::ColumnatedMergeBatcher; +use crate::trace::implementations::merge_batcher_col::ColumnationMerger; +use crate::trace::implementations::spine_fueled::Spine; use crate::trace::rc_blanket_impls::RcBuilder; use super::{Update, Layout, Vector, TStack}; @@ -29,7 +32,7 @@ pub type VecSpine = Spine< /// A trace implementation backed by columnar storage. pub type ColSpine = Spine< Rc>>, - ColumnatedMergeBatcher, + MergeBatcher, T>, RcBuilder>>, >; // /// A trace implementation backed by columnar storage. @@ -47,9 +50,6 @@ pub struct HashWrapper { pub inner: T } -use std::cmp::Ordering; -use abomonation_derive::Abomonation; - impl PartialOrd for HashWrapper where ::Output: PartialOrd { fn partial_cmp(&self, other: &Self) -> Option { @@ -729,41 +729,10 @@ mod val_batch { ::Key: Default + HashOrdered, // RhhValBatch: Batch::Key, Val=::Val, Time=::Time, Diff=::Diff>, { - type Input = Vec<((::Key, ::Val), ::Time, ::Diff)>; + type Input = ((::Key, ::Val), ::Time, ::Diff); type Time = ::Time; type Output = RhhValBatch; - fn from_batches(batches: &mut Vec, lower: AntichainRef, upper: AntichainRef, since: AntichainRef) -> Self::Output { - let mut keys = 0; - let mut vals = 0; - let mut upds = 0; - let mut prev_keyval = None; - for buffer in batches.iter() { - for ((key, val), time, _) in buffer.iter() { - if !upper.less_equal(time) { - if let Some((p_key, p_val)) = prev_keyval { - if p_key != key { - keys += 1; - vals += 1; - } - else if p_val != val { - vals += 1; - } - upds += 1; - } else { - keys += 1; - vals += 1; - upds += 1; - } - prev_keyval = Some((key, val)); - } - } - } - let mut new = Self::with_capacity(keys, vals, upds); - new.push_batches(batches); - new.done(lower.to_owned(), upper.to_owned(), since.to_owned()) - } - fn with_capacity(keys: usize, vals: usize, upds: usize) -> Self { // Double the capacity for RHH; probably excessive. @@ -791,30 +760,61 @@ mod val_batch { } } - fn push_batches(&mut self, batches: &mut Vec) { - for ((key, val), time, diff) in batches.iter_mut().map(|batch| batch.drain(..)).flatten() { - // Perhaps this is a continuation of an already received key. - if self.result.keys.last().map(|k| k.equals(&key)).unwrap_or(false) { - // Perhaps this is a continuation of an already received value. - if self.result.vals.last().map(|v| v.equals(&val)).unwrap_or(false) { - self.push_update(time, diff); - } else { - // New value; complete representation of prior value. - self.result.vals_offs.push(self.result.updates.len()); - if self.singleton.take().is_some() { self.singletons += 1; } - self.push_update(time, diff); - self.result.vals.push(val); - } + #[inline] + fn push(&mut self, ((key, val), time, diff): Self::Input) { + + // Perhaps this is a continuation of an already received key. + if self.result.keys.last().map(|k| k.equals(&key)).unwrap_or(false) { + // Perhaps this is a continuation of an already received value. + if self.result.vals.last().map(|v| v.equals(&val)).unwrap_or(false) { + self.push_update(time, diff); } else { - // New key; complete representation of prior key. + // New value; complete representation of prior value. self.result.vals_offs.push(self.result.updates.len()); if self.singleton.take().is_some() { self.singletons += 1; } - self.result.keys_offs.push(self.result.vals.len()); self.push_update(time, diff); self.result.vals.push(val); - // Insert the key, but with no specified offset. - self.result.insert_key(key.borrow(), None); } + } else { + // New key; complete representation of prior key. + self.result.vals_offs.push(self.result.updates.len()); + if self.singleton.take().is_some() { self.singletons += 1; } + self.result.keys_offs.push(self.result.vals.len()); + self.push_update(time, diff); + self.result.vals.push(val); + // Insert the key, but with no specified offset. + self.result.insert_key(key.borrow(), None); + } + } + + #[inline] + fn copy(&mut self, ((key, val), time, diff): &Self::Input) { + + // Perhaps this is a continuation of an already received key. + if self.result.keys.last().map(|k| k.equals(key)).unwrap_or(false) { + // Perhaps this is a continuation of an already received value. + if self.result.vals.last().map(|v| v.equals(val)).unwrap_or(false) { + // TODO: here we could look for repetition, and not push the update in that case. + // More logic (and state) would be required to correctly wrangle this. + self.push_update(time.clone(), diff.clone()); + } else { + // New value; complete representation of prior value. + self.result.vals_offs.push(self.result.updates.len()); + // Remove any pending singleton, and if it was set increment our count. + if self.singleton.take().is_some() { self.singletons += 1; } + self.push_update(time.clone(), diff.clone()); + self.result.vals.copy_push(val); + } + } else { + // New key; complete representation of prior key. + self.result.vals_offs.push(self.result.updates.len()); + // Remove any pending singleton, and if it was set increment our count. + if self.singleton.take().is_some() { self.singletons += 1; } + self.result.keys_offs.push(self.result.vals.len()); + self.push_update(time.clone(), diff.clone()); + self.result.vals.copy_push(val); + // Insert the key, but with no specified offset. + self.result.insert_key(key, None); } } diff --git a/src/trace/mod.rs b/src/trace/mod.rs index 095e3fc84..6887283a2 100644 --- a/src/trace/mod.rs +++ b/src/trace/mod.rs @@ -320,16 +320,13 @@ pub trait Batcher { /// Functionality for building batches from ordered update sequences. pub trait Builder: Sized { - /// Input type. + /// Input item type. type Input; /// Timestamp type. type Time: Timestamp; /// Output batch type. type Output; - /// Build from batches - fn from_batches(batches: &mut Vec, lower: AntichainRef, upper: AntichainRef, since: AntichainRef) -> Self::Output; - /// Allocates an empty builder. /// /// Ideally we deprecate this and insist all non-trivial building happens via `with_capacity()`. @@ -338,14 +335,17 @@ pub trait Builder: Sized { /// Allocates an empty builder with capacity for the specified keys, values, and updates. /// /// They represent respectively the number of distinct `key`, `(key, val)`, and total updates. - // #[deprecated] fn with_capacity(keys: usize, vals: usize, upds: usize) -> Self; - /// Adds elements in sorted order to the batch. - /// TODO: Refine the `batches` parameter to allow allocation reuse. - // #[deprecated] - fn push_batches(&mut self, batches: &mut Vec); + /// Adds an element to the batch. + /// + /// The default implementation uses `self.copy` with references to the owned arguments. + /// One should override it if the builder can take advantage of owned arguments. + fn push(&mut self, element: Self::Input) { + self.copy(&element); + } + /// Adds an element to the batch. + fn copy(&mut self, element: &Self::Input); /// Completes building and returns the batch. - // #[deprecated] fn done(self, lower: Antichain, upper: Antichain, since: Antichain) -> Self::Output; } @@ -453,11 +453,9 @@ pub mod rc_blanket_impls { type Input = B::Input; type Time = B::Time; type Output = Rc; - fn from_batches(batches: &mut Vec, lower: AntichainRef, upper: AntichainRef, since: AntichainRef) -> Self::Output { - Rc::new(B::from_batches(batches, lower, upper, since)) - } fn with_capacity(keys: usize, vals: usize, upds: usize) -> Self { RcBuilder { builder: B::with_capacity(keys, vals, upds) } } - fn push_batches(&mut self, batches: &mut Vec) { self.builder.push_batches(batches) } + fn push(&mut self, element: Self::Input) { self.builder.push(element) } + fn copy(&mut self, element: &Self::Input) { self.builder.copy(element) } fn done(self, lower: Antichain, upper: Antichain, since: Antichain) -> Rc { Rc::new(self.builder.done(lower, upper, since)) } } @@ -562,14 +560,9 @@ pub mod abomonated_blanket_impls { type Input = B::Input; type Time = B::Time; type Output = Abomonated>; - fn from_batches(batches: &mut Vec, lower: AntichainRef, upper: AntichainRef, since: AntichainRef) -> Self::Output { - let batch = B::from_batches(batches, lower, upper, since); - let mut bytes = Vec::with_capacity(measure(&batch)); - unsafe { abomonation::encode(&batch, &mut bytes).unwrap() }; - unsafe { Abomonated::::new(bytes).unwrap() } - } fn with_capacity(keys: usize, vals: usize, upds: usize) -> Self { AbomonatedBuilder { builder: B::with_capacity(keys, vals, upds) } } - fn push_batches(&mut self, batches: &mut Vec) { self.builder.push_batches(batches) } + fn push(&mut self, element: Self::Input) { self.builder.push(element) } + fn copy(&mut self, element: &Self::Input) { self.builder.copy(element) } fn done(self, lower: Antichain, upper: Antichain, since: Antichain) -> Self::Output { let batch = self.builder.done(lower, upper, since); let mut bytes = Vec::with_capacity(measure(&batch)); From a3c290e21e558d360aa9a95d8aef65deed05142d Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Fri, 26 Apr 2024 11:12:01 -0400 Subject: [PATCH 3/5] Address feedback Signed-off-by: Moritz Hoffmann --- src/operators/arrange/arrangement.rs | 2 +- src/trace/implementations/merge_batcher.rs | 246 +++++++++++------- .../implementations/merge_batcher_col.rs | 162 +++++++----- src/trace/mod.rs | 4 +- tests/trace.rs | 2 +- 5 files changed, 248 insertions(+), 168 deletions(-) diff --git a/src/operators/arrange/arrangement.rs b/src/operators/arrange/arrangement.rs index c22749a27..ba7bf2045 100644 --- a/src/operators/arrange/arrangement.rs +++ b/src/operators/arrange/arrangement.rs @@ -464,7 +464,7 @@ where input.for_each(|cap, data| { capabilities.insert(cap.retain()); - batcher.push_batch(data); + batcher.push_container(data); }); // The frontier may have advanced by multiple elements, which is an issue because diff --git a/src/trace/implementations/merge_batcher.rs b/src/trace/implementations/merge_batcher.rs index 874f9dfd3..f9038d801 100644 --- a/src/trace/implementations/merge_batcher.rs +++ b/src/trace/implementations/merge_batcher.rs @@ -1,7 +1,6 @@ //! A general purpose `Batcher` implementation based on radix sort. use std::collections::VecDeque; -use std::marker::PhantomData; use timely::communication::message::RefOrMut; use timely::{Container, PartialOrder}; @@ -21,10 +20,12 @@ pub struct MergeBatcher where M: Merger, { - /// each power-of-two length list of allocations. Do not push/pop directly but use the corresponding functions. - queue: Vec>, - /// Stash of empty batches - stash: Vec, + /// each power-of-two length list of allocations. + /// Do not push/pop directly but use the corresponding functions + /// ([`Self::chain_push`]/[`Self::chain_pop`]). + chains: Vec>, + /// Stash of empty chunks + stash: Vec, /// Thing to accept data, merge chains, and talk to the builder. merger: M, /// Logger for size accounting. @@ -51,16 +52,18 @@ where logger, operator_id, merger: M::default(), - queue: Vec::new(), + chains: Vec::new(), stash: Vec::new(), frontier: Antichain::new(), lower: Antichain::from_elem(T::minimum()), } } - fn push_batch(&mut self, batch: RefOrMut) { - let batch = self.merger.accept(batch, &mut self.stash); - self.insert_chain(batch); + /// Push a container of data into this merge batcher. Updates the internal chain structure if + /// needed. + fn push_container(&mut self, container: RefOrMut) { + let chain = self.merger.accept(container, &mut self.stash); + self.insert_chain(chain); } // Sealing a batch means finding those updates with times not greater or equal to any time @@ -69,19 +72,19 @@ where // updates with times not greater or equal to `upper`. fn seal>(&mut self, upper: Antichain) -> B::Output { // Finish - let batch = self.merger.finish(&mut self.stash); - if !batch.is_empty() { - self.queue_push(batch); + let chain = self.merger.finish(&mut self.stash); + if !chain.is_empty() { + self.chain_push(chain); } // Merge all remaining chains into a single chain. - while self.queue.len() > 1 { - let list1 = self.queue_pop().unwrap(); - let list2 = self.queue_pop().unwrap(); + while self.chains.len() > 1 { + let list1 = self.chain_pop().unwrap(); + let list2 = self.chain_pop().unwrap(); let merged = self.merge_by(list1, list2); - self.queue_push(merged); + self.chain_push(merged); } - let merged = self.queue_pop().unwrap_or_default(); + let merged = self.chain_pop().unwrap_or_default(); // Extract readied data. let mut kept = Vec::new(); @@ -91,7 +94,7 @@ where self.merger.extract(merged, upper.borrow(), &mut self.frontier, &mut readied, &mut kept, &mut self.stash); if !kept.is_empty() { - self.queue_push(kept); + self.chain_push(kept); } self.stash.clear(); @@ -111,22 +114,22 @@ impl MergeBatcher where M: Merger { - fn insert_chain(&mut self, chain: Vec<::Batch>) { + /// Insert a chain and maintain chain properties: Chains are geometrically sized and ordered + /// by decreasing length. + fn insert_chain(&mut self, chain: Vec) { if !chain.is_empty() { - self.queue_push(chain); - while self.queue.len() > 1 && (self.queue[self.queue.len() - 1].len() >= self.queue[self.queue.len() - 2].len() / 2) { - let list1 = self.queue_pop().unwrap(); - let list2 = self.queue_pop().unwrap(); + self.chain_push(chain); + while self.chains.len() > 1 && (self.chains[self.chains.len() - 1].len() >= self.chains[self.chains.len() - 2].len() / 2) { + let list1 = self.chain_pop().unwrap(); + let list2 = self.chain_pop().unwrap(); let merged = self.merge_by(list1, list2); - self.queue_push(merged); + self.chain_push(merged); } } } // merges two sorted input lists into one sorted output list. - fn merge_by(&mut self, list1: Vec, list2: Vec) -> Vec { - self.account(list1.iter().chain(list2.iter()).map(M::account), -1); - + fn merge_by(&mut self, list1: Vec, list2: Vec) -> Vec { // TODO: `list1` and `list2` get dropped; would be better to reuse? let mut output = Vec::with_capacity(list1.len() + list2.len()); self.merger.merge(list1, list2, &mut output, &mut self.stash); @@ -134,24 +137,24 @@ where output } - /// Pop a batch from `self.queue` and account size changes. + /// Pop a chain and account size changes. #[inline] - fn queue_pop(&mut self) -> Option> { - let batch = self.queue.pop(); - self.account(batch.iter().flatten().map(M::account), -1); - batch + fn chain_pop(&mut self) -> Option> { + let chain = self.chains.pop(); + self.account(chain.iter().flatten().map(M::account), -1); + chain } - /// Push a batch to `self.queue` and account size changes. + /// Push a chain and account size changes. #[inline] - fn queue_push(&mut self, batch: Vec) { - self.account(batch.iter().map(M::account), 1); - self.queue.push(batch); + fn chain_push(&mut self, chain: Vec) { + self.account(chain.iter().map(M::account), 1); + self.chains.push(chain); } /// Account size changes. Only performs work if a logger exists. /// - /// Calculate the size based on the [`TimelyStack`]s passed along, with each attribute + /// Calculate the size based on the iterator passed along, with each attribute /// multiplied by `diff`. Usually, one wants to pass 1 or -1 as the diff. #[inline] fn account>(&self, items: I, diff: isize) { @@ -176,7 +179,8 @@ where impl Drop for MergeBatcher { fn drop(&mut self) { - while self.queue_pop().is_some() { } + // Cleanup chain to retract accounting information. + while self.chain_pop().is_some() { } } } @@ -185,41 +189,46 @@ pub trait Merger: Default { /// The type of update containers received from inputs. type Input; /// The internal representation of batches of data. - type Batch: Container; + type Chunk: Container; /// The output type + /// TODO: This should be replaced by `Chunk` or another container once the builder understands + /// building from a complete chain. type Output; /// The type of time in frontiers to extract updates. type Time; /// Accept a fresh batch of input data. - fn accept(&mut self, batch: RefOrMut, stash: &mut Vec) -> Vec; + fn accept(&mut self, container: RefOrMut, stash: &mut Vec) -> Vec; /// Finish processing any stashed data. - fn finish(&mut self, stash: &mut Vec) -> Vec; + fn finish(&mut self, stash: &mut Vec) -> Vec; /// Merge chains into an output chain. - fn merge(&mut self, list1: Vec, list2: Vec, output: &mut Vec, stash: &mut Vec); + fn merge(&mut self, list1: Vec, list2: Vec, output: &mut Vec, stash: &mut Vec); /// Extract ready updates based on the `upper` frontier. - fn extract(&mut self, merged: Vec, upper: AntichainRef, frontier: &mut Antichain, readied: &mut Vec, keep: &mut Vec, stash: &mut Vec); + fn extract(&mut self, merged: Vec, upper: AntichainRef, frontier: &mut Antichain, readied: &mut Vec, kept: &mut Vec, stash: &mut Vec); /// Build from a chain - fn seal>(chain: &mut Vec, lower: AntichainRef, upper: AntichainRef, since: AntichainRef) -> B::Output; + /// TODO: We can move this entirely to `MergeBatcher` once builders can accepts chains. + fn seal>(chain: &mut Vec, lower: AntichainRef, upper: AntichainRef, since: AntichainRef) -> B::Output; /// Account size and allocation changes. Returns a tuple of (records, size, capacity, allocations). - fn account(batch: &Self::Batch) -> (usize, usize, usize, usize); + fn account(batch: &Self::Chunk) -> (usize, usize, usize, usize); } /// A merger that knows how to accept and maintain chains of vectors. pub struct VecMerger { - _marker: PhantomData, + pending: Vec, } impl Default for VecMerger { fn default() -> Self { - Self {_marker: PhantomData} + Self { + pending: Vec::default(), + } } } impl VecMerger { const BUFFER_SIZE_BYTES: usize = 1 << 13; - fn preferred_buffer_size(&self) -> usize { + fn chunk_capacity(&self) -> usize { let size = ::std::mem::size_of::(); if size == 0 { Self::BUFFER_SIZE_BYTES @@ -230,17 +239,21 @@ impl VecMerger { } } + fn pending_capacity(&self) -> usize { + self.chunk_capacity() * 2 + } + /// Helper to get pre-sized vector from the stash. #[inline] fn empty(&self, stash: &mut Vec>) -> Vec { - stash.pop().unwrap_or_else(|| Vec::with_capacity(self.preferred_buffer_size())) + stash.pop().unwrap_or_else(|| Vec::with_capacity(self.chunk_capacity())) } /// Helper to return a batch to the stash. #[inline] fn recycle(&self, mut batch: Vec, stash: &mut Vec>) { // TODO: Should we limit the size of `stash`? - if batch.capacity() == self.preferred_buffer_size() /*&& stash.len() < 2*/ { + if batch.capacity() == self.chunk_capacity() /*&& stash.len() < 2*/ { batch.clear(); stash.push(batch); } @@ -250,39 +263,78 @@ impl VecMerger { impl Merger for VecMerger<((K,V),T,R)> { type Time = T; type Input = Vec<((K,V),T,R)>; - type Batch = Vec<((K,V),T,R)>; + type Chunk = Vec<((K, V), T, R)>; type Output = ((K,V), T, R); - fn accept(&mut self, batch: RefOrMut, stash: &mut Vec) -> Vec { + fn accept(&mut self, batch: RefOrMut, stash: &mut Vec) -> Vec { + // Ensure `self.pending` has the desired capacity. We should never have a larger capacity + // because we don't write more than capacity elements into the buffer. + if self.pending.capacity() < self.pending_capacity() { + self.pending.reserve(self.pending_capacity() - self.pending.len()); + } + + // Form a chain from what's in pending. + // This closure does the following: + // * If pending is full, consolidate. + // * If after consolidation it's more than half full, peel off a chain of full blocks, + // leaving behind any partial block in pending. + // * Merge the new chain with `final_chain` and return it in-place. + let form_chain = |this: &mut Self, final_chain: &mut Vec, stash: &mut _| { + if this.pending.len() == this.pending.capacity() { + consolidate_updates(&mut this.pending); + if this.pending.len() > this.pending.capacity() / 2 { + let mut chain = Vec::default(); + while this.pending.len() > this.chunk_capacity() { + let mut chunk = this.empty(stash); + chunk.extend(this.pending.drain(..chunk.capacity())); + chain.push(chunk); + } + if final_chain.is_empty() { + *final_chain = chain; + } else if !chain.is_empty() { + let mut output = Vec::default(); + this.merge(std::mem::take(final_chain), chain, &mut output, stash); + *final_chain = output; + } + } + } + }; + + let mut final_chain = Vec::default(); // `batch` is either a shared reference or an owned allocations. - let mut owned = match batch { + match batch { RefOrMut::Ref(vec) => { - let mut owned = self.empty(stash); - owned.clone_from(vec); - owned + let mut slice = &vec[..]; + while !slice.is_empty() { + let (head, tail) = slice.split_at(std::cmp::min(self.pending.capacity() - self.pending.len(), slice.len())); + slice = tail; + self.pending.extend_from_slice(head); + form_chain(self, &mut final_chain, stash); + } } - RefOrMut::Mut(vec) => std::mem::take(vec) - }; - consolidate_updates(&mut owned); - if owned.capacity() == self.preferred_buffer_size() { - vec![owned] - } else { - let mut chain = Vec::with_capacity((owned.len() + self.preferred_buffer_size() - 1) / self.preferred_buffer_size()); - let mut iter = owned.drain(..).peekable(); - while iter.peek().is_some() { - let mut batch = self.empty(stash); - batch.extend((&mut iter).take(self.preferred_buffer_size())); - chain.push(batch); + RefOrMut::Mut(vec) => { + while !vec.is_empty() { + self.pending.extend(vec.drain(..std::cmp::min(self.pending.capacity() - self.pending.len(), vec.len()))); + form_chain(self, &mut final_chain, stash); + } } - chain } + final_chain } - fn finish(&mut self, _stash: &mut Vec) -> Vec { - vec![] + fn finish(&mut self, stash: &mut Vec) -> Vec { + // Extract all data from `pending`. + consolidate_updates(&mut self.pending); + let mut chain = Vec::default(); + while !self.pending.is_empty() { + let mut chunk = self.empty(stash); + chunk.extend(self.pending.drain(..std::cmp::min(chunk.capacity(), self.pending.len()))); + chain.push(chunk); + } + chain } - fn merge(&mut self, list1: Vec, list2: Vec, output: &mut Vec, stash: &mut Vec) { + fn merge(&mut self, list1: Vec, list2: Vec, output: &mut Vec, stash: &mut Vec) { let mut list1 = list1.into_iter(); let mut list2 = list2.into_iter(); let mut head1 = VecDeque::from(list1.next().unwrap_or_default()); @@ -350,14 +402,14 @@ impl, upper: AntichainRef, frontier: &mut Antichain, readied: &mut Vec, kept: &mut Vec, stash: &mut Vec) { + fn extract(&mut self, merged: Vec, upper: AntichainRef, frontier: &mut Antichain, readied: &mut Vec, kept: &mut Vec, stash: &mut Vec) { let mut keep = self.empty(stash); let mut ready = self.empty(stash); for mut buffer in merged { for (data, time, diff) in buffer.drain(..) { if upper.less_equal(&time) { - frontier.insert(time.clone()); + frontier.insert_ref(&time); if keep.len() == keep.capacity() && !keep.is_empty() { kept.push(keep); keep = self.empty(stash); @@ -384,35 +436,33 @@ impl>(chain: &mut Vec, lower: AntichainRef, upper: AntichainRef, since: AntichainRef) -> B::Output { - let mut builder = { - let mut keys = 0; - let mut vals = 0; - let mut upds = 0; - let mut prev_keyval = None; - for buffer in chain.iter() { - for ((key, val), time, _) in buffer.iter() { - if !upper.less_equal(time) { - if let Some((p_key, p_val)) = prev_keyval { - if p_key != key { - keys += 1; - vals += 1; - } - else if p_val != val { - vals += 1; - } - upds += 1; - } else { + fn seal>(chain: &mut Vec, lower: AntichainRef, upper: AntichainRef, since: AntichainRef) -> B::Output { + let mut keys = 0; + let mut vals = 0; + let mut upds = 0; + let mut prev_keyval = None; + for buffer in chain.iter() { + for ((key, val), time, _) in buffer.iter() { + if !upper.less_equal(time) { + if let Some((p_key, p_val)) = prev_keyval { + if p_key != key { keys += 1; vals += 1; - upds += 1; } - prev_keyval = Some((key, val)); + else if p_val != val { + vals += 1; + } + upds += 1; + } else { + keys += 1; + vals += 1; + upds += 1; } + prev_keyval = Some((key, val)); } } - B::with_capacity(keys, vals, upds) - }; + } + let mut builder = B::with_capacity(keys, vals, upds); for datum in chain.drain(..).flatten() { builder.push(datum); @@ -421,7 +471,7 @@ impl (usize, usize, usize, usize) { + fn account(batch: &Self::Chunk) -> (usize, usize, usize, usize) { (batch.len(), 0, 0, 0) } } diff --git a/src/trace/implementations/merge_batcher_col.rs b/src/trace/implementations/merge_batcher_col.rs index b0d8b2e1f..711a885f5 100644 --- a/src/trace/implementations/merge_batcher_col.rs +++ b/src/trace/implementations/merge_batcher_col.rs @@ -24,7 +24,7 @@ impl Default for ColumnationMerger { impl ColumnationMerger { const BUFFER_SIZE_BYTES: usize = 1 << 13; - fn preferred_buffer_size(&self) -> usize { + fn chunk_capacity(&self) -> usize { let size = ::std::mem::size_of::(); if size == 0 { Self::BUFFER_SIZE_BYTES @@ -35,22 +35,22 @@ impl ColumnationMerger { } } - /// Buffer size for pending updates, currently 4 * [`Self::buffer_size`]. - fn pending_buffer_size(&self) -> usize { - self.preferred_buffer_size() * 1 + /// Buffer size for pending updates, currently 4 * [`Self::chunk_capacity`]. + fn pending_capacity(&self) -> usize { + self.chunk_capacity() * 4 } /// Helper to get pre-sized vector from the stash. #[inline] fn empty(&self, stash: &mut Vec>) -> TimelyStack { - stash.pop().unwrap_or_else(|| TimelyStack::with_capacity(self.preferred_buffer_size())) + stash.pop().unwrap_or_else(|| TimelyStack::with_capacity(self.chunk_capacity())) } /// Helper to return a batch to the stash. #[inline] fn recycle(&self, mut batch: TimelyStack, stash: &mut Vec>) { // TODO: Should we limit the size of `stash`? - if batch.capacity() == self.preferred_buffer_size() /*&& stash.len() < 2*/ { + if batch.capacity() == self.chunk_capacity() /*&& stash.len() < 2*/ { batch.clear(); stash.push(batch); } @@ -66,50 +66,82 @@ where { type Time = T; type Input = Vec<((K,V),T,R)>; - type Batch = TimelyStack<((K,V),T,R)>; + type Chunk = TimelyStack<((K, V), T, R)>; type Output = ((K,V),T,R); - fn accept(&mut self, batch: RefOrMut, stash: &mut Vec) -> Vec { - // `batch` is either a shared reference or an owned allocations. - let mut batch: Vec<_> = match batch { - RefOrMut::Ref(vec) => { - let mut owned = Vec::default(); - owned.clone_from(vec); - owned - } - RefOrMut::Mut(vec) => std::mem::take(vec) - }; - // Ensure `self.pending` has a capacity of `Self::pending_buffer_size`. - if self.pending.capacity() < self.pending_buffer_size() { - self.pending.reserve(self.pending_buffer_size() - self.pending.capacity()); + fn accept(&mut self, batch: RefOrMut, stash: &mut Vec) -> Vec { + // Ensure `self.pending` has the desired capacity. We should never have a larger capacity + // because we don't write more than capacity elements into the buffer. + if self.pending.capacity() < self.pending_capacity() { + self.pending.reserve(self.pending_capacity() - self.pending.len()); } - let mut output = Vec::default(); - - while !batch.is_empty() { - self.pending.extend(batch.drain(..std::cmp::min(batch.len(), self.pending.capacity() - self.pending.len()))); - if self.pending.len() == self.pending.capacity() { - consolidate_updates(&mut self.pending); - if self.pending.len() > self.pending.capacity() / 2 { - // Flush if `self.pending` is more than half full after consolidation. - let mut stack = self.empty(stash); - stack.reserve_items(self.pending.iter()); - for tuple in self.pending.drain(..) { - stack.copy(&tuple); + // Form a chain from what's in pending. + // This closure does the following: + // * If pending is full, consolidate. + // * If after consolidation it's more than half full, peel off a chain of full blocks, + // leaving behind any partial block in pending. + // * Merge the new chain with `final_chain` and return it in-place. + let form_chain = |this: &mut Self, final_chain: &mut Vec, stash: &mut _| { + if this.pending.len() == this.pending.capacity() { + consolidate_updates(&mut this.pending); + if this.pending.len() > this.pending.capacity() / 2 { + let mut chain = Vec::default(); + while this.pending.len() > this.chunk_capacity() { + let mut chunk = this.empty(stash); + for datum in this.pending.drain(..chunk.capacity()) { + chunk.copy(&datum); + } + chain.push(chunk); + } + if final_chain.is_empty() { + *final_chain = chain; + } else if !chain.is_empty() { + let mut output = Vec::default(); + this.merge(std::mem::take(final_chain), chain, &mut output, stash); + *final_chain = output; } - output.push(stack); } } - } + }; - output + let mut final_chain = Vec::default(); + // `batch` is either a shared reference or an owned allocations. + match batch { + RefOrMut::Ref(vec) => { + let mut slice = &vec[..]; + while !slice.is_empty() { + let (head, tail) = slice.split_at(std::cmp::min(self.pending.capacity() - self.pending.len(), slice.len())); + slice = tail; + self.pending.extend_from_slice(head); + form_chain(self, &mut final_chain, stash); + } + } + RefOrMut::Mut(vec) => { + while !vec.is_empty() { + self.pending.extend(vec.drain(..std::cmp::min(self.pending.capacity() - self.pending.len(), vec.len()))); + form_chain(self, &mut final_chain, stash); + } + } + } + final_chain } - fn finish(&mut self, _stash: &mut Vec) -> Vec { - vec![] + fn finish(&mut self, stash: &mut Vec) -> Vec { + // Extract all data from `pending`. + consolidate_updates(&mut self.pending); + let mut chain = Vec::default(); + while !self.pending.is_empty() { + let mut chunk = self.empty(stash); + for datum in self.pending.drain(..std::cmp::min(chunk.capacity(), self.pending.len())) { + chunk.copy(&datum); + } + chain.push(chunk); + } + chain } - fn merge(&mut self, list1: Vec, list2: Vec, output: &mut Vec, stash: &mut Vec) { + fn merge(&mut self, list1: Vec, list2: Vec, output: &mut Vec, stash: &mut Vec) { let mut list1 = list1.into_iter(); let mut list2 = list2.into_iter(); @@ -181,14 +213,14 @@ where output.extend(list2); } - fn extract(&mut self, merged: Vec, upper: AntichainRef, frontier: &mut Antichain, readied: &mut Vec, kept: &mut Vec, stash: &mut Vec) { + fn extract(&mut self, merged: Vec, upper: AntichainRef, frontier: &mut Antichain, readied: &mut Vec, kept: &mut Vec, stash: &mut Vec) { let mut keep = self.empty(stash); let mut ready = self.empty(stash); for buffer in merged { for d @ (_data, time, _diff) in buffer.iter() { if upper.less_equal(time) { - frontier.insert(time.clone()); + frontier.insert_ref(time); if keep.len() == keep.capacity() && !keep.is_empty() { kept.push(keep); keep = self.empty(stash); @@ -215,44 +247,42 @@ where } } - fn seal>(chain: &mut Vec, lower: AntichainRef, upper: AntichainRef, since: AntichainRef) -> B::Output { - let mut builder = { - let mut keys = 0; - let mut vals = 0; - let mut upds = 0; - let mut prev_keyval = None; - for buffer in chain.iter() { - for ((key, val), time, _) in buffer.iter() { - if !upper.less_equal(time) { - if let Some((p_key, p_val)) = prev_keyval { - if p_key != key { - keys += 1; - vals += 1; - } - else if p_val != val { - vals += 1; - } - upds += 1; - } else { + fn seal>(chain: &mut Vec, lower: AntichainRef, upper: AntichainRef, since: AntichainRef) -> B::Output { + let mut keys = 0; + let mut vals = 0; + let mut upds = 0; + let mut prev_keyval = None; + for buffer in chain.iter() { + for ((key, val), time, _) in buffer.iter() { + if !upper.less_equal(time) { + if let Some((p_key, p_val)) = prev_keyval { + if p_key != key { keys += 1; vals += 1; - upds += 1; } - prev_keyval = Some((key, val)); + else if p_val != val { + vals += 1; + } + upds += 1; + } else { + keys += 1; + vals += 1; + upds += 1; } + prev_keyval = Some((key, val)); } } - B::with_capacity(keys, vals, upds) - }; + } + let mut builder = B::with_capacity(keys, vals, upds); - for datum in chain.iter().map(|ts| ts.iter()).flatten() { + for datum in chain.iter().flat_map(|ts| ts.iter()) { builder.copy(datum); } builder.done(lower.to_owned(), upper.to_owned(), since.to_owned()) } - fn account(batch: &Self::Batch) -> (usize, usize, usize, usize) { + fn account(batch: &Self::Chunk) -> (usize, usize, usize, usize) { let (mut size, mut capacity, mut allocations) = (0, 0, 0); let cb = |siz, cap| { size += siz; @@ -301,7 +331,7 @@ impl TimelyStackQueue { fn is_empty(&self) -> bool { self.head == self.list[..].len() } /// Return an iterator over the remaining elements. - fn iter(&self) -> impl Iterator + Clone + ExactSizeIterator { + fn iter(&self) -> impl Iterator + Clone { self.list[self.head..].iter() } } diff --git a/src/trace/mod.rs b/src/trace/mod.rs index 6887283a2..afee7c22e 100644 --- a/src/trace/mod.rs +++ b/src/trace/mod.rs @@ -310,8 +310,8 @@ pub trait Batcher { type Time: Timestamp; /// Allocates a new empty batcher. fn new(logger: Option>, operator_id: usize) -> Self; - /// Adds an unordered batch of elements to the batcher. - fn push_batch(&mut self, batch: RefOrMut); + /// Adds an unordered container of elements to the batcher. + fn push_container(&mut self, batch: RefOrMut); /// Returns all updates not greater or equal to an element of `upper`. fn seal>(&mut self, upper: Antichain) -> B::Output; /// Returns the lower envelope of contained update times. diff --git a/tests/trace.rs b/tests/trace.rs index a637b1700..acf1db350 100644 --- a/tests/trace.rs +++ b/tests/trace.rs @@ -15,7 +15,7 @@ fn get_trace() -> ValSpine { let mut batcher = ::Batcher::new(None, 0); use timely::communication::message::RefOrMut; - batcher.push_batch(RefOrMut::Mut(&mut vec![ + batcher.push_container(RefOrMut::Mut(&mut vec![ ((1, 2), 0, 1), ((2, 3), 1, 1), ((2, 3), 2, -1), From c5b1b1efcbcf4b54d234cb093db248f626dac160 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Fri, 26 Apr 2024 11:25:37 -0400 Subject: [PATCH 4/5] Formatting and renaming Signed-off-by: Moritz Hoffmann --- src/trace/implementations/merge_batcher.rs | 123 +++++++++++------- .../implementations/merge_batcher_col.rs | 93 +++++++------ 2 files changed, 131 insertions(+), 85 deletions(-) diff --git a/src/trace/implementations/merge_batcher.rs b/src/trace/implementations/merge_batcher.rs index f9038d801..a32bb6f56 100644 --- a/src/trace/implementations/merge_batcher.rs +++ b/src/trace/implementations/merge_batcher.rs @@ -3,17 +3,17 @@ use std::collections::VecDeque; use timely::communication::message::RefOrMut; -use timely::{Container, PartialOrder}; use timely::logging::WorkerIdentifier; use timely::logging_core::Logger; -use timely::progress::{frontier::Antichain, Timestamp}; use timely::progress::frontier::AntichainRef; +use timely::progress::{frontier::Antichain, Timestamp}; +use timely::{Container, PartialOrder}; -use crate::Data; use crate::consolidation::consolidate_updates; use crate::difference::Semigroup; use crate::logging::{BatcherEvent, DifferentialEvent}; use crate::trace::{Batcher, Builder}; +use crate::Data; /// Creates batches from unordered tuples. pub struct MergeBatcher @@ -40,7 +40,7 @@ where impl Batcher for MergeBatcher where - M: Merger, + M: Merger