diff --git a/dogsdogsdogs/src/operators/count.rs b/dogsdogsdogs/src/operators/count.rs index bae14c7b3..bb75f26a9 100644 --- a/dogsdogsdogs/src/operators/count.rs +++ b/dogsdogsdogs/src/operators/count.rs @@ -4,7 +4,7 @@ use differential_dataflow::{ExchangeData, Collection, Hashable}; use differential_dataflow::difference::{Monoid, Multiply}; use differential_dataflow::lattice::Lattice; use differential_dataflow::operators::arrange::Arranged; -use differential_dataflow::trace::{Cursor, TraceReader, BatchReader}; +use differential_dataflow::trace::{Cursor, TraceReader}; /// Reports a number of extensions to a stream of prefixes. /// @@ -23,7 +23,6 @@ where G::Timestamp: Lattice, Tr: TraceReader+Clone+'static, Tr::Key: Ord+Hashable+Default, - Tr::Batch: BatchReader, Tr::Cursor: Cursor, R: Monoid+Multiply+ExchangeData, F: Fn(&P)->Tr::Key+Clone+'static, diff --git a/dogsdogsdogs/src/operators/half_join.rs b/dogsdogsdogs/src/operators/half_join.rs index 8f6f4d0cf..482bbb570 100644 --- a/dogsdogsdogs/src/operators/half_join.rs +++ b/dogsdogsdogs/src/operators/half_join.rs @@ -42,7 +42,7 @@ use differential_dataflow::{ExchangeData, Collection, AsCollection, Hashable}; use differential_dataflow::difference::{Monoid, Semigroup}; use differential_dataflow::lattice::Lattice; use differential_dataflow::operators::arrange::Arranged; -use differential_dataflow::trace::{Cursor, TraceReader, BatchReader}; +use differential_dataflow::trace::{Cursor, TraceReader}; use differential_dataflow::consolidation::{consolidate, consolidate_updates}; /// A binary equijoin that responds to updates on only its first input. @@ -81,7 +81,6 @@ where Tr: TraceReader+Clone+'static, Tr::Key: Ord+Hashable+ExchangeData, Tr::Val: Clone, - Tr::Batch: BatchReader, Tr::Cursor: Cursor, Tr::R: Monoid+ExchangeData, FF: Fn(&G::Timestamp) -> G::Timestamp + 'static, @@ -137,7 +136,6 @@ where Tr: TraceReader+Clone+'static, Tr::Key: Ord+Hashable+ExchangeData, Tr::Val: Clone, - Tr::Batch: BatchReader, Tr::Cursor: Cursor, Tr::R: Monoid+ExchangeData, FF: Fn(&G::Timestamp) -> G::Timestamp + 'static, diff --git a/dogsdogsdogs/src/operators/lookup_map.rs b/dogsdogsdogs/src/operators/lookup_map.rs index 671e24921..081f17bc4 100644 --- a/dogsdogsdogs/src/operators/lookup_map.rs +++ b/dogsdogsdogs/src/operators/lookup_map.rs @@ -10,7 +10,7 @@ use differential_dataflow::{ExchangeData, Collection, AsCollection, Hashable}; use differential_dataflow::difference::{Semigroup, Monoid}; use differential_dataflow::lattice::Lattice; use differential_dataflow::operators::arrange::Arranged; -use differential_dataflow::trace::{Cursor, TraceReader, BatchReader}; +use differential_dataflow::trace::{Cursor, TraceReader}; /// Proposes extensions to a stream of prefixes. /// @@ -32,7 +32,6 @@ where Tr: TraceReader+Clone+'static, Tr::Key: Ord+Hashable, Tr::Val: Clone, - Tr::Batch: BatchReader, Tr::Cursor: Cursor, Tr::R: Monoid+ExchangeData, F: FnMut(&D, &mut Tr::Key)+Clone+'static, diff --git a/dogsdogsdogs/src/operators/propose.rs b/dogsdogsdogs/src/operators/propose.rs index cf7e793c8..105bcc340 100644 --- a/dogsdogsdogs/src/operators/propose.rs +++ b/dogsdogsdogs/src/operators/propose.rs @@ -4,7 +4,7 @@ use differential_dataflow::{ExchangeData, Collection, Hashable}; use differential_dataflow::difference::{Monoid, Multiply}; use differential_dataflow::lattice::Lattice; use differential_dataflow::operators::arrange::Arranged; -use differential_dataflow::trace::{Cursor, TraceReader, BatchReader}; +use differential_dataflow::trace::{Cursor, TraceReader}; /// Proposes extensions to a prefix stream. /// @@ -25,7 +25,6 @@ where Tr: TraceReader+Clone+'static, Tr::Key: Ord+Hashable+Default, Tr::Val: Clone, - Tr::Batch: BatchReader, Tr::Cursor: Cursor, Tr::R: Monoid+Multiply+ExchangeData, F: Fn(&P)->Tr::Key+Clone+'static, @@ -58,7 +57,6 @@ where Tr: TraceReader+Clone+'static, Tr::Key: Ord+Hashable+Default, Tr::Val: Clone, - Tr::Batch: BatchReader, Tr::Cursor: Cursor, Tr::R: Monoid+Multiply+ExchangeData, F: Fn(&P)->Tr::Key+Clone+'static, diff --git a/dogsdogsdogs/src/operators/validate.rs b/dogsdogsdogs/src/operators/validate.rs index 7152f1876..3ef2f1d63 100644 --- a/dogsdogsdogs/src/operators/validate.rs +++ b/dogsdogsdogs/src/operators/validate.rs @@ -6,7 +6,7 @@ use differential_dataflow::{ExchangeData, Collection}; use differential_dataflow::difference::{Monoid, Multiply}; use differential_dataflow::lattice::Lattice; use differential_dataflow::operators::arrange::Arranged; -use differential_dataflow::trace::{Cursor, TraceReader, BatchReader}; +use differential_dataflow::trace::{Cursor, TraceReader}; /// Proposes extensions to a stream of prefixes. /// @@ -24,7 +24,6 @@ where Tr: TraceReader+Clone+'static, K: Ord+Hash+Clone+Default, V: ExchangeData+Hash+Default, - Tr::Batch: BatchReader, Tr::Cursor: Cursor, Tr::R: Monoid+Multiply+ExchangeData, F: Fn(&P)->K+Clone+'static, diff --git a/src/algorithms/graphs/bfs.rs b/src/algorithms/graphs/bfs.rs index 2931922a8..6e1a0a5f1 100644 --- a/src/algorithms/graphs/bfs.rs +++ b/src/algorithms/graphs/bfs.rs @@ -30,7 +30,6 @@ where G::Timestamp: Lattice+Ord, N: ExchangeData+Hash, Tr: TraceReader+Clone+'static, - Tr::Batch: crate::trace::BatchReader+'static, Tr::Cursor: crate::trace::Cursor+'static, { // initialize roots as reaching themselves at distance 0 @@ -46,4 +45,4 @@ where .concat(&nodes) .reduce(|_, s, t| t.push((s[0].0.clone(), 1))) }) -} \ No newline at end of file +} diff --git a/src/algorithms/graphs/bijkstra.rs b/src/algorithms/graphs/bijkstra.rs index 91e02dbde..54dea878e 100644 --- a/src/algorithms/graphs/bijkstra.rs +++ b/src/algorithms/graphs/bijkstra.rs @@ -46,7 +46,6 @@ where G::Timestamp: Lattice+Ord, N: ExchangeData+Hash, Tr: TraceReader+Clone+'static, - Tr::Batch: crate::trace::BatchReader+'static, Tr::Cursor: crate::trace::Cursor+'static, { forward diff --git a/src/algorithms/graphs/propagate.rs b/src/algorithms/graphs/propagate.rs index c94e49814..71eed8c9d 100644 --- a/src/algorithms/graphs/propagate.rs +++ b/src/algorithms/graphs/propagate.rs @@ -65,7 +65,6 @@ where R: From, L: ExchangeData, Tr: TraceReader+Clone+'static, - Tr::Batch: crate::trace::BatchReader+'static, Tr::Cursor: crate::trace::Cursor+'static, F: Fn(&L)->u64+Clone+'static, { diff --git a/src/operators/arrange/agent.rs b/src/operators/arrange/agent.rs index 13c3b02cd..8f5a469ef 100644 --- a/src/operators/arrange/agent.rs +++ b/src/operators/arrange/agent.rs @@ -92,7 +92,7 @@ where pub fn new(trace: Tr, operator: ::timely::dataflow::operators::generic::OperatorInfo, logging: Option<::logging::Logger>) -> (Self, TraceWriter) where Tr: Trace, - Tr::Batch: Batch, + Tr::Batch: Batch, { let trace = Rc::new(RefCell::new(TraceBox::new(trace))); let queues = Rc::new(RefCell::new(Vec::new())); diff --git a/src/operators/arrange/arrangement.rs b/src/operators/arrange/arrangement.rs index 09a48e248..d291fae35 100644 --- a/src/operators/arrange/arrangement.rs +++ b/src/operators/arrange/arrangement.rs @@ -65,7 +65,6 @@ impl Clone for Arranged where G::Timestamp: Lattice+Ord, Tr: TraceReader + Clone, - Tr::Batch: BatchReader, Tr::Cursor: Cursor, { fn clone(&self) -> Self { @@ -83,7 +82,6 @@ impl Arranged where G::Timestamp: Lattice+Ord, Tr: TraceReader + Clone, - Tr::Batch: BatchReader, Tr::Cursor: Cursor, { /// Brings an arranged collection into a nested scope. @@ -425,7 +423,6 @@ impl<'a, G: Scope, Tr> Arranged, Tr> where G::Timestamp: Lattice+Ord, Tr: TraceReader + Clone, - Tr::Batch: BatchReader, Tr::Cursor: Cursor, { /// Brings an arranged collection out of a nested region. @@ -462,7 +459,7 @@ where V: ExchangeData, R: ExchangeData, Tr: Trace+TraceReader+'static, - Tr::Batch: Batch, + Tr::Batch: Batch, Tr::Cursor: Cursor, { self.arrange_named("Arrange") @@ -479,7 +476,7 @@ where V: ExchangeData, R: ExchangeData, Tr: Trace+TraceReader+'static, - Tr::Batch: Batch, + Tr::Batch: Batch, Tr::Cursor: Cursor, { let exchange = Exchange::new(move |update: &((K,V),G::Timestamp,R)| (update.0).0.hashed().into()); @@ -495,7 +492,7 @@ where where P: ParallelizationContract, Tr: Trace+TraceReader+'static, - Tr::Batch: Batch, + Tr::Batch: Batch, Tr::Cursor: Cursor, ; } @@ -512,7 +509,7 @@ where where P: ParallelizationContract, Tr: Trace+TraceReader+'static, - Tr::Batch: Batch, + Tr::Batch: Batch, Tr::Cursor: Cursor, { // The `Arrange` operator is tasked with reacting to an advancing input @@ -547,7 +544,7 @@ where }; // Where we will deposit received updates, and from which we extract batches. - let mut batcher = >::Batcher::new(); + let mut batcher = ::Batcher::new(); // Capabilities for the lower envelope of updates in `batcher`. let mut capabilities = Antichain::>::new(); @@ -684,7 +681,7 @@ where where P: ParallelizationContract, Tr: Trace+TraceReader+'static, - Tr::Batch: Batch, + Tr::Batch: Batch, Tr::Cursor: Cursor, { self.map(|k| (k, ())) diff --git a/src/operators/arrange/upsert.rs b/src/operators/arrange/upsert.rs index 978d9596f..6a7aa6ba3 100644 --- a/src/operators/arrange/upsert.rs +++ b/src/operators/arrange/upsert.rs @@ -145,7 +145,7 @@ where Tr::Key: ExchangeData+Hashable+std::hash::Hash, Tr::Val: ExchangeData, Tr: Trace+TraceReader+'static, - Tr::Batch: Batch, + Tr::Batch: Batch, Tr::Cursor: Cursor, { let mut reader: Option> = None; @@ -252,7 +252,7 @@ where // Prepare a cursor to the existing arrangement, and a batch builder for // new stuff that we add. let (mut trace_cursor, trace_storage) = reader_local.cursor(); - let mut builder = >::Builder::new(); + let mut builder = ::Builder::new(); for (key, mut list) in to_process.drain(..) { // The prior value associated with the key. diff --git a/src/operators/arrange/writer.rs b/src/operators/arrange/writer.rs index b6c4c5710..a42924b75 100644 --- a/src/operators/arrange/writer.rs +++ b/src/operators/arrange/writer.rs @@ -23,7 +23,7 @@ pub struct TraceWriter where Tr: Trace, Tr::Time: Lattice+Timestamp+Ord+Clone+std::fmt::Debug+'static, - Tr::Batch: Batch, + Tr::Batch: Batch, { /// Current upper limit. upper: Antichain, @@ -37,7 +37,7 @@ impl TraceWriter where Tr: Trace, Tr::Time: Lattice+Timestamp+Ord+Clone+std::fmt::Debug+'static, - Tr::Batch: Batch, + Tr::Batch: Batch, { /// Creates a new `TraceWriter`. pub fn new( @@ -96,7 +96,7 @@ where pub fn seal(&mut self, upper: Antichain) { if self.upper != upper { use trace::Builder; - let builder = >::Builder::new(); + let builder = ::Builder::new(); let batch = builder.done(self.upper.clone(), upper, Antichain::from_elem(Tr::Time::minimum())); self.insert(batch, None); } @@ -107,7 +107,7 @@ impl Drop for TraceWriter where Tr: Trace, Tr::Time: Lattice+Timestamp+Ord+Clone+std::fmt::Debug+'static, - Tr::Batch: Batch, + Tr::Batch: Batch, { fn drop(&mut self) { self.seal(Antichain::new()) diff --git a/src/operators/count.rs b/src/operators/count.rs index f119d648e..6b6a6bf4e 100644 --- a/src/operators/count.rs +++ b/src/operators/count.rs @@ -74,7 +74,6 @@ where T1: TraceReader+Clone+'static, T1::Key: ExchangeData, T1::R: ExchangeData+Semigroup, - T1::Batch: BatchReader, T1::Cursor: Cursor, { fn count_total_core>(&self) -> Collection { diff --git a/src/operators/join.rs b/src/operators/join.rs index ea72b32f6..67a6b21d5 100644 --- a/src/operators/join.rs +++ b/src/operators/join.rs @@ -190,7 +190,6 @@ where Tr::Key: Data+Hashable, Tr::Val: Data, Tr::R: Semigroup, - Tr::Batch: BatchReader+'static, Tr::Cursor: Cursor+'static, { fn join_map(&self, other: &Collection, mut logic: L) -> Collection>::Output> @@ -258,7 +257,6 @@ pub trait JoinCore where G::Time fn join_core (&self, stream2: &Arranged, result: L) -> Collection>::Output> where Tr2: TraceReader+Clone+'static, - Tr2::Batch: BatchReader+'static, Tr2::Cursor: Cursor+'static, Tr2::Val: Ord+Clone+Debug+'static, Tr2::R: Semigroup, @@ -311,7 +309,6 @@ pub trait JoinCore where G::Time fn join_core_internal_unsafe (&self, stream2: &Arranged, result: L) -> Collection where Tr2: TraceReader+Clone+'static, - Tr2::Batch: BatchReader+'static, Tr2::Cursor: Cursor+'static, Tr2::Val: Ord+Clone+Debug+'static, Tr2::R: Semigroup, @@ -334,7 +331,6 @@ where fn join_core (&self, stream2: &Arranged, result: L) -> Collection>::Output> where Tr2: TraceReader+Clone+'static, - Tr2::Batch: BatchReader+'static, Tr2::Cursor: Cursor+'static, Tr2::Val: Ord+Clone+Debug+'static, Tr2::R: Semigroup, @@ -351,7 +347,6 @@ where fn join_core_internal_unsafe (&self, stream2: &Arranged, result: L) -> Collection where Tr2: TraceReader+Clone+'static, - Tr2::Batch: BatchReader+'static, Tr2::Cursor: Cursor+'static, Tr2::Val: Ord+Clone+Debug+'static, Tr2::R: Semigroup, @@ -374,14 +369,12 @@ impl JoinCore for Arranged T1::Key: Ord+Debug+'static, T1::Val: Ord+Clone+Debug+'static, T1::R: Semigroup, - T1::Batch: BatchReader+'static, T1::Cursor: Cursor+'static, { fn join_core(&self, other: &Arranged, mut result: L) -> Collection>::Output> where Tr2::Val: Ord+Clone+Debug+'static, Tr2: TraceReader+Clone+'static, - Tr2::Batch: BatchReader+'static, Tr2::Cursor: Cursor+'static, Tr2::R: Semigroup, T1::R: Multiply, @@ -401,7 +394,6 @@ impl JoinCore for Arranged fn join_core_internal_unsafe (&self, other: &Arranged, mut result: L) -> Collection where Tr2: TraceReader+Clone+'static, - Tr2::Batch: BatchReader+'static, Tr2::Cursor: Cursor+'static, Tr2::Val: Ord+Clone+Debug+'static, Tr2::R: Semigroup, diff --git a/src/operators/reduce.rs b/src/operators/reduce.rs index f8d9edeb1..a2e8d58cc 100644 --- a/src/operators/reduce.rs +++ b/src/operators/reduce.rs @@ -90,7 +90,6 @@ impl Reduce for Arrang where G::Timestamp: Lattice+Ord, T1: TraceReader+Clone+'static, - T1::Batch: BatchReader, T1::Cursor: Cursor, { fn reduce_named(&self, name: &str, logic: L) -> Collection @@ -179,7 +178,6 @@ impl Threshold for Arranged+Clone+'static, - T1::Batch: BatchReader, T1::Cursor: Cursor, { fn threshold_namedR2+'static>(&self, name: &str, mut thresh: F) -> Collection { @@ -236,7 +234,6 @@ impl Count for Arranged where G::Timestamp: Lattice+Ord, T1: TraceReader+Clone+'static, - T1::Batch: BatchReader, T1::Cursor: Cursor, { fn count_core>(&self) -> Collection { @@ -282,7 +279,7 @@ pub trait ReduceCore where G::Timestam T2: Trace+TraceReader+'static, T2::Val: Data, T2::R: Abelian, - T2::Batch: Batch, + T2::Batch: Batch, T2::Cursor: Cursor, L: FnMut(&K, &[(&V, R)], &mut Vec<(T2::Val, T2::R)>)+'static, { @@ -305,7 +302,7 @@ pub trait ReduceCore where G::Timestam T2: Trace+TraceReader+'static, T2::Val: Data, T2::R: Semigroup, - T2::Batch: Batch, + T2::Batch: Batch, T2::Cursor: Cursor, L: FnMut(&K, &[(&V, R)], &mut Vec<(T2::Val,T2::R)>, &mut Vec<(T2::Val,T2::R)>)+'static ; @@ -324,7 +321,7 @@ where T2::Val: Data, T2::R: Semigroup, T2: Trace+TraceReader+'static, - T2::Batch: Batch, + T2::Batch: Batch, T2::Cursor: Cursor, L: FnMut(&K, &[(&V, R)], &mut Vec<(T2::Val,T2::R)>, &mut Vec<(T2::Val, T2::R)>)+'static { @@ -337,7 +334,6 @@ impl ReduceCore for Ar where G::Timestamp: Lattice+Ord, T1: TraceReader+Clone+'static, - T1::Batch: BatchReader, T1::Cursor: Cursor, { fn reduce_core(&self, name: &str, mut logic: L) -> Arranged> @@ -345,7 +341,7 @@ where T2: Trace+TraceReader+'static, T2::Val: Data, T2::R: Semigroup, - T2::Batch: Batch, + T2::Batch: Batch, T2::Cursor: Cursor, L: FnMut(&K, &[(&V, R)], &mut Vec<(T2::Val,T2::R)>, &mut Vec<(T2::Val, T2::R)>)+'static { @@ -488,7 +484,7 @@ where let mut builders = Vec::new(); for i in 0 .. capabilities.len() { buffers.push((capabilities[i].time().clone(), Vec::new())); - builders.push(>::Builder::new()); + builders.push(::Builder::new()); } // cursors for navigating input and output traces. diff --git a/src/operators/threshold.rs b/src/operators/threshold.rs index 427093a04..939211bc5 100644 --- a/src/operators/threshold.rs +++ b/src/operators/threshold.rs @@ -108,7 +108,6 @@ where T1: TraceReader+Clone+'static, T1::Key: ExchangeData, T1::R: ExchangeData+Semigroup, - T1::Batch: BatchReader, T1::Cursor: Cursor, { fn threshold_semigroup(&self, mut thresh: F) -> Collection diff --git a/src/trace/implementations/merge_batcher.rs b/src/trace/implementations/merge_batcher.rs index 14ccc0f3a..e281ca6c7 100644 --- a/src/trace/implementations/merge_batcher.rs +++ b/src/trace/implementations/merge_batcher.rs @@ -8,7 +8,7 @@ use lattice::Lattice; use trace::{Batch, Batcher, Builder}; /// Creates batches from unordered tuples. -pub struct MergeBatcher> { +pub struct MergeBatcher> { sorter: MergeSorter<(K, V), T, R>, lower: Antichain, frontier: Antichain, @@ -21,7 +21,7 @@ where V: Ord+Clone, T: Lattice+timely::progress::Timestamp+Ord+Clone, R: Semigroup, - B: Batch, + B: Batch, { fn new() -> Self { MergeBatcher { diff --git a/src/trace/implementations/ord.rs b/src/trace/implementations/ord.rs index f394d2365..d24c9eafc 100644 --- a/src/trace/implementations/ord.rs +++ b/src/trace/implementations/ord.rs @@ -62,7 +62,7 @@ where pub desc: Description, } -impl BatchReader for OrdValBatch +impl BatchReader for OrdValBatch where K: Ord+Clone+'static, V: Ord+Clone+'static, @@ -70,13 +70,18 @@ where R: Semigroup, O: OrdOffset, >::Error: Debug, >::Error: Debug { + type Key = K; + type Val = V; + type Time = T; + type R = R; + type Cursor = OrdValCursor; fn cursor(&self) -> Self::Cursor { OrdValCursor { cursor: self.layer.cursor() } } fn len(&self) -> usize { , O>, O> as Trie>::tuples(&self.layer) } fn description(&self) -> &Description { &self.desc } } -impl Batch for OrdValBatch +impl Batch for OrdValBatch where K: Ord+Clone+'static, V: Ord+Clone+'static, @@ -409,13 +414,18 @@ where pub desc: Description, } -impl BatchReader for OrdKeyBatch +impl BatchReader for OrdKeyBatch where K: Ord+Clone+'static, T: Lattice+Ord+Clone+'static, R: Semigroup, O: OrdOffset, >::Error: Debug, >::Error: Debug { + type Key = K; + type Val = (); + type Time = T; + type R = R; + type Cursor = OrdKeyCursor; fn cursor(&self) -> Self::Cursor { OrdKeyCursor { @@ -426,10 +436,10 @@ where } } fn len(&self) -> usize { , O> as Trie>::tuples(&self.layer) } - fn description(&self) -> &Description { &self.desc } + fn description(&self) -> &Description { &self.desc } } -impl Batch for OrdKeyBatch +impl Batch for OrdKeyBatch where K: Ord+Clone+'static, T: Lattice+timely::progress::Timestamp+Ord+Clone+'static, diff --git a/src/trace/implementations/spine_fueled.rs b/src/trace/implementations/spine_fueled.rs index 8624247e6..5acb3e7e5 100644 --- a/src/trace/implementations/spine_fueled.rs +++ b/src/trace/implementations/spine_fueled.rs @@ -87,7 +87,7 @@ use ::timely::order::PartialOrder; /// A spine maintains a small number of immutable collections of update tuples, merging the collections when /// two have similar sizes. In this way, it allows the addition of more tuples, which may then be merged with /// other immutable collections. -pub struct Spine> { +pub struct Spine> { operator: OperatorInfo, logger: Option, phantom: ::std::marker::PhantomData<(K, V, R)>, @@ -106,7 +106,7 @@ where V: Ord+Clone, // Clone is required by `batch::advance_*` (in-place could remove). T: Lattice+timely::progress::Timestamp+Ord+Clone+Debug, R: Semigroup, - B: Batch+Clone+'static, + B: Batch+Clone+'static, { type Key = K; type Val = V; @@ -114,7 +114,7 @@ where type R = R; type Batch = B; - type Cursor = CursorList>::Cursor>; + type Cursor = CursorList::Cursor>; fn cursor_through(&mut self, upper: AntichainRef) -> Option<(Self::Cursor, >::Storage)> { @@ -248,7 +248,7 @@ where V: Ord+Clone, T: Lattice+timely::progress::Timestamp+Ord+Clone+Debug, R: Semigroup, - B: Batch+Clone+'static, + B: Batch+Clone+'static, { fn new( info: ::timely::dataflow::operators::generic::OperatorInfo, @@ -322,7 +322,7 @@ impl Drop for Spine where T: Lattice+Ord, R: Semigroup, - B: Batch, + B: Batch, { fn drop(&mut self) { self.drop_batches(); @@ -334,7 +334,7 @@ impl Spine where T: Lattice+Ord, R: Semigroup, - B: Batch, + B: Batch, { /// Drops and logs batches. Used in `set_logical_compaction` and drop. fn drop_batches(&mut self) { @@ -382,7 +382,7 @@ where V: Ord+Clone, T: Lattice+timely::progress::Timestamp+Ord+Clone+Debug, R: Semigroup, - B: Batch, + B: Batch, { /// True iff there is at most one non-empty batch in `self.merging`. /// @@ -766,7 +766,7 @@ where /// /// A layer can be empty, contain a single batch, or contain a pair of batches /// that are in the process of merging into a batch for the next layer. -enum MergeState> { +enum MergeState> { /// An empty layer, containing no updates. Vacant, /// A layer containing a single batch. @@ -778,7 +778,7 @@ enum MergeState> { Double(MergeVariant), } -impl> MergeState { +impl> MergeState { /// The number of actual updates contained in the level. fn len(&self) -> usize { @@ -864,7 +864,7 @@ impl> MergeState { match (batch1, batch2) { (Some(batch1), Some(batch2)) => { assert!(batch1.upper() == batch2.lower()); - let begin_merge = >::begin_merge(&batch1, &batch2, compaction_frontier); + let begin_merge = ::begin_merge(&batch1, &batch2, compaction_frontier); MergeVariant::InProgress(batch1, batch2, begin_merge) } (None, Some(x)) => MergeVariant::Complete(Some((x, None))), @@ -876,14 +876,14 @@ impl> MergeState { } } -enum MergeVariant> { +enum MergeVariant> { /// Describes an actual in-progress merge between two non-trivial batches. - InProgress(B, B, >::Merger), + InProgress(B, B, ::Merger), /// A merge that requires no further work. May or may not represent a non-trivial batch. Complete(Option<(B, Option<(B, B)>)>), } -impl> MergeVariant { +impl> MergeVariant { /// Completes and extracts the batch, unless structurally empty. /// diff --git a/src/trace/mod.rs b/src/trace/mod.rs index 45961491d..ad07f8a39 100644 --- a/src/trace/mod.rs +++ b/src/trace/mod.rs @@ -41,7 +41,6 @@ pub use self::description::Description; /// to update the contents of the trace. These methods are used to examine the contents, and to update the reader's /// capabilities (which may release restrictions on the mutations to the underlying trace and cause work to happen). pub trait TraceReader { - /// Key by which updates are indexed. type Key; /// Values associated with keys. @@ -52,7 +51,7 @@ pub trait TraceReader { type R; /// The type of an immutable collection of updates. - type Batch: BatchReader+Clone+'static; + type Batch: BatchReader+Clone+'static; /// The type used to enumerate the collections contents. type Cursor: Cursor; @@ -196,9 +195,7 @@ pub trait TraceReader { /// /// The trace must be constructable from, and navigable by the `Key`, `Val`, `Time` types, but does not need /// to return them. -pub trait Trace : TraceReader -where ::Batch: Batch { - +pub trait Trace: TraceReader { /// Allocates a new empty trace. fn new( info: ::timely::dataflow::operators::generic::OperatorInfo, @@ -232,12 +229,21 @@ where ::Batch: Batch +pub trait BatchReader where Self: ::std::marker::Sized, { + /// Key by which updates are indexed. + type Key; + /// Values associated with keys. + type Val; + /// Timestamps associated with updates + type Time; + /// Associated update. + type R; + /// The type used to enumerate the batch's contents. - type Cursor: Cursor; + type Cursor: Cursor; /// Acquires a cursor to the batch's contents. fn cursor(&self) -> Self::Cursor; /// The number of updates in the batch. @@ -245,39 +251,39 @@ where /// True if the batch is empty. fn is_empty(&self) -> bool { self.len() == 0 } /// Describes the times of the updates in the batch. - fn description(&self) -> &Description; + fn description(&self) -> &Description; /// All times in the batch are greater or equal to an element of `lower`. - fn lower(&self) -> &Antichain { self.description().lower() } + fn lower(&self) -> &Antichain { self.description().lower() } /// All times in the batch are not greater or equal to any element of `upper`. - fn upper(&self) -> &Antichain { self.description().upper() } + fn upper(&self) -> &Antichain { self.description().upper() } } /// An immutable collection of updates. -pub trait Batch : BatchReader where Self: ::std::marker::Sized { +pub trait Batch: BatchReader where Self: ::std::marker::Sized { /// A type used to assemble batches from disordered updates. - type Batcher: Batcher; + type Batcher: Batcher; /// A type used to assemble batches from ordered update sequences. - type Builder: Builder; + type Builder: Builder; /// A type used to progressively merge batches. - type Merger: Merger; + type Merger: Merger; /// Initiates the merging of consecutive batches. /// /// The result of this method can be exercised to eventually produce the same result /// that a call to `self.merge(other)` would produce, but it can be done in a measured /// fashion. This can help to avoid latency spikes where a large merge needs to happen. - fn begin_merge(&self, other: &Self, compaction_frontier: Option>) -> Self::Merger { + fn begin_merge(&self, other: &Self, compaction_frontier: Option>) -> Self::Merger { Self::Merger::new(self, other, compaction_frontier) } /// Creates an empty batch with the stated bounds. - fn empty(lower: Antichain, upper: Antichain, since: Antichain) -> Self { + fn empty(lower: Antichain, upper: Antichain, since: Antichain) -> Self { ::new().done(lower, upper, since) } } /// Functionality for collecting and batching updates. -pub trait Batcher> { +pub trait Batcher> { /// Allocates a new empty batcher. fn new() -> Self; /// Adds an unordered batch of elements to the batcher. @@ -289,7 +295,7 @@ pub trait Batcher> { } /// Functionality for building batches from ordered update sequences. -pub trait Builder> { +pub trait Builder> { /// Allocates an empty builder. fn new() -> Self; /// Allocates an empty builder with some capacity. @@ -305,7 +311,7 @@ pub trait Builder> { } /// Represents a merge in progress. -pub trait Merger> { +pub trait Merger> { /// Creates a new merger to merge the supplied batches, optionally compacting /// up to the supplied frontier. fn new(source1: &Output, source2: &Output, compaction_frontier: Option>) -> Self; @@ -331,10 +337,14 @@ pub mod rc_blanket_impls { use timely::progress::{Antichain, frontier::AntichainRef}; use super::{Batch, BatchReader, Batcher, Builder, Merger, Cursor, Description}; - impl> BatchReader for Rc { + impl BatchReader for Rc { + type Key = B::Key; + type Val = B::Val; + type Time = B::Time; + type R = B::R; /// The type used to enumerate the batch's contents. - type Cursor = RcBatchCursor; + type Cursor = RcBatchCursor; /// Acquires a cursor to the batch's contents. fn cursor(&self) -> Self::Cursor { RcBatchCursor::new((&**self).cursor()) @@ -343,16 +353,16 @@ pub mod rc_blanket_impls { /// The number of updates in the batch. fn len(&self) -> usize { (&**self).len() } /// Describes the times of the updates in the batch. - fn description(&self) -> &Description { (&**self).description() } + fn description(&self) -> &Description { (&**self).description() } } /// Wrapper to provide cursor to nested scope. - pub struct RcBatchCursor> { + pub struct RcBatchCursor> { phantom: ::std::marker::PhantomData<(K, V, T, R)>, cursor: B::Cursor, } - impl> RcBatchCursor { + impl> RcBatchCursor { fn new(cursor: B::Cursor) -> Self { RcBatchCursor { cursor, @@ -361,7 +371,7 @@ pub mod rc_blanket_impls { } } - impl> Cursor for RcBatchCursor { + impl> Cursor for RcBatchCursor { type Storage = Rc; @@ -387,17 +397,17 @@ pub mod rc_blanket_impls { } /// An immutable collection of updates. - impl> Batch for Rc { - type Batcher = RcBatcher; - type Builder = RcBuilder; - type Merger = RcMerger; + impl Batch for Rc { + type Batcher = RcBatcher; + type Builder = RcBuilder; + type Merger = RcMerger; } /// Wrapper type for batching reference counted batches. - pub struct RcBatcher> { batcher: B::Batcher } + pub struct RcBatcher> { batcher: B::Batcher } /// Functionality for collecting and batching updates. - impl> Batcher> for RcBatcher { + impl> Batcher> for RcBatcher { fn new() -> Self { RcBatcher { batcher: >::new() } } fn push_batch(&mut self, batch: &mut Vec<((K, V), T, R)>) { self.batcher.push_batch(batch) } fn seal(&mut self, upper: Antichain) -> Rc { Rc::new(self.batcher.seal(upper)) } @@ -405,10 +415,10 @@ pub mod rc_blanket_impls { } /// Wrapper type for building reference counted batches. - pub struct RcBuilder> { builder: B::Builder } + pub struct RcBuilder> { builder: B::Builder } /// Functionality for building batches from ordered update sequences. - impl> Builder> for RcBuilder { + impl> Builder> for RcBuilder { fn new() -> Self { RcBuilder { builder: >::new() } } fn with_capacity(cap: usize) -> Self { RcBuilder { builder: >::with_capacity(cap) } } fn push(&mut self, element: (K, V, T, R)) { self.builder.push(element) } @@ -416,10 +426,10 @@ pub mod rc_blanket_impls { } /// Wrapper type for merging reference counted batches. - pub struct RcMerger> { merger: B::Merger } + pub struct RcMerger> { merger: B::Merger } /// Represents a merge in progress. - impl> Merger> for RcMerger { + impl> Merger> for RcMerger { fn new(source1: &Rc, source2: &Rc, compaction_frontier: Option>) -> Self { RcMerger { merger: B::begin_merge(source1, source2, compaction_frontier) } } fn work(&mut self, source1: &Rc, source2: &Rc, fuel: &mut isize) { self.merger.work(source1, source2, fuel) } fn done(self) -> Rc { Rc::new(self.merger.done()) } @@ -438,10 +448,14 @@ pub mod abomonated_blanket_impls { use super::{Batch, BatchReader, Batcher, Builder, Merger, Cursor, Description}; - impl+Abomonation> BatchReader for Abomonated> { + impl BatchReader for Abomonated> { + type Key = B::Key; + type Val = B::Val; + type Time = B::Time; + type R = B::R; /// The type used to enumerate the batch's contents. - type Cursor = AbomonatedBatchCursor; + type Cursor = AbomonatedBatchCursor; /// Acquires a cursor to the batch's contents. fn cursor(&self) -> Self::Cursor { AbomonatedBatchCursor::new((&**self).cursor()) @@ -450,16 +464,16 @@ pub mod abomonated_blanket_impls { /// The number of updates in the batch. fn len(&self) -> usize { (&**self).len() } /// Describes the times of the updates in the batch. - fn description(&self) -> &Description { (&**self).description() } + fn description(&self) -> &Description { (&**self).description() } } /// Wrapper to provide cursor to nested scope. - pub struct AbomonatedBatchCursor> { + pub struct AbomonatedBatchCursor> { phantom: ::std::marker::PhantomData<(K, V, T, R)>, cursor: B::Cursor, } - impl> AbomonatedBatchCursor { + impl> AbomonatedBatchCursor { fn new(cursor: B::Cursor) -> Self { AbomonatedBatchCursor { cursor, @@ -468,7 +482,7 @@ pub mod abomonated_blanket_impls { } } - impl+Abomonation> Cursor for AbomonatedBatchCursor { + impl+Abomonation> Cursor for AbomonatedBatchCursor { type Storage = Abomonated>; @@ -494,17 +508,17 @@ pub mod abomonated_blanket_impls { } /// An immutable collection of updates. - impl+Abomonation> Batch for Abomonated> { - type Batcher = AbomonatedBatcher; - type Builder = AbomonatedBuilder; - type Merger = AbomonatedMerger; + impl Batch for Abomonated> { + type Batcher = AbomonatedBatcher; + type Builder = AbomonatedBuilder; + type Merger = AbomonatedMerger; } /// Wrapper type for batching reference counted batches. - pub struct AbomonatedBatcher> { batcher: B::Batcher } + pub struct AbomonatedBatcher> { batcher: B::Batcher } /// Functionality for collecting and batching updates. - impl+Abomonation> Batcher>> for AbomonatedBatcher { + impl+Abomonation> Batcher>> for AbomonatedBatcher { fn new() -> Self { AbomonatedBatcher { batcher: >::new() } } fn push_batch(&mut self, batch: &mut Vec<((K, V), T, R)>) { self.batcher.push_batch(batch) } fn seal(&mut self, upper: Antichain) -> Abomonated> { @@ -517,10 +531,10 @@ pub mod abomonated_blanket_impls { } /// Wrapper type for building reference counted batches. - pub struct AbomonatedBuilder> { builder: B::Builder } + pub struct AbomonatedBuilder> { builder: B::Builder } /// Functionality for building batches from ordered update sequences. - impl+Abomonation> Builder>> for AbomonatedBuilder { + impl+Abomonation> Builder>> for AbomonatedBuilder { fn new() -> Self { AbomonatedBuilder { builder: >::new() } } fn with_capacity(cap: usize) -> Self { AbomonatedBuilder { builder: >::with_capacity(cap) } } fn push(&mut self, element: (K, V, T, R)) { self.builder.push(element) } @@ -533,10 +547,10 @@ pub mod abomonated_blanket_impls { } /// Wrapper type for merging reference counted batches. - pub struct AbomonatedMerger> { merger: B::Merger } + pub struct AbomonatedMerger> { merger: B::Merger } /// Represents a merge in progress. - impl+Abomonation> Merger>> for AbomonatedMerger { + impl+Abomonation> Merger>> for AbomonatedMerger { fn new(source1: &Abomonated>, source2: &Abomonated>, compaction_frontier: Option>) -> Self { AbomonatedMerger { merger: B::begin_merge(source1, source2, compaction_frontier) } } diff --git a/src/trace/wrappers/enter.rs b/src/trace/wrappers/enter.rs index c1d28fd92..410b8d642 100644 --- a/src/trace/wrappers/enter.rs +++ b/src/trace/wrappers/enter.rs @@ -47,7 +47,7 @@ where type Time = TInner; type R = Tr::R; - type Batch = BatchEnter; + type Batch = BatchEnter; type Cursor = CursorEnter; fn map_batches(&self, mut f: F) { @@ -113,42 +113,37 @@ where /// Wrapper to provide batch to nested scope. -pub struct BatchEnter { - phantom: ::std::marker::PhantomData<(K, V, T, R)>, +#[derive(Clone)] +pub struct BatchEnter { batch: B, description: Description, } -impl Clone for BatchEnter { - fn clone(&self) -> Self { - BatchEnter { - phantom: ::std::marker::PhantomData, - batch: self.batch.clone(), - description: self.description.clone(), - } - } -} - -impl BatchReader for BatchEnter +impl BatchReader for BatchEnter where - B: BatchReader, - T: Timestamp, - TInner: Refines+Lattice, + B: BatchReader, + B::Time: Timestamp, + TInner: Refines+Lattice, { - type Cursor = BatchCursorEnter; + type Key = B::Key; + type Val = B::Val; + type Time = TInner; + type R = B::R; + + type Cursor = BatchCursorEnter; fn cursor(&self) -> Self::Cursor { BatchCursorEnter::new(self.batch.cursor()) } fn len(&self) -> usize { self.batch.len() } - fn description(&self) -> &Description { &self.description } + fn description(&self) -> &Description { &self.description } } -impl BatchEnter +impl BatchEnter where - B: BatchReader, - T: Timestamp, - TInner: Refines+Lattice, + B: BatchReader, + B::Time: Timestamp, + TInner: Refines+Lattice, { /// Makes a new batch wrapper pub fn make_from(batch: B) -> Self { @@ -157,7 +152,6 @@ where let since: Vec<_> = batch.description().since().elements().iter().map(|x| TInner::to_inner(x.clone())).collect(); BatchEnter { - phantom: ::std::marker::PhantomData, batch: batch, description: Description::new(Antichain::from(lower), Antichain::from(upper), Antichain::from(since)) } @@ -213,12 +207,12 @@ where /// Wrapper to provide cursor to nested scope. -pub struct BatchCursorEnter, TInner> { +pub struct BatchCursorEnter, TInner> { phantom: ::std::marker::PhantomData<(K, V, R, TInner)>, cursor: B::Cursor, } -impl, TInner> BatchCursorEnter { +impl, TInner> BatchCursorEnter { fn new(cursor: B::Cursor) -> Self { BatchCursorEnter { phantom: ::std::marker::PhantomData, @@ -227,12 +221,12 @@ impl, TInner> BatchCursorEnter> Cursor for BatchCursorEnter +impl> Cursor for BatchCursorEnter where T: Timestamp, TInner: Refines+Lattice, { - type Storage = BatchEnter; + type Storage = BatchEnter; #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(&storage.batch) } #[inline] fn val_valid(&self, storage: &Self::Storage) -> bool { self.cursor.val_valid(&storage.batch) } diff --git a/src/trace/wrappers/enter_at.rs b/src/trace/wrappers/enter_at.rs index 9562fc90c..e95f885f1 100644 --- a/src/trace/wrappers/enter_at.rs +++ b/src/trace/wrappers/enter_at.rs @@ -62,7 +62,7 @@ where type Time = TInner; type R = Tr::R; - type Batch = BatchEnter; + type Batch = BatchEnter; type Cursor = CursorEnter; fn map_batches(&self, mut f: F2) { @@ -131,32 +131,26 @@ where /// Wrapper to provide batch to nested scope. -pub struct BatchEnter { - phantom: ::std::marker::PhantomData<(K, V, T, R)>, +#[derive(Clone)] +pub struct BatchEnter { batch: B, description: Description, logic: F, } -impl Clone for BatchEnter { - fn clone(&self) -> Self { - BatchEnter { - phantom: ::std::marker::PhantomData, - batch: self.batch.clone(), - description: self.description.clone(), - logic: self.logic.clone(), - } - } -} - -impl BatchReader for BatchEnter +impl BatchReader for BatchEnter where - B: BatchReader, - T: Timestamp, - TInner: Refines+Lattice, - F: FnMut(&K, &V, &T)->TInner+Clone, + B: BatchReader, + B::Time: Timestamp, + TInner: Refines+Lattice, + F: FnMut(&B::Key, &B::Val, &B::Time)->TInner+Clone, { - type Cursor = BatchCursorEnter; + type Key = B::Key; + type Val = B::Val; + type Time = TInner; + type R = B::R; + + type Cursor = BatchCursorEnter; fn cursor(&self) -> Self::Cursor { BatchCursorEnter::new(self.batch.cursor(), self.logic.clone()) @@ -165,11 +159,11 @@ where fn description(&self) -> &Description { &self.description } } -impl BatchEnter +impl BatchEnter where - B: BatchReader, - T: Timestamp, - TInner: Refines+Lattice, + B: BatchReader, + B::Time: Timestamp, + TInner: Refines+Lattice, { /// Makes a new batch wrapper pub fn make_from(batch: B, logic: F) -> Self { @@ -178,7 +172,6 @@ where let since: Vec<_> = batch.description().since().elements().iter().map(|x| TInner::to_inner(x.clone())).collect(); BatchEnter { - phantom: ::std::marker::PhantomData, batch, description: Description::new(Antichain::from(lower), Antichain::from(upper), Antichain::from(since)), logic, @@ -241,13 +234,13 @@ where /// Wrapper to provide cursor to nested scope. -pub struct BatchCursorEnter, TInner, F> { +pub struct BatchCursorEnter, TInner, F> { phantom: ::std::marker::PhantomData<(K, V, R, TInner)>, cursor: B::Cursor, logic: F, } -impl, TInner, F> BatchCursorEnter { +impl, TInner, F> BatchCursorEnter { fn new(cursor: B::Cursor, logic: F) -> Self { BatchCursorEnter { phantom: ::std::marker::PhantomData, @@ -257,13 +250,13 @@ impl, TInner, F> BatchCursorEnter, F> Cursor for BatchCursorEnter +impl, F> Cursor for BatchCursorEnter where T: Timestamp, TInner: Refines+Lattice, F: FnMut(&K, &V, &T)->TInner, { - type Storage = BatchEnter; + type Storage = BatchEnter; #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(&storage.batch) } #[inline] fn val_valid(&self, storage: &Self::Storage) -> bool { self.cursor.val_valid(&storage.batch) } diff --git a/src/trace/wrappers/filter.rs b/src/trace/wrappers/filter.rs index 0f1a531ab..b61994bc1 100644 --- a/src/trace/wrappers/filter.rs +++ b/src/trace/wrappers/filter.rs @@ -40,7 +40,7 @@ where type Time = Tr::Time; type R = Tr::R; - type Batch = BatchFilter; + type Batch = BatchFilter; type Cursor = CursorFilter; fn map_batches(&self, mut f: F2) { @@ -76,46 +76,36 @@ where /// Wrapper to provide batch to nested scope. -pub struct BatchFilter { - phantom: ::std::marker::PhantomData<(K, V, T, R)>, +#[derive(Clone)] +pub struct BatchFilter { batch: B, logic: F, } -impl Clone for BatchFilter { - fn clone(&self) -> Self { - BatchFilter { - phantom: ::std::marker::PhantomData, - batch: self.batch.clone(), - logic: self.logic.clone(), - } - } -} - -impl BatchReader for BatchFilter +impl BatchReader for BatchFilter where - B: BatchReader, - T: Timestamp, - F: FnMut(&K, &V)->bool+Clone+'static + B: BatchReader, + B::Time: Timestamp, + F: FnMut(&B::Key, &B::Val)->bool+Clone+'static { - type Cursor = BatchCursorFilter; + type Key = B::Key; + type Val = B::Val; + type Time = B::Time; + type R = B::R; + + type Cursor = BatchCursorFilter; fn cursor(&self) -> Self::Cursor { BatchCursorFilter::new(self.batch.cursor(), self.logic.clone()) } fn len(&self) -> usize { self.batch.len() } - fn description(&self) -> &Description { &self.batch.description() } + fn description(&self) -> &Description { &self.batch.description() } } -impl BatchFilter -where - B: BatchReader, - T: Timestamp, -{ +impl BatchFilter { /// Makes a new batch wrapper pub fn make_from(batch: B, logic: F) -> Self { BatchFilter { - phantom: ::std::marker::PhantomData, batch, logic, } @@ -175,13 +165,13 @@ where /// Wrapper to provide cursor to nested scope. -pub struct BatchCursorFilter, F> { +pub struct BatchCursorFilter, F> { phantom: ::std::marker::PhantomData<(K, V, R)>, cursor: B::Cursor, logic: F, } -impl, F> BatchCursorFilter { +impl, F> BatchCursorFilter { fn new(cursor: B::Cursor, logic: F) -> Self { BatchCursorFilter { phantom: ::std::marker::PhantomData, @@ -191,12 +181,12 @@ impl, F> BatchCursorFilter, F> Cursor for BatchCursorFilter +impl, F> Cursor for BatchCursorFilter where T: Timestamp, F: FnMut(&K, &V)->bool+'static, { - type Storage = BatchFilter; + type Storage = BatchFilter; #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(&storage.batch) } #[inline] fn val_valid(&self, storage: &Self::Storage) -> bool { self.cursor.val_valid(&storage.batch) } diff --git a/src/trace/wrappers/freeze.rs b/src/trace/wrappers/freeze.rs index fa24b91d7..706d6a1a8 100644 --- a/src/trace/wrappers/freeze.rs +++ b/src/trace/wrappers/freeze.rs @@ -41,7 +41,6 @@ where T::Key: 'static, T::Val: 'static, T::R: 'static, - T::Batch: BatchReader, T::Cursor: Cursor, F: Fn(&G::Timestamp)->Option+'static, { @@ -93,7 +92,7 @@ where type Time = Tr::Time; type R = Tr::R; - type Batch = BatchFreeze; + type Batch = BatchFreeze; type Cursor = CursorFreeze; fn map_batches(&self, mut f: F2) { @@ -137,47 +136,39 @@ where /// Wrapper to provide batch to nested scope. -pub struct BatchFreeze { - phantom: ::std::marker::PhantomData<(K, V, R, T)>, +pub struct BatchFreeze { batch: B, func: Rc, } -impl Clone for BatchFreeze { +impl Clone for BatchFreeze { fn clone(&self) -> Self { BatchFreeze { - phantom: ::std::marker::PhantomData, batch: self.batch.clone(), func: self.func.clone(), } } } -impl BatchReader for BatchFreeze -where - B: BatchReader, - T: Clone, - F: Fn(&T)->Option, -{ - type Cursor = BatchCursorFreeze; +impl Option> BatchReader for BatchFreeze { + type Key = B::Key; + type Val = B::Val; + type Time = B::Time; + type R = B::R; + + type Cursor = BatchCursorFreeze; fn cursor(&self) -> Self::Cursor { BatchCursorFreeze::new(self.batch.cursor(), self.func.clone()) } fn len(&self) -> usize { self.batch.len() } - fn description(&self) -> &Description { self.batch.description() } + fn description(&self) -> &Description { self.batch.description() } } -impl BatchFreeze -where - B: BatchReader, - T: Clone, - F: Fn(&T)->Option -{ +impl Option> BatchFreeze { /// Makes a new batch wrapper pub fn make_from(batch: B, func: Rc) -> Self { BatchFreeze { - phantom: ::std::marker::PhantomData, batch: batch, func: func, } @@ -236,13 +227,13 @@ where /// Wrapper to provide cursor to nested scope. -pub struct BatchCursorFreeze, F> { +pub struct BatchCursorFreeze, F> { phantom: ::std::marker::PhantomData<(K, V, R, T)>, cursor: B::Cursor, func: Rc, } -impl, F> BatchCursorFreeze { +impl, F> BatchCursorFreeze { fn new(cursor: B::Cursor, func: Rc) -> Self { BatchCursorFreeze { phantom: ::std::marker::PhantomData, @@ -252,12 +243,12 @@ impl, F> BatchCursorFreeze, F> Cursor for BatchCursorFreeze +impl, F> Cursor for BatchCursorFreeze where F: Fn(&T)->Option, { - type Storage = BatchFreeze; + type Storage = BatchFreeze; #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(&storage.batch) } #[inline] fn val_valid(&self, storage: &Self::Storage) -> bool { self.cursor.val_valid(&storage.batch) } diff --git a/src/trace/wrappers/frontier.rs b/src/trace/wrappers/frontier.rs index 43ed2096b..d54c79beb 100644 --- a/src/trace/wrappers/frontier.rs +++ b/src/trace/wrappers/frontier.rs @@ -47,7 +47,7 @@ where type Time = Tr::Time; type R = Tr::R; - type Batch = BatchFrontier; + type Batch = BatchFrontier; type Cursor = CursorFrontier; fn map_batches(&self, mut f: F) { @@ -83,45 +83,37 @@ where /// Wrapper to provide batch to nested scope. -pub struct BatchFrontier { - phantom: ::std::marker::PhantomData<(K, V, T, R)>, +#[derive(Clone)] +pub struct BatchFrontier { batch: B, - frontier: Antichain, + frontier: Antichain, } -impl Clone for BatchFrontier { - fn clone(&self) -> Self { - BatchFrontier { - phantom: ::std::marker::PhantomData, - batch: self.batch.clone(), - frontier: self.frontier.to_owned(), - } - } -} - -impl BatchReader for BatchFrontier +impl BatchReader for BatchFrontier where - B: BatchReader, - T: Timestamp+Lattice, + B::Time: Timestamp + Lattice { - type Cursor = BatchCursorFrontier; + type Key = B::Key; + type Val = B::Val; + type Time = B::Time; + type R = B::R; + + type Cursor = BatchCursorFrontier; fn cursor(&self) -> Self::Cursor { BatchCursorFrontier::new(self.batch.cursor(), self.frontier.borrow()) } fn len(&self) -> usize { self.batch.len() } - fn description(&self) -> &Description { &self.batch.description() } + fn description(&self) -> &Description { &self.batch.description() } } -impl BatchFrontier +impl BatchFrontier where - B: BatchReader, - T: Timestamp+Lattice, + B::Time: Timestamp+Lattice { /// Makes a new batch wrapper - pub fn make_from(batch: B, frontier: AntichainRef) -> Self { + pub fn make_from(batch: B, frontier: AntichainRef) -> Self { BatchFrontier { - phantom: ::std::marker::PhantomData, batch, frontier: frontier.to_owned(), } @@ -182,13 +174,13 @@ where /// Wrapper to provide cursor to nested scope. -pub struct BatchCursorFrontier> { +pub struct BatchCursorFrontier> { phantom: ::std::marker::PhantomData<(K, V, R)>, cursor: B::Cursor, frontier: Antichain, } -impl> BatchCursorFrontier { +impl> BatchCursorFrontier { fn new(cursor: B::Cursor, frontier: AntichainRef) -> Self { BatchCursorFrontier { phantom: ::std::marker::PhantomData, @@ -198,11 +190,11 @@ impl> BatchCursorFrontier> Cursor for BatchCursorFrontier +impl> Cursor for BatchCursorFrontier where T: Timestamp+Lattice, { - type Storage = BatchFrontier; + type Storage = BatchFrontier; #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(&storage.batch) } #[inline] fn val_valid(&self, storage: &Self::Storage) -> bool { self.cursor.val_valid(&storage.batch) } diff --git a/tests/trace.rs b/tests/trace.rs index 1c22f4665..db4d5a90e 100644 --- a/tests/trace.rs +++ b/tests/trace.rs @@ -19,7 +19,7 @@ fn get_trace() -> Spine::Batch as Batch>::Batcher::new(); + let mut batcher = <::Batch as Batch>::Batcher::new(); batcher.push_batch(&mut vec![ ((1, 2), 0, 1),