From 73db64d722509e3b04c3e11a28a1a57b9e2e23c1 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Mon, 27 Nov 2023 15:47:27 -0500 Subject: [PATCH] Remove batcher dependence on updates --- src/trace/implementations/merge_batcher.rs | 37 +++++++------- .../implementations/merge_batcher_col.rs | 50 +++++++++---------- src/trace/implementations/ord.rs | 12 ++--- src/trace/implementations/ord_neu.rs | 6 +-- src/trace/implementations/rhh.rs | 4 +- 5 files changed, 54 insertions(+), 55 deletions(-) diff --git a/src/trace/implementations/merge_batcher.rs b/src/trace/implementations/merge_batcher.rs index 2bf547925..0f4319189 100644 --- a/src/trace/implementations/merge_batcher.rs +++ b/src/trace/implementations/merge_batcher.rs @@ -3,31 +3,34 @@ use std::collections::VecDeque; use timely::communication::message::RefOrMut; -use timely::progress::frontier::Antichain; +use timely::progress::{frontier::Antichain, Timestamp}; use ::difference::Semigroup; use trace::{Batcher, Builder}; -use trace::implementations::Update; /// Creates batches from unordered tuples. -pub struct MergeBatcher { - sorter: MergeSorter<(U::KeyOwned, U::ValOwned), U::Time, U::Diff>, - lower: Antichain, - frontier: Antichain, - phantom: ::std::marker::PhantomData, +pub struct MergeBatcher { + sorter: MergeSorter<(K, V), T, D>, + lower: Antichain, + frontier: Antichain, } -impl Batcher for MergeBatcher { - type Item = ((U::KeyOwned,U::ValOwned),U::Time,U::Diff); - type Time = U::Time; +impl Batcher for MergeBatcher +where + K: Ord + Clone, + V: Ord + Clone, + T: Timestamp, + D: Semigroup, +{ + type Item = ((K,V),T,D); + type Time = T; fn new() -> Self { MergeBatcher { sorter: MergeSorter::new(), frontier: Antichain::new(), - lower: Antichain::from_elem(::minimum()), - phantom: ::std::marker::PhantomData, + lower: Antichain::from_elem(::minimum()), } } @@ -53,7 +56,7 @@ impl Batcher for MergeBatcher { // which we call `lower`, by assumption that after sealing a batcher we receive no more // updates with times not greater or equal to `upper`. #[inline(never)] - fn seal>(&mut self, upper: Antichain) -> B::Output { + fn seal>(&mut self, upper: Antichain) -> B::Output { let mut merged = Vec::new(); self.sorter.finish_into(&mut merged); @@ -126,23 +129,23 @@ impl Batcher for MergeBatcher { let mut buffer = Vec::new(); self.sorter.push(&mut buffer); // We recycle buffers with allocations (capacity, and not zero-sized). - while buffer.capacity() > 0 && std::mem::size_of::<((U::KeyOwned,U::ValOwned),U::Time,U::Diff)>() > 0 { + while buffer.capacity() > 0 && std::mem::size_of::<((K,V),T,D)>() > 0 { buffer = Vec::new(); self.sorter.push(&mut buffer); } - let seal = builder.done(self.lower.clone(), upper.clone(), Antichain::from_elem(::minimum())); + let seal = builder.done(self.lower.clone(), upper.clone(), Antichain::from_elem(::minimum())); self.lower = upper; seal } // the frontier of elements remaining after the most recent call to `self.seal`. - fn frontier(&mut self) -> timely::progress::frontier::AntichainRef { + fn frontier(&mut self) -> timely::progress::frontier::AntichainRef { self.frontier.borrow() } } -struct MergeSorter { +struct MergeSorter { queue: Vec>>, // each power-of-two length list of allocations. stash: Vec>, } diff --git a/src/trace/implementations/merge_batcher_col.rs b/src/trace/implementations/merge_batcher_col.rs index 4780f8485..18a6f0d9a 100644 --- a/src/trace/implementations/merge_batcher_col.rs +++ b/src/trace/implementations/merge_batcher_col.rs @@ -1,46 +1,42 @@ //! A general purpose `Batcher` implementation based on radix sort for TimelyStack. -use std::marker::PhantomData; use timely::Container; use timely::communication::message::RefOrMut; use timely::container::columnation::{Columnation, TimelyStack}; -use timely::progress::frontier::Antichain; +use timely::progress::{frontier::Antichain, Timestamp}; use ::difference::Semigroup; use trace::{Batcher, Builder}; -use trace::implementations::Update; /// Creates batches from unordered tuples. -pub struct ColumnatedMergeBatcher +pub struct ColumnatedMergeBatcher where - U::KeyOwned: Columnation, - U::ValOwned: Columnation, - U::Time: Columnation, - U::Diff: Columnation, + K: Columnation, + V: Columnation, + T: Columnation, + D: Columnation, { - sorter: MergeSorterColumnation<(U::KeyOwned, U::ValOwned), U::Time, U::Diff>, - lower: Antichain, - frontier: Antichain, - phantom: PhantomData, + sorter: MergeSorterColumnation<(K, V), T, D>, + lower: Antichain, + frontier: Antichain, } -impl Batcher for ColumnatedMergeBatcher +impl Batcher for ColumnatedMergeBatcher where - U::KeyOwned: Columnation + 'static, - U::ValOwned: Columnation + 'static, - U::Time: Columnation + 'static, - U::Diff: Columnation + 'static, + K: Columnation + Ord + Clone + 'static, + V: Columnation + Ord + Clone + 'static, + T: Columnation + Timestamp + 'static, + D: Columnation + Semigroup + 'static, { - type Item = ((U::KeyOwned,U::ValOwned),U::Time,U::Diff); - type Time = U::Time; + type Item = ((K,V),T,D); + type Time = T; fn new() -> Self { ColumnatedMergeBatcher { sorter: MergeSorterColumnation::new(), frontier: Antichain::new(), - lower: Antichain::from_elem(::minimum()), - phantom: PhantomData, + lower: Antichain::from_elem(::minimum()), } } @@ -64,7 +60,7 @@ where // which we call `lower`, by assumption that after sealing a batcher we receive no more // updates with times not greater or equal to `upper`. #[inline] - fn seal>(&mut self, upper: Antichain) -> B::Output { + fn seal>(&mut self, upper: Antichain) -> B::Output { let mut merged = Default::default(); self.sorter.finish_into(&mut merged); @@ -106,7 +102,7 @@ where if upper.less_equal(time) { self.frontier.insert(time.clone()); if keep.is_empty() { - if keep.capacity() != MergeSorterColumnation::<(U::KeyOwned, U::ValOwned), U::Time, U::Diff>::buffer_size() { + if keep.capacity() != MergeSorterColumnation::<(K, V), T, D>::buffer_size() { keep = self.sorter.empty(); } } else if keep.len() == keep.capacity() { @@ -134,13 +130,13 @@ where // Drain buffers (fast reclamation). self.sorter.clear_stash(); - let seal = builder.done(self.lower.clone(), upper.clone(), Antichain::from_elem(::minimum())); + let seal = builder.done(self.lower.clone(), upper.clone(), Antichain::from_elem(::minimum())); self.lower = upper; seal } // the frontier of elements remaining after the most recent call to `self.seal`. - fn frontier(&mut self) -> timely::progress::frontier::AntichainRef { + fn frontier(&mut self) -> timely::progress::frontier::AntichainRef { self.frontier.borrow() } } @@ -186,13 +182,13 @@ impl TimelyStackQueue { } } -struct MergeSorterColumnation { +struct MergeSorterColumnation { queue: Vec>>, // each power-of-two length list of allocations. stash: Vec>, pending: Vec<(D, T, R)>, } -impl MergeSorterColumnation { +impl MergeSorterColumnation { const BUFFER_SIZE_BYTES: usize = 64 << 10; diff --git a/src/trace/implementations/ord.rs b/src/trace/implementations/ord.rs index ae99c893f..8aadd6d02 100644 --- a/src/trace/implementations/ord.rs +++ b/src/trace/implementations/ord.rs @@ -42,41 +42,41 @@ use trace::abomonated_blanket_impls::AbomonatedBuilder; /// A trace implementation using a spine of ordered lists. pub type OrdValSpine = Spine< Rc>>, - MergeBatcher<((K,V),T,R)>, + MergeBatcher, RcBuilder>>, >; /// A trace implementation using a spine of abomonated ordered lists. pub type OrdValSpineAbom = Spine< Rc>, Vec>>, - MergeBatcher<((K,V),T,R)>, + MergeBatcher, AbomonatedBuilder>>, >; /// A trace implementation for empty values using a spine of ordered lists. pub type OrdKeySpine = Spine< Rc>>, - MergeBatcher<((K,()),T,R)>, + MergeBatcher, RcBuilder>>, >; /// A trace implementation for empty values using a spine of abomonated ordered lists. pub type OrdKeySpineAbom = Spine< Rc>, Vec>>, - MergeBatcher<((K,()),T,R)>, + MergeBatcher, AbomonatedBuilder>>, >; /// A trace implementation backed by columnar storage. pub type ColValSpine = Spine< Rc>>, - ColumnatedMergeBatcher<((K,V),T,R)>, + ColumnatedMergeBatcher, RcBuilder>>, >; /// A trace implementation backed by columnar storage. pub type ColKeySpine = Spine< Rc>>, - ColumnatedMergeBatcher<((K,()),T,R)>, + ColumnatedMergeBatcher, RcBuilder>>, >; diff --git a/src/trace/implementations/ord_neu.rs b/src/trace/implementations/ord_neu.rs index d34682338..beab03dd3 100644 --- a/src/trace/implementations/ord_neu.rs +++ b/src/trace/implementations/ord_neu.rs @@ -22,7 +22,7 @@ use self::val_batch::{OrdValBatch, OrdValBuilder}; /// A trace implementation using a spine of ordered lists. pub type OrdValSpine = Spine< Rc>>, - MergeBatcher<((K,V),T,R)>, + MergeBatcher, RcBuilder>>, >; // /// A trace implementation for empty values using a spine of ordered lists. @@ -31,14 +31,14 @@ pub type OrdValSpine = Spine< /// A trace implementation backed by columnar storage. pub type ColValSpine = Spine< Rc>>, - ColumnatedMergeBatcher<((K,V),T,R)>, + ColumnatedMergeBatcher, RcBuilder>>, >; /// A trace implementation backed by columnar storage. pub type PreferredSpine = Spine< Rc>>, - ColumnatedMergeBatcher>, + ColumnatedMergeBatcher<::Owned,::Owned,T,R>, RcBuilder>>, >; diff --git a/src/trace/implementations/rhh.rs b/src/trace/implementations/rhh.rs index 542e6651f..e01cf4d45 100644 --- a/src/trace/implementations/rhh.rs +++ b/src/trace/implementations/rhh.rs @@ -20,7 +20,7 @@ use self::val_batch::{RhhValBatch, RhhValBuilder}; /// A trace implementation using a spine of ordered lists. pub type VecSpine = Spine< Rc>>, - MergeBatcher<((K,V),T,R)>, + MergeBatcher, RcBuilder>>, >; // /// A trace implementation for empty values using a spine of ordered lists. @@ -29,7 +29,7 @@ pub type VecSpine = Spine< /// A trace implementation backed by columnar storage. pub type ColSpine = Spine< Rc>>, - ColumnatedMergeBatcher<((K,V),T,R)>, + ColumnatedMergeBatcher, RcBuilder>>, >; // /// A trace implementation backed by columnar storage.