From 3ad963fdacfadb7e761cef7e3ace20f4892887c2 Mon Sep 17 00:00:00 2001 From: Petros Angelatos Date: Thu, 17 Mar 2022 18:43:26 +0100 Subject: [PATCH] trace: define `BatchReader` with associated types The `TraceReader` trait uses associated types to define its `Key`, `Val`, `Time`, `Diff` but the `BatchReader` trait did not, even though they are very similar in nature. Usually the choice between asssociated types or generic parameters on a trait is determined by whether or not a particular type is expected to implement the same trait multiple times. My starting point was that these two trait should at the very least be consistent with respect to their structure and either both use generic parameters or both use associated types. All the uses in this repo (and also that I can imagine being useful) don't really need `BatchReader` to be polymorphic for a particular type and so I chose to change that one to make it consistent with `TraceReader`. The result is quite pleasing as in many cases a lot of generic parameters are erased. In order to keep this PR short I left the `Cursor` trait untouched, but I believe a similar transformation would be beneficial there too, simplifying further many type signatures. Signed-off-by: Petros Angelatos --- dogsdogsdogs/src/operators/count.rs | 3 +- dogsdogsdogs/src/operators/half_join.rs | 4 +- dogsdogsdogs/src/operators/lookup_map.rs | 3 +- dogsdogsdogs/src/operators/propose.rs | 4 +- dogsdogsdogs/src/operators/validate.rs | 3 +- src/algorithms/graphs/bfs.rs | 3 +- src/algorithms/graphs/bijkstra.rs | 1 - src/algorithms/graphs/propagate.rs | 1 - src/operators/arrange/agent.rs | 2 +- src/operators/arrange/arrangement.rs | 15 ++- src/operators/arrange/upsert.rs | 4 +- src/operators/arrange/writer.rs | 8 +- src/operators/count.rs | 1 - src/operators/join.rs | 8 -- src/operators/reduce.rs | 14 +-- src/operators/threshold.rs | 1 - src/trace/implementations/merge_batcher.rs | 4 +- src/trace/implementations/ord.rs | 20 +++- src/trace/implementations/spine_fueled.rs | 26 ++--- src/trace/mod.rs | 116 ++++++++++++--------- src/trace/wrappers/enter.rs | 50 ++++----- src/trace/wrappers/enter_at.rs | 51 ++++----- src/trace/wrappers/filter.rs | 48 ++++----- src/trace/wrappers/freeze.rs | 41 +++----- src/trace/wrappers/frontier.rs | 48 ++++----- tests/trace.rs | 2 +- 26 files changed, 219 insertions(+), 262 deletions(-) diff --git a/dogsdogsdogs/src/operators/count.rs b/dogsdogsdogs/src/operators/count.rs index bae14c7b3..bb75f26a9 100644 --- a/dogsdogsdogs/src/operators/count.rs +++ b/dogsdogsdogs/src/operators/count.rs @@ -4,7 +4,7 @@ use differential_dataflow::{ExchangeData, Collection, Hashable}; use differential_dataflow::difference::{Monoid, Multiply}; use differential_dataflow::lattice::Lattice; use differential_dataflow::operators::arrange::Arranged; -use differential_dataflow::trace::{Cursor, TraceReader, BatchReader}; +use differential_dataflow::trace::{Cursor, TraceReader}; /// Reports a number of extensions to a stream of prefixes. /// @@ -23,7 +23,6 @@ where G::Timestamp: Lattice, Tr: TraceReader+Clone+'static, Tr::Key: Ord+Hashable+Default, - Tr::Batch: BatchReader, Tr::Cursor: Cursor, R: Monoid+Multiply+ExchangeData, F: Fn(&P)->Tr::Key+Clone+'static, diff --git a/dogsdogsdogs/src/operators/half_join.rs b/dogsdogsdogs/src/operators/half_join.rs index 8f6f4d0cf..482bbb570 100644 --- a/dogsdogsdogs/src/operators/half_join.rs +++ b/dogsdogsdogs/src/operators/half_join.rs @@ -42,7 +42,7 @@ use differential_dataflow::{ExchangeData, Collection, AsCollection, Hashable}; use differential_dataflow::difference::{Monoid, Semigroup}; use differential_dataflow::lattice::Lattice; use differential_dataflow::operators::arrange::Arranged; -use differential_dataflow::trace::{Cursor, TraceReader, BatchReader}; +use differential_dataflow::trace::{Cursor, TraceReader}; use differential_dataflow::consolidation::{consolidate, consolidate_updates}; /// A binary equijoin that responds to updates on only its first input. @@ -81,7 +81,6 @@ where Tr: TraceReader+Clone+'static, Tr::Key: Ord+Hashable+ExchangeData, Tr::Val: Clone, - Tr::Batch: BatchReader, Tr::Cursor: Cursor, Tr::R: Monoid+ExchangeData, FF: Fn(&G::Timestamp) -> G::Timestamp + 'static, @@ -137,7 +136,6 @@ where Tr: TraceReader+Clone+'static, Tr::Key: Ord+Hashable+ExchangeData, Tr::Val: Clone, - Tr::Batch: BatchReader, Tr::Cursor: Cursor, Tr::R: Monoid+ExchangeData, FF: Fn(&G::Timestamp) -> G::Timestamp + 'static, diff --git a/dogsdogsdogs/src/operators/lookup_map.rs b/dogsdogsdogs/src/operators/lookup_map.rs index 671e24921..081f17bc4 100644 --- a/dogsdogsdogs/src/operators/lookup_map.rs +++ b/dogsdogsdogs/src/operators/lookup_map.rs @@ -10,7 +10,7 @@ use differential_dataflow::{ExchangeData, Collection, AsCollection, Hashable}; use differential_dataflow::difference::{Semigroup, Monoid}; use differential_dataflow::lattice::Lattice; use differential_dataflow::operators::arrange::Arranged; -use differential_dataflow::trace::{Cursor, TraceReader, BatchReader}; +use differential_dataflow::trace::{Cursor, TraceReader}; /// Proposes extensions to a stream of prefixes. /// @@ -32,7 +32,6 @@ where Tr: TraceReader+Clone+'static, Tr::Key: Ord+Hashable, Tr::Val: Clone, - Tr::Batch: BatchReader, Tr::Cursor: Cursor, Tr::R: Monoid+ExchangeData, F: FnMut(&D, &mut Tr::Key)+Clone+'static, diff --git a/dogsdogsdogs/src/operators/propose.rs b/dogsdogsdogs/src/operators/propose.rs index cf7e793c8..105bcc340 100644 --- a/dogsdogsdogs/src/operators/propose.rs +++ b/dogsdogsdogs/src/operators/propose.rs @@ -4,7 +4,7 @@ use differential_dataflow::{ExchangeData, Collection, Hashable}; use differential_dataflow::difference::{Monoid, Multiply}; use differential_dataflow::lattice::Lattice; use differential_dataflow::operators::arrange::Arranged; -use differential_dataflow::trace::{Cursor, TraceReader, BatchReader}; +use differential_dataflow::trace::{Cursor, TraceReader}; /// Proposes extensions to a prefix stream. /// @@ -25,7 +25,6 @@ where Tr: TraceReader+Clone+'static, Tr::Key: Ord+Hashable+Default, Tr::Val: Clone, - Tr::Batch: BatchReader, Tr::Cursor: Cursor, Tr::R: Monoid+Multiply+ExchangeData, F: Fn(&P)->Tr::Key+Clone+'static, @@ -58,7 +57,6 @@ where Tr: TraceReader+Clone+'static, Tr::Key: Ord+Hashable+Default, Tr::Val: Clone, - Tr::Batch: BatchReader, Tr::Cursor: Cursor, Tr::R: Monoid+Multiply+ExchangeData, F: Fn(&P)->Tr::Key+Clone+'static, diff --git a/dogsdogsdogs/src/operators/validate.rs b/dogsdogsdogs/src/operators/validate.rs index 7152f1876..3ef2f1d63 100644 --- a/dogsdogsdogs/src/operators/validate.rs +++ b/dogsdogsdogs/src/operators/validate.rs @@ -6,7 +6,7 @@ use differential_dataflow::{ExchangeData, Collection}; use differential_dataflow::difference::{Monoid, Multiply}; use differential_dataflow::lattice::Lattice; use differential_dataflow::operators::arrange::Arranged; -use differential_dataflow::trace::{Cursor, TraceReader, BatchReader}; +use differential_dataflow::trace::{Cursor, TraceReader}; /// Proposes extensions to a stream of prefixes. /// @@ -24,7 +24,6 @@ where Tr: TraceReader+Clone+'static, K: Ord+Hash+Clone+Default, V: ExchangeData+Hash+Default, - Tr::Batch: BatchReader, Tr::Cursor: Cursor, Tr::R: Monoid+Multiply+ExchangeData, F: Fn(&P)->K+Clone+'static, diff --git a/src/algorithms/graphs/bfs.rs b/src/algorithms/graphs/bfs.rs index 2931922a8..6e1a0a5f1 100644 --- a/src/algorithms/graphs/bfs.rs +++ b/src/algorithms/graphs/bfs.rs @@ -30,7 +30,6 @@ where G::Timestamp: Lattice+Ord, N: ExchangeData+Hash, Tr: TraceReader+Clone+'static, - Tr::Batch: crate::trace::BatchReader+'static, Tr::Cursor: crate::trace::Cursor+'static, { // initialize roots as reaching themselves at distance 0 @@ -46,4 +45,4 @@ where .concat(&nodes) .reduce(|_, s, t| t.push((s[0].0.clone(), 1))) }) -} \ No newline at end of file +} diff --git a/src/algorithms/graphs/bijkstra.rs b/src/algorithms/graphs/bijkstra.rs index 91e02dbde..54dea878e 100644 --- a/src/algorithms/graphs/bijkstra.rs +++ b/src/algorithms/graphs/bijkstra.rs @@ -46,7 +46,6 @@ where G::Timestamp: Lattice+Ord, N: ExchangeData+Hash, Tr: TraceReader+Clone+'static, - Tr::Batch: crate::trace::BatchReader+'static, Tr::Cursor: crate::trace::Cursor+'static, { forward diff --git a/src/algorithms/graphs/propagate.rs b/src/algorithms/graphs/propagate.rs index c94e49814..71eed8c9d 100644 --- a/src/algorithms/graphs/propagate.rs +++ b/src/algorithms/graphs/propagate.rs @@ -65,7 +65,6 @@ where R: From, L: ExchangeData, Tr: TraceReader+Clone+'static, - Tr::Batch: crate::trace::BatchReader+'static, Tr::Cursor: crate::trace::Cursor+'static, F: Fn(&L)->u64+Clone+'static, { diff --git a/src/operators/arrange/agent.rs b/src/operators/arrange/agent.rs index 13c3b02cd..8f5a469ef 100644 --- a/src/operators/arrange/agent.rs +++ b/src/operators/arrange/agent.rs @@ -92,7 +92,7 @@ where pub fn new(trace: Tr, operator: ::timely::dataflow::operators::generic::OperatorInfo, logging: Option<::logging::Logger>) -> (Self, TraceWriter) where Tr: Trace, - Tr::Batch: Batch, + Tr::Batch: Batch, { let trace = Rc::new(RefCell::new(TraceBox::new(trace))); let queues = Rc::new(RefCell::new(Vec::new())); diff --git a/src/operators/arrange/arrangement.rs b/src/operators/arrange/arrangement.rs index 09a48e248..d291fae35 100644 --- a/src/operators/arrange/arrangement.rs +++ b/src/operators/arrange/arrangement.rs @@ -65,7 +65,6 @@ impl Clone for Arranged where G::Timestamp: Lattice+Ord, Tr: TraceReader + Clone, - Tr::Batch: BatchReader, Tr::Cursor: Cursor, { fn clone(&self) -> Self { @@ -83,7 +82,6 @@ impl Arranged where G::Timestamp: Lattice+Ord, Tr: TraceReader + Clone, - Tr::Batch: BatchReader, Tr::Cursor: Cursor, { /// Brings an arranged collection into a nested scope. @@ -425,7 +423,6 @@ impl<'a, G: Scope, Tr> Arranged, Tr> where G::Timestamp: Lattice+Ord, Tr: TraceReader + Clone, - Tr::Batch: BatchReader, Tr::Cursor: Cursor, { /// Brings an arranged collection out of a nested region. @@ -462,7 +459,7 @@ where V: ExchangeData, R: ExchangeData, Tr: Trace+TraceReader+'static, - Tr::Batch: Batch, + Tr::Batch: Batch, Tr::Cursor: Cursor, { self.arrange_named("Arrange") @@ -479,7 +476,7 @@ where V: ExchangeData, R: ExchangeData, Tr: Trace+TraceReader+'static, - Tr::Batch: Batch, + Tr::Batch: Batch, Tr::Cursor: Cursor, { let exchange = Exchange::new(move |update: &((K,V),G::Timestamp,R)| (update.0).0.hashed().into()); @@ -495,7 +492,7 @@ where where P: ParallelizationContract, Tr: Trace+TraceReader+'static, - Tr::Batch: Batch, + Tr::Batch: Batch, Tr::Cursor: Cursor, ; } @@ -512,7 +509,7 @@ where where P: ParallelizationContract, Tr: Trace+TraceReader+'static, - Tr::Batch: Batch, + Tr::Batch: Batch, Tr::Cursor: Cursor, { // The `Arrange` operator is tasked with reacting to an advancing input @@ -547,7 +544,7 @@ where }; // Where we will deposit received updates, and from which we extract batches. - let mut batcher = >::Batcher::new(); + let mut batcher = ::Batcher::new(); // Capabilities for the lower envelope of updates in `batcher`. let mut capabilities = Antichain::>::new(); @@ -684,7 +681,7 @@ where where P: ParallelizationContract, Tr: Trace+TraceReader+'static, - Tr::Batch: Batch, + Tr::Batch: Batch, Tr::Cursor: Cursor, { self.map(|k| (k, ())) diff --git a/src/operators/arrange/upsert.rs b/src/operators/arrange/upsert.rs index 978d9596f..6a7aa6ba3 100644 --- a/src/operators/arrange/upsert.rs +++ b/src/operators/arrange/upsert.rs @@ -145,7 +145,7 @@ where Tr::Key: ExchangeData+Hashable+std::hash::Hash, Tr::Val: ExchangeData, Tr: Trace+TraceReader+'static, - Tr::Batch: Batch, + Tr::Batch: Batch, Tr::Cursor: Cursor, { let mut reader: Option> = None; @@ -252,7 +252,7 @@ where // Prepare a cursor to the existing arrangement, and a batch builder for // new stuff that we add. let (mut trace_cursor, trace_storage) = reader_local.cursor(); - let mut builder = >::Builder::new(); + let mut builder = ::Builder::new(); for (key, mut list) in to_process.drain(..) { // The prior value associated with the key. diff --git a/src/operators/arrange/writer.rs b/src/operators/arrange/writer.rs index b6c4c5710..a42924b75 100644 --- a/src/operators/arrange/writer.rs +++ b/src/operators/arrange/writer.rs @@ -23,7 +23,7 @@ pub struct TraceWriter where Tr: Trace, Tr::Time: Lattice+Timestamp+Ord+Clone+std::fmt::Debug+'static, - Tr::Batch: Batch, + Tr::Batch: Batch, { /// Current upper limit. upper: Antichain, @@ -37,7 +37,7 @@ impl TraceWriter where Tr: Trace, Tr::Time: Lattice+Timestamp+Ord+Clone+std::fmt::Debug+'static, - Tr::Batch: Batch, + Tr::Batch: Batch, { /// Creates a new `TraceWriter`. pub fn new( @@ -96,7 +96,7 @@ where pub fn seal(&mut self, upper: Antichain) { if self.upper != upper { use trace::Builder; - let builder = >::Builder::new(); + let builder = ::Builder::new(); let batch = builder.done(self.upper.clone(), upper, Antichain::from_elem(Tr::Time::minimum())); self.insert(batch, None); } @@ -107,7 +107,7 @@ impl Drop for TraceWriter where Tr: Trace, Tr::Time: Lattice+Timestamp+Ord+Clone+std::fmt::Debug+'static, - Tr::Batch: Batch, + Tr::Batch: Batch, { fn drop(&mut self) { self.seal(Antichain::new()) diff --git a/src/operators/count.rs b/src/operators/count.rs index f119d648e..6b6a6bf4e 100644 --- a/src/operators/count.rs +++ b/src/operators/count.rs @@ -74,7 +74,6 @@ where T1: TraceReader+Clone+'static, T1::Key: ExchangeData, T1::R: ExchangeData+Semigroup, - T1::Batch: BatchReader, T1::Cursor: Cursor, { fn count_total_core>(&self) -> Collection { diff --git a/src/operators/join.rs b/src/operators/join.rs index ea72b32f6..67a6b21d5 100644 --- a/src/operators/join.rs +++ b/src/operators/join.rs @@ -190,7 +190,6 @@ where Tr::Key: Data+Hashable, Tr::Val: Data, Tr::R: Semigroup, - Tr::Batch: BatchReader+'static, Tr::Cursor: Cursor+'static, { fn join_map(&self, other: &Collection, mut logic: L) -> Collection>::Output> @@ -258,7 +257,6 @@ pub trait JoinCore where G::Time fn join_core (&self, stream2: &Arranged, result: L) -> Collection>::Output> where Tr2: TraceReader+Clone+'static, - Tr2::Batch: BatchReader+'static, Tr2::Cursor: Cursor+'static, Tr2::Val: Ord+Clone+Debug+'static, Tr2::R: Semigroup, @@ -311,7 +309,6 @@ pub trait JoinCore where G::Time fn join_core_internal_unsafe (&self, stream2: &Arranged, result: L) -> Collection where Tr2: TraceReader+Clone+'static, - Tr2::Batch: BatchReader+'static, Tr2::Cursor: Cursor+'static, Tr2::Val: Ord+Clone+Debug+'static, Tr2::R: Semigroup, @@ -334,7 +331,6 @@ where fn join_core (&self, stream2: &Arranged, result: L) -> Collection>::Output> where Tr2: TraceReader+Clone+'static, - Tr2::Batch: BatchReader+'static, Tr2::Cursor: Cursor+'static, Tr2::Val: Ord+Clone+Debug+'static, Tr2::R: Semigroup, @@ -351,7 +347,6 @@ where fn join_core_internal_unsafe (&self, stream2: &Arranged, result: L) -> Collection where Tr2: TraceReader+Clone+'static, - Tr2::Batch: BatchReader+'static, Tr2::Cursor: Cursor+'static, Tr2::Val: Ord+Clone+Debug+'static, Tr2::R: Semigroup, @@ -374,14 +369,12 @@ impl JoinCore for Arranged T1::Key: Ord+Debug+'static, T1::Val: Ord+Clone+Debug+'static, T1::R: Semigroup, - T1::Batch: BatchReader+'static, T1::Cursor: Cursor+'static, { fn join_core(&self, other: &Arranged, mut result: L) -> Collection>::Output> where Tr2::Val: Ord+Clone+Debug+'static, Tr2: TraceReader+Clone+'static, - Tr2::Batch: BatchReader+'static, Tr2::Cursor: Cursor+'static, Tr2::R: Semigroup, T1::R: Multiply, @@ -401,7 +394,6 @@ impl JoinCore for Arranged fn join_core_internal_unsafe (&self, other: &Arranged, mut result: L) -> Collection where Tr2: TraceReader+Clone+'static, - Tr2::Batch: BatchReader+'static, Tr2::Cursor: Cursor+'static, Tr2::Val: Ord+Clone+Debug+'static, Tr2::R: Semigroup, diff --git a/src/operators/reduce.rs b/src/operators/reduce.rs index f8d9edeb1..a2e8d58cc 100644 --- a/src/operators/reduce.rs +++ b/src/operators/reduce.rs @@ -90,7 +90,6 @@ impl Reduce for Arrang where G::Timestamp: Lattice+Ord, T1: TraceReader+Clone+'static, - T1::Batch: BatchReader, T1::Cursor: Cursor, { fn reduce_named(&self, name: &str, logic: L) -> Collection @@ -179,7 +178,6 @@ impl Threshold for Arranged+Clone+'static, - T1::Batch: BatchReader, T1::Cursor: Cursor, { fn threshold_namedR2+'static>(&self, name: &str, mut thresh: F) -> Collection { @@ -236,7 +234,6 @@ impl Count for Arranged where G::Timestamp: Lattice+Ord, T1: TraceReader+Clone+'static, - T1::Batch: BatchReader, T1::Cursor: Cursor, { fn count_core>(&self) -> Collection { @@ -282,7 +279,7 @@ pub trait ReduceCore where G::Timestam T2: Trace+TraceReader+'static, T2::Val: Data, T2::R: Abelian, - T2::Batch: Batch, + T2::Batch: Batch, T2::Cursor: Cursor, L: FnMut(&K, &[(&V, R)], &mut Vec<(T2::Val, T2::R)>)+'static, { @@ -305,7 +302,7 @@ pub trait ReduceCore where G::Timestam T2: Trace+TraceReader+'static, T2::Val: Data, T2::R: Semigroup, - T2::Batch: Batch, + T2::Batch: Batch, T2::Cursor: Cursor, L: FnMut(&K, &[(&V, R)], &mut Vec<(T2::Val,T2::R)>, &mut Vec<(T2::Val,T2::R)>)+'static ; @@ -324,7 +321,7 @@ where T2::Val: Data, T2::R: Semigroup, T2: Trace+TraceReader+'static, - T2::Batch: Batch, + T2::Batch: Batch, T2::Cursor: Cursor, L: FnMut(&K, &[(&V, R)], &mut Vec<(T2::Val,T2::R)>, &mut Vec<(T2::Val, T2::R)>)+'static { @@ -337,7 +334,6 @@ impl ReduceCore for Ar where G::Timestamp: Lattice+Ord, T1: TraceReader+Clone+'static, - T1::Batch: BatchReader, T1::Cursor: Cursor, { fn reduce_core(&self, name: &str, mut logic: L) -> Arranged> @@ -345,7 +341,7 @@ where T2: Trace+TraceReader+'static, T2::Val: Data, T2::R: Semigroup, - T2::Batch: Batch, + T2::Batch: Batch, T2::Cursor: Cursor, L: FnMut(&K, &[(&V, R)], &mut Vec<(T2::Val,T2::R)>, &mut Vec<(T2::Val, T2::R)>)+'static { @@ -488,7 +484,7 @@ where let mut builders = Vec::new(); for i in 0 .. capabilities.len() { buffers.push((capabilities[i].time().clone(), Vec::new())); - builders.push(>::Builder::new()); + builders.push(::Builder::new()); } // cursors for navigating input and output traces. diff --git a/src/operators/threshold.rs b/src/operators/threshold.rs index 427093a04..939211bc5 100644 --- a/src/operators/threshold.rs +++ b/src/operators/threshold.rs @@ -108,7 +108,6 @@ where T1: TraceReader+Clone+'static, T1::Key: ExchangeData, T1::R: ExchangeData+Semigroup, - T1::Batch: BatchReader, T1::Cursor: Cursor, { fn threshold_semigroup(&self, mut thresh: F) -> Collection diff --git a/src/trace/implementations/merge_batcher.rs b/src/trace/implementations/merge_batcher.rs index 14ccc0f3a..e281ca6c7 100644 --- a/src/trace/implementations/merge_batcher.rs +++ b/src/trace/implementations/merge_batcher.rs @@ -8,7 +8,7 @@ use lattice::Lattice; use trace::{Batch, Batcher, Builder}; /// Creates batches from unordered tuples. -pub struct MergeBatcher> { +pub struct MergeBatcher> { sorter: MergeSorter<(K, V), T, R>, lower: Antichain, frontier: Antichain, @@ -21,7 +21,7 @@ where V: Ord+Clone, T: Lattice+timely::progress::Timestamp+Ord+Clone, R: Semigroup, - B: Batch, + B: Batch, { fn new() -> Self { MergeBatcher { diff --git a/src/trace/implementations/ord.rs b/src/trace/implementations/ord.rs index f394d2365..d24c9eafc 100644 --- a/src/trace/implementations/ord.rs +++ b/src/trace/implementations/ord.rs @@ -62,7 +62,7 @@ where pub desc: Description, } -impl BatchReader for OrdValBatch +impl BatchReader for OrdValBatch where K: Ord+Clone+'static, V: Ord+Clone+'static, @@ -70,13 +70,18 @@ where R: Semigroup, O: OrdOffset, >::Error: Debug, >::Error: Debug { + type Key = K; + type Val = V; + type Time = T; + type R = R; + type Cursor = OrdValCursor; fn cursor(&self) -> Self::Cursor { OrdValCursor { cursor: self.layer.cursor() } } fn len(&self) -> usize { , O>, O> as Trie>::tuples(&self.layer) } fn description(&self) -> &Description { &self.desc } } -impl Batch for OrdValBatch +impl Batch for OrdValBatch where K: Ord+Clone+'static, V: Ord+Clone+'static, @@ -409,13 +414,18 @@ where pub desc: Description, } -impl BatchReader for OrdKeyBatch +impl BatchReader for OrdKeyBatch where K: Ord+Clone+'static, T: Lattice+Ord+Clone+'static, R: Semigroup, O: OrdOffset, >::Error: Debug, >::Error: Debug { + type Key = K; + type Val = (); + type Time = T; + type R = R; + type Cursor = OrdKeyCursor; fn cursor(&self) -> Self::Cursor { OrdKeyCursor { @@ -426,10 +436,10 @@ where } } fn len(&self) -> usize { , O> as Trie>::tuples(&self.layer) } - fn description(&self) -> &Description { &self.desc } + fn description(&self) -> &Description { &self.desc } } -impl Batch for OrdKeyBatch +impl Batch for OrdKeyBatch where K: Ord+Clone+'static, T: Lattice+timely::progress::Timestamp+Ord+Clone+'static, diff --git a/src/trace/implementations/spine_fueled.rs b/src/trace/implementations/spine_fueled.rs index 8624247e6..5acb3e7e5 100644 --- a/src/trace/implementations/spine_fueled.rs +++ b/src/trace/implementations/spine_fueled.rs @@ -87,7 +87,7 @@ use ::timely::order::PartialOrder; /// A spine maintains a small number of immutable collections of update tuples, merging the collections when /// two have similar sizes. In this way, it allows the addition of more tuples, which may then be merged with /// other immutable collections. -pub struct Spine> { +pub struct Spine> { operator: OperatorInfo, logger: Option, phantom: ::std::marker::PhantomData<(K, V, R)>, @@ -106,7 +106,7 @@ where V: Ord+Clone, // Clone is required by `batch::advance_*` (in-place could remove). T: Lattice+timely::progress::Timestamp+Ord+Clone+Debug, R: Semigroup, - B: Batch+Clone+'static, + B: Batch+Clone+'static, { type Key = K; type Val = V; @@ -114,7 +114,7 @@ where type R = R; type Batch = B; - type Cursor = CursorList>::Cursor>; + type Cursor = CursorList::Cursor>; fn cursor_through(&mut self, upper: AntichainRef) -> Option<(Self::Cursor, >::Storage)> { @@ -248,7 +248,7 @@ where V: Ord+Clone, T: Lattice+timely::progress::Timestamp+Ord+Clone+Debug, R: Semigroup, - B: Batch+Clone+'static, + B: Batch+Clone+'static, { fn new( info: ::timely::dataflow::operators::generic::OperatorInfo, @@ -322,7 +322,7 @@ impl Drop for Spine where T: Lattice+Ord, R: Semigroup, - B: Batch, + B: Batch, { fn drop(&mut self) { self.drop_batches(); @@ -334,7 +334,7 @@ impl Spine where T: Lattice+Ord, R: Semigroup, - B: Batch, + B: Batch, { /// Drops and logs batches. Used in `set_logical_compaction` and drop. fn drop_batches(&mut self) { @@ -382,7 +382,7 @@ where V: Ord+Clone, T: Lattice+timely::progress::Timestamp+Ord+Clone+Debug, R: Semigroup, - B: Batch, + B: Batch, { /// True iff there is at most one non-empty batch in `self.merging`. /// @@ -766,7 +766,7 @@ where /// /// A layer can be empty, contain a single batch, or contain a pair of batches /// that are in the process of merging into a batch for the next layer. -enum MergeState> { +enum MergeState> { /// An empty layer, containing no updates. Vacant, /// A layer containing a single batch. @@ -778,7 +778,7 @@ enum MergeState> { Double(MergeVariant), } -impl> MergeState { +impl> MergeState { /// The number of actual updates contained in the level. fn len(&self) -> usize { @@ -864,7 +864,7 @@ impl> MergeState { match (batch1, batch2) { (Some(batch1), Some(batch2)) => { assert!(batch1.upper() == batch2.lower()); - let begin_merge = >::begin_merge(&batch1, &batch2, compaction_frontier); + let begin_merge = ::begin_merge(&batch1, &batch2, compaction_frontier); MergeVariant::InProgress(batch1, batch2, begin_merge) } (None, Some(x)) => MergeVariant::Complete(Some((x, None))), @@ -876,14 +876,14 @@ impl> MergeState { } } -enum MergeVariant> { +enum MergeVariant> { /// Describes an actual in-progress merge between two non-trivial batches. - InProgress(B, B, >::Merger), + InProgress(B, B, ::Merger), /// A merge that requires no further work. May or may not represent a non-trivial batch. Complete(Option<(B, Option<(B, B)>)>), } -impl> MergeVariant { +impl> MergeVariant { /// Completes and extracts the batch, unless structurally empty. /// diff --git a/src/trace/mod.rs b/src/trace/mod.rs index 45961491d..ad07f8a39 100644 --- a/src/trace/mod.rs +++ b/src/trace/mod.rs @@ -41,7 +41,6 @@ pub use self::description::Description; /// to update the contents of the trace. These methods are used to examine the contents, and to update the reader's /// capabilities (which may release restrictions on the mutations to the underlying trace and cause work to happen). pub trait TraceReader { - /// Key by which updates are indexed. type Key; /// Values associated with keys. @@ -52,7 +51,7 @@ pub trait TraceReader { type R; /// The type of an immutable collection of updates. - type Batch: BatchReader+Clone+'static; + type Batch: BatchReader+Clone+'static; /// The type used to enumerate the collections contents. type Cursor: Cursor; @@ -196,9 +195,7 @@ pub trait TraceReader { /// /// The trace must be constructable from, and navigable by the `Key`, `Val`, `Time` types, but does not need /// to return them. -pub trait Trace : TraceReader -where ::Batch: Batch { - +pub trait Trace: TraceReader { /// Allocates a new empty trace. fn new( info: ::timely::dataflow::operators::generic::OperatorInfo, @@ -232,12 +229,21 @@ where ::Batch: Batch +pub trait BatchReader where Self: ::std::marker::Sized, { + /// Key by which updates are indexed. + type Key; + /// Values associated with keys. + type Val; + /// Timestamps associated with updates + type Time; + /// Associated update. + type R; + /// The type used to enumerate the batch's contents. - type Cursor: Cursor; + type Cursor: Cursor; /// Acquires a cursor to the batch's contents. fn cursor(&self) -> Self::Cursor; /// The number of updates in the batch. @@ -245,39 +251,39 @@ where /// True if the batch is empty. fn is_empty(&self) -> bool { self.len() == 0 } /// Describes the times of the updates in the batch. - fn description(&self) -> &Description; + fn description(&self) -> &Description; /// All times in the batch are greater or equal to an element of `lower`. - fn lower(&self) -> &Antichain { self.description().lower() } + fn lower(&self) -> &Antichain { self.description().lower() } /// All times in the batch are not greater or equal to any element of `upper`. - fn upper(&self) -> &Antichain { self.description().upper() } + fn upper(&self) -> &Antichain { self.description().upper() } } /// An immutable collection of updates. -pub trait Batch : BatchReader where Self: ::std::marker::Sized { +pub trait Batch: BatchReader where Self: ::std::marker::Sized { /// A type used to assemble batches from disordered updates. - type Batcher: Batcher; + type Batcher: Batcher; /// A type used to assemble batches from ordered update sequences. - type Builder: Builder; + type Builder: Builder; /// A type used to progressively merge batches. - type Merger: Merger; + type Merger: Merger; /// Initiates the merging of consecutive batches. /// /// The result of this method can be exercised to eventually produce the same result /// that a call to `self.merge(other)` would produce, but it can be done in a measured /// fashion. This can help to avoid latency spikes where a large merge needs to happen. - fn begin_merge(&self, other: &Self, compaction_frontier: Option>) -> Self::Merger { + fn begin_merge(&self, other: &Self, compaction_frontier: Option>) -> Self::Merger { Self::Merger::new(self, other, compaction_frontier) } /// Creates an empty batch with the stated bounds. - fn empty(lower: Antichain, upper: Antichain, since: Antichain) -> Self { + fn empty(lower: Antichain, upper: Antichain, since: Antichain) -> Self { ::new().done(lower, upper, since) } } /// Functionality for collecting and batching updates. -pub trait Batcher> { +pub trait Batcher> { /// Allocates a new empty batcher. fn new() -> Self; /// Adds an unordered batch of elements to the batcher. @@ -289,7 +295,7 @@ pub trait Batcher> { } /// Functionality for building batches from ordered update sequences. -pub trait Builder> { +pub trait Builder> { /// Allocates an empty builder. fn new() -> Self; /// Allocates an empty builder with some capacity. @@ -305,7 +311,7 @@ pub trait Builder> { } /// Represents a merge in progress. -pub trait Merger> { +pub trait Merger> { /// Creates a new merger to merge the supplied batches, optionally compacting /// up to the supplied frontier. fn new(source1: &Output, source2: &Output, compaction_frontier: Option>) -> Self; @@ -331,10 +337,14 @@ pub mod rc_blanket_impls { use timely::progress::{Antichain, frontier::AntichainRef}; use super::{Batch, BatchReader, Batcher, Builder, Merger, Cursor, Description}; - impl> BatchReader for Rc { + impl BatchReader for Rc { + type Key = B::Key; + type Val = B::Val; + type Time = B::Time; + type R = B::R; /// The type used to enumerate the batch's contents. - type Cursor = RcBatchCursor; + type Cursor = RcBatchCursor; /// Acquires a cursor to the batch's contents. fn cursor(&self) -> Self::Cursor { RcBatchCursor::new((&**self).cursor()) @@ -343,16 +353,16 @@ pub mod rc_blanket_impls { /// The number of updates in the batch. fn len(&self) -> usize { (&**self).len() } /// Describes the times of the updates in the batch. - fn description(&self) -> &Description { (&**self).description() } + fn description(&self) -> &Description { (&**self).description() } } /// Wrapper to provide cursor to nested scope. - pub struct RcBatchCursor> { + pub struct RcBatchCursor> { phantom: ::std::marker::PhantomData<(K, V, T, R)>, cursor: B::Cursor, } - impl> RcBatchCursor { + impl> RcBatchCursor { fn new(cursor: B::Cursor) -> Self { RcBatchCursor { cursor, @@ -361,7 +371,7 @@ pub mod rc_blanket_impls { } } - impl> Cursor for RcBatchCursor { + impl> Cursor for RcBatchCursor { type Storage = Rc; @@ -387,17 +397,17 @@ pub mod rc_blanket_impls { } /// An immutable collection of updates. - impl> Batch for Rc { - type Batcher = RcBatcher; - type Builder = RcBuilder; - type Merger = RcMerger; + impl Batch for Rc { + type Batcher = RcBatcher; + type Builder = RcBuilder; + type Merger = RcMerger; } /// Wrapper type for batching reference counted batches. - pub struct RcBatcher> { batcher: B::Batcher } + pub struct RcBatcher> { batcher: B::Batcher } /// Functionality for collecting and batching updates. - impl> Batcher> for RcBatcher { + impl> Batcher> for RcBatcher { fn new() -> Self { RcBatcher { batcher: >::new() } } fn push_batch(&mut self, batch: &mut Vec<((K, V), T, R)>) { self.batcher.push_batch(batch) } fn seal(&mut self, upper: Antichain) -> Rc { Rc::new(self.batcher.seal(upper)) } @@ -405,10 +415,10 @@ pub mod rc_blanket_impls { } /// Wrapper type for building reference counted batches. - pub struct RcBuilder> { builder: B::Builder } + pub struct RcBuilder> { builder: B::Builder } /// Functionality for building batches from ordered update sequences. - impl> Builder> for RcBuilder { + impl> Builder> for RcBuilder { fn new() -> Self { RcBuilder { builder: >::new() } } fn with_capacity(cap: usize) -> Self { RcBuilder { builder: >::with_capacity(cap) } } fn push(&mut self, element: (K, V, T, R)) { self.builder.push(element) } @@ -416,10 +426,10 @@ pub mod rc_blanket_impls { } /// Wrapper type for merging reference counted batches. - pub struct RcMerger> { merger: B::Merger } + pub struct RcMerger> { merger: B::Merger } /// Represents a merge in progress. - impl> Merger> for RcMerger { + impl> Merger> for RcMerger { fn new(source1: &Rc, source2: &Rc, compaction_frontier: Option>) -> Self { RcMerger { merger: B::begin_merge(source1, source2, compaction_frontier) } } fn work(&mut self, source1: &Rc, source2: &Rc, fuel: &mut isize) { self.merger.work(source1, source2, fuel) } fn done(self) -> Rc { Rc::new(self.merger.done()) } @@ -438,10 +448,14 @@ pub mod abomonated_blanket_impls { use super::{Batch, BatchReader, Batcher, Builder, Merger, Cursor, Description}; - impl+Abomonation> BatchReader for Abomonated> { + impl BatchReader for Abomonated> { + type Key = B::Key; + type Val = B::Val; + type Time = B::Time; + type R = B::R; /// The type used to enumerate the batch's contents. - type Cursor = AbomonatedBatchCursor; + type Cursor = AbomonatedBatchCursor; /// Acquires a cursor to the batch's contents. fn cursor(&self) -> Self::Cursor { AbomonatedBatchCursor::new((&**self).cursor()) @@ -450,16 +464,16 @@ pub mod abomonated_blanket_impls { /// The number of updates in the batch. fn len(&self) -> usize { (&**self).len() } /// Describes the times of the updates in the batch. - fn description(&self) -> &Description { (&**self).description() } + fn description(&self) -> &Description { (&**self).description() } } /// Wrapper to provide cursor to nested scope. - pub struct AbomonatedBatchCursor> { + pub struct AbomonatedBatchCursor> { phantom: ::std::marker::PhantomData<(K, V, T, R)>, cursor: B::Cursor, } - impl> AbomonatedBatchCursor { + impl> AbomonatedBatchCursor { fn new(cursor: B::Cursor) -> Self { AbomonatedBatchCursor { cursor, @@ -468,7 +482,7 @@ pub mod abomonated_blanket_impls { } } - impl+Abomonation> Cursor for AbomonatedBatchCursor { + impl+Abomonation> Cursor for AbomonatedBatchCursor { type Storage = Abomonated>; @@ -494,17 +508,17 @@ pub mod abomonated_blanket_impls { } /// An immutable collection of updates. - impl+Abomonation> Batch for Abomonated> { - type Batcher = AbomonatedBatcher; - type Builder = AbomonatedBuilder; - type Merger = AbomonatedMerger; + impl Batch for Abomonated> { + type Batcher = AbomonatedBatcher; + type Builder = AbomonatedBuilder; + type Merger = AbomonatedMerger; } /// Wrapper type for batching reference counted batches. - pub struct AbomonatedBatcher> { batcher: B::Batcher } + pub struct AbomonatedBatcher> { batcher: B::Batcher } /// Functionality for collecting and batching updates. - impl+Abomonation> Batcher>> for AbomonatedBatcher { + impl+Abomonation> Batcher>> for AbomonatedBatcher { fn new() -> Self { AbomonatedBatcher { batcher: >::new() } } fn push_batch(&mut self, batch: &mut Vec<((K, V), T, R)>) { self.batcher.push_batch(batch) } fn seal(&mut self, upper: Antichain) -> Abomonated> { @@ -517,10 +531,10 @@ pub mod abomonated_blanket_impls { } /// Wrapper type for building reference counted batches. - pub struct AbomonatedBuilder> { builder: B::Builder } + pub struct AbomonatedBuilder> { builder: B::Builder } /// Functionality for building batches from ordered update sequences. - impl+Abomonation> Builder>> for AbomonatedBuilder { + impl+Abomonation> Builder>> for AbomonatedBuilder { fn new() -> Self { AbomonatedBuilder { builder: >::new() } } fn with_capacity(cap: usize) -> Self { AbomonatedBuilder { builder: >::with_capacity(cap) } } fn push(&mut self, element: (K, V, T, R)) { self.builder.push(element) } @@ -533,10 +547,10 @@ pub mod abomonated_blanket_impls { } /// Wrapper type for merging reference counted batches. - pub struct AbomonatedMerger> { merger: B::Merger } + pub struct AbomonatedMerger> { merger: B::Merger } /// Represents a merge in progress. - impl+Abomonation> Merger>> for AbomonatedMerger { + impl+Abomonation> Merger>> for AbomonatedMerger { fn new(source1: &Abomonated>, source2: &Abomonated>, compaction_frontier: Option>) -> Self { AbomonatedMerger { merger: B::begin_merge(source1, source2, compaction_frontier) } } diff --git a/src/trace/wrappers/enter.rs b/src/trace/wrappers/enter.rs index c1d28fd92..410b8d642 100644 --- a/src/trace/wrappers/enter.rs +++ b/src/trace/wrappers/enter.rs @@ -47,7 +47,7 @@ where type Time = TInner; type R = Tr::R; - type Batch = BatchEnter; + type Batch = BatchEnter; type Cursor = CursorEnter; fn map_batches(&self, mut f: F) { @@ -113,42 +113,37 @@ where /// Wrapper to provide batch to nested scope. -pub struct BatchEnter { - phantom: ::std::marker::PhantomData<(K, V, T, R)>, +#[derive(Clone)] +pub struct BatchEnter { batch: B, description: Description, } -impl Clone for BatchEnter { - fn clone(&self) -> Self { - BatchEnter { - phantom: ::std::marker::PhantomData, - batch: self.batch.clone(), - description: self.description.clone(), - } - } -} - -impl BatchReader for BatchEnter +impl BatchReader for BatchEnter where - B: BatchReader, - T: Timestamp, - TInner: Refines+Lattice, + B: BatchReader, + B::Time: Timestamp, + TInner: Refines+Lattice, { - type Cursor = BatchCursorEnter; + type Key = B::Key; + type Val = B::Val; + type Time = TInner; + type R = B::R; + + type Cursor = BatchCursorEnter; fn cursor(&self) -> Self::Cursor { BatchCursorEnter::new(self.batch.cursor()) } fn len(&self) -> usize { self.batch.len() } - fn description(&self) -> &Description { &self.description } + fn description(&self) -> &Description { &self.description } } -impl BatchEnter +impl BatchEnter where - B: BatchReader, - T: Timestamp, - TInner: Refines+Lattice, + B: BatchReader, + B::Time: Timestamp, + TInner: Refines+Lattice, { /// Makes a new batch wrapper pub fn make_from(batch: B) -> Self { @@ -157,7 +152,6 @@ where let since: Vec<_> = batch.description().since().elements().iter().map(|x| TInner::to_inner(x.clone())).collect(); BatchEnter { - phantom: ::std::marker::PhantomData, batch: batch, description: Description::new(Antichain::from(lower), Antichain::from(upper), Antichain::from(since)) } @@ -213,12 +207,12 @@ where /// Wrapper to provide cursor to nested scope. -pub struct BatchCursorEnter, TInner> { +pub struct BatchCursorEnter, TInner> { phantom: ::std::marker::PhantomData<(K, V, R, TInner)>, cursor: B::Cursor, } -impl, TInner> BatchCursorEnter { +impl, TInner> BatchCursorEnter { fn new(cursor: B::Cursor) -> Self { BatchCursorEnter { phantom: ::std::marker::PhantomData, @@ -227,12 +221,12 @@ impl, TInner> BatchCursorEnter> Cursor for BatchCursorEnter +impl> Cursor for BatchCursorEnter where T: Timestamp, TInner: Refines+Lattice, { - type Storage = BatchEnter; + type Storage = BatchEnter; #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(&storage.batch) } #[inline] fn val_valid(&self, storage: &Self::Storage) -> bool { self.cursor.val_valid(&storage.batch) } diff --git a/src/trace/wrappers/enter_at.rs b/src/trace/wrappers/enter_at.rs index 9562fc90c..e95f885f1 100644 --- a/src/trace/wrappers/enter_at.rs +++ b/src/trace/wrappers/enter_at.rs @@ -62,7 +62,7 @@ where type Time = TInner; type R = Tr::R; - type Batch = BatchEnter; + type Batch = BatchEnter; type Cursor = CursorEnter; fn map_batches(&self, mut f: F2) { @@ -131,32 +131,26 @@ where /// Wrapper to provide batch to nested scope. -pub struct BatchEnter { - phantom: ::std::marker::PhantomData<(K, V, T, R)>, +#[derive(Clone)] +pub struct BatchEnter { batch: B, description: Description, logic: F, } -impl Clone for BatchEnter { - fn clone(&self) -> Self { - BatchEnter { - phantom: ::std::marker::PhantomData, - batch: self.batch.clone(), - description: self.description.clone(), - logic: self.logic.clone(), - } - } -} - -impl BatchReader for BatchEnter +impl BatchReader for BatchEnter where - B: BatchReader, - T: Timestamp, - TInner: Refines+Lattice, - F: FnMut(&K, &V, &T)->TInner+Clone, + B: BatchReader, + B::Time: Timestamp, + TInner: Refines+Lattice, + F: FnMut(&B::Key, &B::Val, &B::Time)->TInner+Clone, { - type Cursor = BatchCursorEnter; + type Key = B::Key; + type Val = B::Val; + type Time = TInner; + type R = B::R; + + type Cursor = BatchCursorEnter; fn cursor(&self) -> Self::Cursor { BatchCursorEnter::new(self.batch.cursor(), self.logic.clone()) @@ -165,11 +159,11 @@ where fn description(&self) -> &Description { &self.description } } -impl BatchEnter +impl BatchEnter where - B: BatchReader, - T: Timestamp, - TInner: Refines+Lattice, + B: BatchReader, + B::Time: Timestamp, + TInner: Refines+Lattice, { /// Makes a new batch wrapper pub fn make_from(batch: B, logic: F) -> Self { @@ -178,7 +172,6 @@ where let since: Vec<_> = batch.description().since().elements().iter().map(|x| TInner::to_inner(x.clone())).collect(); BatchEnter { - phantom: ::std::marker::PhantomData, batch, description: Description::new(Antichain::from(lower), Antichain::from(upper), Antichain::from(since)), logic, @@ -241,13 +234,13 @@ where /// Wrapper to provide cursor to nested scope. -pub struct BatchCursorEnter, TInner, F> { +pub struct BatchCursorEnter, TInner, F> { phantom: ::std::marker::PhantomData<(K, V, R, TInner)>, cursor: B::Cursor, logic: F, } -impl, TInner, F> BatchCursorEnter { +impl, TInner, F> BatchCursorEnter { fn new(cursor: B::Cursor, logic: F) -> Self { BatchCursorEnter { phantom: ::std::marker::PhantomData, @@ -257,13 +250,13 @@ impl, TInner, F> BatchCursorEnter, F> Cursor for BatchCursorEnter +impl, F> Cursor for BatchCursorEnter where T: Timestamp, TInner: Refines+Lattice, F: FnMut(&K, &V, &T)->TInner, { - type Storage = BatchEnter; + type Storage = BatchEnter; #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(&storage.batch) } #[inline] fn val_valid(&self, storage: &Self::Storage) -> bool { self.cursor.val_valid(&storage.batch) } diff --git a/src/trace/wrappers/filter.rs b/src/trace/wrappers/filter.rs index 0f1a531ab..b61994bc1 100644 --- a/src/trace/wrappers/filter.rs +++ b/src/trace/wrappers/filter.rs @@ -40,7 +40,7 @@ where type Time = Tr::Time; type R = Tr::R; - type Batch = BatchFilter; + type Batch = BatchFilter; type Cursor = CursorFilter; fn map_batches(&self, mut f: F2) { @@ -76,46 +76,36 @@ where /// Wrapper to provide batch to nested scope. -pub struct BatchFilter { - phantom: ::std::marker::PhantomData<(K, V, T, R)>, +#[derive(Clone)] +pub struct BatchFilter { batch: B, logic: F, } -impl Clone for BatchFilter { - fn clone(&self) -> Self { - BatchFilter { - phantom: ::std::marker::PhantomData, - batch: self.batch.clone(), - logic: self.logic.clone(), - } - } -} - -impl BatchReader for BatchFilter +impl BatchReader for BatchFilter where - B: BatchReader, - T: Timestamp, - F: FnMut(&K, &V)->bool+Clone+'static + B: BatchReader, + B::Time: Timestamp, + F: FnMut(&B::Key, &B::Val)->bool+Clone+'static { - type Cursor = BatchCursorFilter; + type Key = B::Key; + type Val = B::Val; + type Time = B::Time; + type R = B::R; + + type Cursor = BatchCursorFilter; fn cursor(&self) -> Self::Cursor { BatchCursorFilter::new(self.batch.cursor(), self.logic.clone()) } fn len(&self) -> usize { self.batch.len() } - fn description(&self) -> &Description { &self.batch.description() } + fn description(&self) -> &Description { &self.batch.description() } } -impl BatchFilter -where - B: BatchReader, - T: Timestamp, -{ +impl BatchFilter { /// Makes a new batch wrapper pub fn make_from(batch: B, logic: F) -> Self { BatchFilter { - phantom: ::std::marker::PhantomData, batch, logic, } @@ -175,13 +165,13 @@ where /// Wrapper to provide cursor to nested scope. -pub struct BatchCursorFilter, F> { +pub struct BatchCursorFilter, F> { phantom: ::std::marker::PhantomData<(K, V, R)>, cursor: B::Cursor, logic: F, } -impl, F> BatchCursorFilter { +impl, F> BatchCursorFilter { fn new(cursor: B::Cursor, logic: F) -> Self { BatchCursorFilter { phantom: ::std::marker::PhantomData, @@ -191,12 +181,12 @@ impl, F> BatchCursorFilter, F> Cursor for BatchCursorFilter +impl, F> Cursor for BatchCursorFilter where T: Timestamp, F: FnMut(&K, &V)->bool+'static, { - type Storage = BatchFilter; + type Storage = BatchFilter; #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(&storage.batch) } #[inline] fn val_valid(&self, storage: &Self::Storage) -> bool { self.cursor.val_valid(&storage.batch) } diff --git a/src/trace/wrappers/freeze.rs b/src/trace/wrappers/freeze.rs index fa24b91d7..706d6a1a8 100644 --- a/src/trace/wrappers/freeze.rs +++ b/src/trace/wrappers/freeze.rs @@ -41,7 +41,6 @@ where T::Key: 'static, T::Val: 'static, T::R: 'static, - T::Batch: BatchReader, T::Cursor: Cursor, F: Fn(&G::Timestamp)->Option+'static, { @@ -93,7 +92,7 @@ where type Time = Tr::Time; type R = Tr::R; - type Batch = BatchFreeze; + type Batch = BatchFreeze; type Cursor = CursorFreeze; fn map_batches(&self, mut f: F2) { @@ -137,47 +136,39 @@ where /// Wrapper to provide batch to nested scope. -pub struct BatchFreeze { - phantom: ::std::marker::PhantomData<(K, V, R, T)>, +pub struct BatchFreeze { batch: B, func: Rc, } -impl Clone for BatchFreeze { +impl Clone for BatchFreeze { fn clone(&self) -> Self { BatchFreeze { - phantom: ::std::marker::PhantomData, batch: self.batch.clone(), func: self.func.clone(), } } } -impl BatchReader for BatchFreeze -where - B: BatchReader, - T: Clone, - F: Fn(&T)->Option, -{ - type Cursor = BatchCursorFreeze; +impl Option> BatchReader for BatchFreeze { + type Key = B::Key; + type Val = B::Val; + type Time = B::Time; + type R = B::R; + + type Cursor = BatchCursorFreeze; fn cursor(&self) -> Self::Cursor { BatchCursorFreeze::new(self.batch.cursor(), self.func.clone()) } fn len(&self) -> usize { self.batch.len() } - fn description(&self) -> &Description { self.batch.description() } + fn description(&self) -> &Description { self.batch.description() } } -impl BatchFreeze -where - B: BatchReader, - T: Clone, - F: Fn(&T)->Option -{ +impl Option> BatchFreeze { /// Makes a new batch wrapper pub fn make_from(batch: B, func: Rc) -> Self { BatchFreeze { - phantom: ::std::marker::PhantomData, batch: batch, func: func, } @@ -236,13 +227,13 @@ where /// Wrapper to provide cursor to nested scope. -pub struct BatchCursorFreeze, F> { +pub struct BatchCursorFreeze, F> { phantom: ::std::marker::PhantomData<(K, V, R, T)>, cursor: B::Cursor, func: Rc, } -impl, F> BatchCursorFreeze { +impl, F> BatchCursorFreeze { fn new(cursor: B::Cursor, func: Rc) -> Self { BatchCursorFreeze { phantom: ::std::marker::PhantomData, @@ -252,12 +243,12 @@ impl, F> BatchCursorFreeze, F> Cursor for BatchCursorFreeze +impl, F> Cursor for BatchCursorFreeze where F: Fn(&T)->Option, { - type Storage = BatchFreeze; + type Storage = BatchFreeze; #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(&storage.batch) } #[inline] fn val_valid(&self, storage: &Self::Storage) -> bool { self.cursor.val_valid(&storage.batch) } diff --git a/src/trace/wrappers/frontier.rs b/src/trace/wrappers/frontier.rs index 43ed2096b..d54c79beb 100644 --- a/src/trace/wrappers/frontier.rs +++ b/src/trace/wrappers/frontier.rs @@ -47,7 +47,7 @@ where type Time = Tr::Time; type R = Tr::R; - type Batch = BatchFrontier; + type Batch = BatchFrontier; type Cursor = CursorFrontier; fn map_batches(&self, mut f: F) { @@ -83,45 +83,37 @@ where /// Wrapper to provide batch to nested scope. -pub struct BatchFrontier { - phantom: ::std::marker::PhantomData<(K, V, T, R)>, +#[derive(Clone)] +pub struct BatchFrontier { batch: B, - frontier: Antichain, + frontier: Antichain, } -impl Clone for BatchFrontier { - fn clone(&self) -> Self { - BatchFrontier { - phantom: ::std::marker::PhantomData, - batch: self.batch.clone(), - frontier: self.frontier.to_owned(), - } - } -} - -impl BatchReader for BatchFrontier +impl BatchReader for BatchFrontier where - B: BatchReader, - T: Timestamp+Lattice, + B::Time: Timestamp + Lattice { - type Cursor = BatchCursorFrontier; + type Key = B::Key; + type Val = B::Val; + type Time = B::Time; + type R = B::R; + + type Cursor = BatchCursorFrontier; fn cursor(&self) -> Self::Cursor { BatchCursorFrontier::new(self.batch.cursor(), self.frontier.borrow()) } fn len(&self) -> usize { self.batch.len() } - fn description(&self) -> &Description { &self.batch.description() } + fn description(&self) -> &Description { &self.batch.description() } } -impl BatchFrontier +impl BatchFrontier where - B: BatchReader, - T: Timestamp+Lattice, + B::Time: Timestamp+Lattice { /// Makes a new batch wrapper - pub fn make_from(batch: B, frontier: AntichainRef) -> Self { + pub fn make_from(batch: B, frontier: AntichainRef) -> Self { BatchFrontier { - phantom: ::std::marker::PhantomData, batch, frontier: frontier.to_owned(), } @@ -182,13 +174,13 @@ where /// Wrapper to provide cursor to nested scope. -pub struct BatchCursorFrontier> { +pub struct BatchCursorFrontier> { phantom: ::std::marker::PhantomData<(K, V, R)>, cursor: B::Cursor, frontier: Antichain, } -impl> BatchCursorFrontier { +impl> BatchCursorFrontier { fn new(cursor: B::Cursor, frontier: AntichainRef) -> Self { BatchCursorFrontier { phantom: ::std::marker::PhantomData, @@ -198,11 +190,11 @@ impl> BatchCursorFrontier> Cursor for BatchCursorFrontier +impl> Cursor for BatchCursorFrontier where T: Timestamp+Lattice, { - type Storage = BatchFrontier; + type Storage = BatchFrontier; #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(&storage.batch) } #[inline] fn val_valid(&self, storage: &Self::Storage) -> bool { self.cursor.val_valid(&storage.batch) } diff --git a/tests/trace.rs b/tests/trace.rs index 1c22f4665..db4d5a90e 100644 --- a/tests/trace.rs +++ b/tests/trace.rs @@ -19,7 +19,7 @@ fn get_trace() -> Spine::Batch as Batch>::Batcher::new(); + let mut batcher = <::Batch as Batch>::Batcher::new(); batcher.push_batch(&mut vec![ ((1, 2), 0, 1),