diff --git a/dogsdogsdogs/src/operators/count.rs b/dogsdogsdogs/src/operators/count.rs index 700801b7b..6485ba909 100644 --- a/dogsdogsdogs/src/operators/count.rs +++ b/dogsdogsdogs/src/operators/count.rs @@ -1,7 +1,7 @@ use timely::dataflow::Scope; use differential_dataflow::{ExchangeData, Collection, Hashable}; -use differential_dataflow::difference::{Monoid, Multiply}; +use differential_dataflow::difference::{Semigroup, Monoid, Multiply}; use differential_dataflow::operators::arrange::Arranged; use differential_dataflow::trace::TraceReader; use differential_dataflow::trace::cursor::IntoOwned; @@ -22,6 +22,7 @@ where G: Scope, Tr: TraceReader+Clone+'static, for<'a> Tr::Key<'a>: IntoOwned<'a, Owned = K>, + for<'a> Tr::Diff : Semigroup>, K: Hashable + Ord + Default + 'static, R: Monoid+Multiply+ExchangeData, F: Fn(&P)->K+Clone+'static, diff --git a/dogsdogsdogs/src/operators/half_join.rs b/dogsdogsdogs/src/operators/half_join.rs index 846107727..f0b50ad4e 100644 --- a/dogsdogsdogs/src/operators/half_join.rs +++ b/dogsdogsdogs/src/operators/half_join.rs @@ -86,7 +86,7 @@ where R: Mul, >::Output: Semigroup, FF: Fn(&G::Timestamp, &mut Antichain) + 'static, - CF: Fn(&G::Timestamp, &G::Timestamp) -> bool + 'static, + CF: Fn(Tr::TimeGat<'_>, &G::Timestamp) -> bool + 'static, DOut: Clone+'static, S: FnMut(&K, &V, Tr::Val<'_>)->DOut+'static, { @@ -138,7 +138,7 @@ where Tr: TraceReader+Clone+'static, for<'a> Tr::Key<'a> : IntoOwned<'a, Owned = K>, FF: Fn(&G::Timestamp, &mut Antichain) + 'static, - CF: Fn(&G::Timestamp, &G::Timestamp) -> bool + 'static, + CF: Fn(Tr::TimeGat<'_>, &Tr::Time) -> bool + 'static, DOut: Clone+'static, ROut: Semigroup + 'static, Y: Fn(std::time::Instant, usize) -> bool + 'static, @@ -210,14 +210,16 @@ where for &mut ((ref key, ref val1, ref time), ref initial, ref mut diff1) in proposals.iter_mut() { // Use TOTAL ORDER to allow the release of `time`. yielded = yielded || yield_function(timer, work); - if !yielded && !input2.frontier.frontier().iter().any(|t| comparison(t, initial)) { + if !yielded && !input2.frontier.frontier().iter().any(|t| comparison( as IntoOwned>::borrow_as(t), initial)) { use differential_dataflow::trace::cursor::IntoOwned; cursor.seek_key(&storage, IntoOwned::borrow_as(key)); if cursor.get_key(&storage) == Some(IntoOwned::borrow_as(key)) { while let Some(val2) = cursor.get_val(&storage) { cursor.map_times(&storage, |t, d| { if comparison(t, initial) { - output_buffer.push((t.join(time), d.clone())) + let mut t = t.into_owned(); + t.join_assign(time); + output_buffer.push((t, d.into_owned())) } }); consolidate(&mut output_buffer); diff --git a/dogsdogsdogs/src/operators/lookup_map.rs b/dogsdogsdogs/src/operators/lookup_map.rs index 5f45bd1a1..a805140fe 100644 --- a/dogsdogsdogs/src/operators/lookup_map.rs +++ b/dogsdogsdogs/src/operators/lookup_map.rs @@ -30,6 +30,7 @@ where G: Scope, Tr: TraceReader+Clone+'static, for<'a> Tr::Key<'a>: IntoOwned<'a, Owned = K>, + for<'a> Tr::Diff : Semigroup>, K: Hashable + Ord + 'static, Tr::Diff: Monoid+ExchangeData, F: FnMut(&D, &mut K)+Clone+'static, @@ -101,7 +102,7 @@ where while let Some(value) = cursor.get_val(&storage) { let mut count = Tr::Diff::zero(); cursor.map_times(&storage, |t, d| { - if t.less_equal(time) { count.plus_equals(d); } + if t.into_owned().less_equal(time) { count.plus_equals(&d); } }); if !count.is_zero() { let (dout, rout) = output_func(prefix, diff, value, &count); diff --git a/dogsdogsdogs/src/operators/propose.rs b/dogsdogsdogs/src/operators/propose.rs index a7cee8b52..25df5d008 100644 --- a/dogsdogsdogs/src/operators/propose.rs +++ b/dogsdogsdogs/src/operators/propose.rs @@ -1,7 +1,7 @@ use timely::dataflow::Scope; use differential_dataflow::{ExchangeData, Collection, Hashable}; -use differential_dataflow::difference::{Monoid, Multiply}; +use differential_dataflow::difference::{Semigroup, Monoid, Multiply}; use differential_dataflow::operators::arrange::Arranged; use differential_dataflow::trace::TraceReader; use differential_dataflow::trace::cursor::IntoOwned; @@ -25,6 +25,7 @@ where for<'a> Tr::Key<'a> : IntoOwned<'a, Owned = K>, K: Hashable + Default + Ord + 'static, Tr::Diff: Monoid+Multiply+ExchangeData, + for<'a> Tr::Diff : Semigroup>, F: Fn(&P)->K+Clone+'static, P: ExchangeData, V: Clone + 'static, @@ -55,6 +56,7 @@ where G: Scope, Tr: TraceReader+Clone+'static, for<'a> Tr::Key<'a> : IntoOwned<'a, Owned = K>, + for<'a> Tr::Diff : Semigroup>, K: Hashable + Default + Ord + 'static, Tr::Diff: Monoid+Multiply+ExchangeData, F: Fn(&P)->K+Clone+'static, diff --git a/dogsdogsdogs/src/operators/validate.rs b/dogsdogsdogs/src/operators/validate.rs index fdadc7ba9..b6dfc404b 100644 --- a/dogsdogsdogs/src/operators/validate.rs +++ b/dogsdogsdogs/src/operators/validate.rs @@ -3,7 +3,7 @@ use std::hash::Hash; use timely::dataflow::Scope; use differential_dataflow::{ExchangeData, Collection}; -use differential_dataflow::difference::{Monoid, Multiply}; +use differential_dataflow::difference::{Semigroup, Monoid, Multiply}; use differential_dataflow::operators::arrange::Arranged; use differential_dataflow::trace::TraceReader; use differential_dataflow::trace::cursor::IntoOwned; @@ -22,6 +22,7 @@ where G: Scope, Tr: TraceReader+Clone+'static, for<'a> Tr::Key<'a> : IntoOwned<'a, Owned = (K, V)>, + for<'a> Tr::Diff : Semigroup>, K: Ord+Hash+Clone+Default + 'static, V: ExchangeData+Hash+Default, Tr::Diff: Monoid+Multiply+ExchangeData, diff --git a/src/difference.rs b/src/difference.rs index ec9ac7ec1..42c7bfe44 100644 --- a/src/difference.rs +++ b/src/difference.rs @@ -42,6 +42,13 @@ pub trait Semigroup : Clone + IsZero { fn plus_equals(&mut self, rhs: &Rhs); } +// Blanket implementation to support GATs of the form `&'a Diff`. +impl<'a, S, T: Semigroup> Semigroup<&'a S> for T { + fn plus_equals(&mut self, rhs: &&'a S) { + self.plus_equals(&**rhs); + } +} + /// A semigroup with an explicit zero element. pub trait Monoid : Semigroup { /// A zero element under the semigroup addition operator. diff --git a/src/operators/arrange/agent.rs b/src/operators/arrange/agent.rs index 670769e65..5357dc931 100644 --- a/src/operators/arrange/agent.rs +++ b/src/operators/arrange/agent.rs @@ -46,7 +46,9 @@ where type Key<'a> = Tr::Key<'a>; type Val<'a> = Tr::Val<'a>; type Time = Tr::Time; + type TimeGat<'a> = Tr::TimeGat<'a>; type Diff = Tr::Diff; + type DiffGat<'a> = Tr::DiffGat<'a>; type Batch = Tr::Batch; type Storage = Tr::Storage; diff --git a/src/operators/arrange/arrangement.rs b/src/operators/arrange/arrangement.rs index f0454905d..ac08382c7 100644 --- a/src/operators/arrange/arrangement.rs +++ b/src/operators/arrange/arrangement.rs @@ -120,7 +120,7 @@ where -> Arranged, TraceEnterAt> where TInner: Refines+Lattice+Timestamp+Clone+'static, - F: FnMut(Tr::Key<'_>, Tr::Val<'_>, &G::Timestamp)->TInner+Clone+'static, + F: FnMut(Tr::Key<'_>, Tr::Val<'_>, Tr::TimeGat<'_>)->TInner+Clone+'static, P: FnMut(&TInner)->Tr::Time+Clone+'static, { let logic1 = logic.clone(); @@ -217,7 +217,7 @@ where while let Some(val) = cursor.get_val(batch) { for datum in logic(key, val) { cursor.map_times(batch, |time, diff| { - session.give((datum.clone(), time.clone(), diff.clone())); + session.give((datum.clone(), time.into_owned(), diff.into_owned())); }); } cursor.step_val(batch); diff --git a/src/operators/arrange/upsert.rs b/src/operators/arrange/upsert.rs index c8012e770..3583f4473 100644 --- a/src/operators/arrange/upsert.rs +++ b/src/operators/arrange/upsert.rs @@ -254,7 +254,7 @@ where // Determine the prior value associated with the key. while let Some(val) = trace_cursor.get_val(&trace_storage) { let mut count = 0; - trace_cursor.map_times(&trace_storage, |_time, diff| count += *diff); + trace_cursor.map_times(&trace_storage, |_time, diff| count += diff.into_owned()); assert!(count == 0 || count == 1); if count == 1 { assert!(prev_value.is_none()); diff --git a/src/operators/count.rs b/src/operators/count.rs index 85c44c176..e44a59a85 100644 --- a/src/operators/count.rs +++ b/src/operators/count.rs @@ -57,6 +57,7 @@ where G: Scope, T1: for<'a> TraceReader=&'a ()>+Clone+'static, for<'a> T1::Key<'a>: IntoOwned<'a, Owned = K>, + for<'a> T1::Diff : Semigroup>, K: ExchangeData, T1::Time: TotalOrder, T1::Diff: ExchangeData, @@ -88,8 +89,8 @@ where trace_cursor.seek_key(&trace_storage, key); if trace_cursor.get_key(&trace_storage) == Some(key) { trace_cursor.map_times(&trace_storage, |_, diff| { - count.as_mut().map(|c| c.plus_equals(diff)); - if count.is_none() { count = Some(diff.clone()); } + count.as_mut().map(|c| c.plus_equals(&diff)); + if count.is_none() { count = Some(diff.into_owned()); } }); } @@ -97,14 +98,14 @@ where if let Some(count) = count.as_ref() { if !count.is_zero() { - session.give(((key.into_owned(), count.clone()), time.clone(), R2::from(-1i8))); + session.give(((key.into_owned(), count.clone()), time.into_owned(), R2::from(-1i8))); } } - count.as_mut().map(|c| c.plus_equals(diff)); - if count.is_none() { count = Some(diff.clone()); } + count.as_mut().map(|c| c.plus_equals(&diff)); + if count.is_none() { count = Some(diff.into_owned()); } if let Some(count) = count.as_ref() { if !count.is_zero() { - session.give(((key.into_owned(), count.clone()), time.clone(), R2::from(1i8))); + session.give(((key.into_owned(), count.clone()), time.into_owned(), R2::from(1i8))); } } }); diff --git a/src/operators/join.rs b/src/operators/join.rs index 7db301fca..d5723878e 100644 --- a/src/operators/join.rs +++ b/src/operators/join.rs @@ -677,8 +677,14 @@ where Ordering::Greater => batch.seek_key(batch_storage, trace.key(trace_storage)), Ordering::Equal => { - thinker.history1.edits.load(trace, trace_storage, |time| time.join(meet)); - thinker.history2.edits.load(batch, batch_storage, |time| time.clone()); + use crate::trace::cursor::IntoOwned; + + thinker.history1.edits.load(trace, trace_storage, |time| { + let mut time = time.into_owned(); + time.join_assign(meet); + time + }); + thinker.history2.edits.load(batch, batch_storage, |time| time.into_owned()); // populate `temp` with the results in the best way we know how. thinker.think(|v1,v2,t,r1,r2| { diff --git a/src/operators/mod.rs b/src/operators/mod.rs index 47b370a38..fdde1f242 100644 --- a/src/operators/mod.rs +++ b/src/operators/mod.rs @@ -20,6 +20,7 @@ pub mod threshold; use crate::lattice::Lattice; use crate::trace::Cursor; +use crate::trace::cursor::IntoOwned; /// An accumulation of (value, time, diff) updates. struct EditList<'a, C: Cursor> { @@ -39,11 +40,11 @@ impl<'a, C: Cursor> EditList<'a, C> { /// Loads the contents of a cursor. fn load(&mut self, cursor: &mut C, storage: &'a C::Storage, logic: L) where - L: Fn(&C::Time)->C::Time, + L: Fn(C::TimeGat<'_>)->C::Time, { self.clear(); while cursor.val_valid(storage) { - cursor.map_times(storage, |time1, diff1| self.push(logic(time1), diff1.clone())); + cursor.map_times(storage, |time1, diff1| self.push(logic(time1), diff1.into_owned())); self.seal(cursor.val(storage)); cursor.step_val(storage); } @@ -102,7 +103,7 @@ impl<'storage, C: Cursor> ValueHistory<'storage, C> { } fn load(&mut self, cursor: &mut C, storage: &'storage C::Storage, logic: L) where - L: Fn(&C::Time)->C::Time, + L: Fn(C::TimeGat<'_>)->C::Time, { self.edits.load(cursor, storage, logic); } @@ -118,7 +119,7 @@ impl<'storage, C: Cursor> ValueHistory<'storage, C> { logic: L ) -> HistoryReplay<'storage, 'history, C> where - L: Fn(&C::Time)->C::Time, + L: Fn(C::TimeGat<'_>)->C::Time, { self.clear(); cursor.seek_key(storage, key); diff --git a/src/operators/reduce.rs b/src/operators/reduce.rs index 0bd99995d..e15e15eff 100644 --- a/src/operators/reduce.rs +++ b/src/operators/reduce.rs @@ -170,7 +170,7 @@ where for<'a> T1::Key<'a>: IntoOwned<'a, Owned = K>, { fn threshold_namedR2+'static>(&self, name: &str, mut thresh: F) -> Collection { - self.reduce_abelian::<_,K,(),KeySpine<_,_,_>>(name, move |k,s,t| t.push(((), thresh(k, &s[0].1)))) + self.reduce_abelian::<_,K,(),KeySpine>(name, move |k,s,t| t.push(((), thresh(k, &s[0].1)))) .as_collection(|k,_| k.clone()) } } @@ -221,7 +221,7 @@ where for<'a> T1::Key<'a>: IntoOwned<'a, Owned = K>, { fn count_core + 'static>(&self) -> Collection { - self.reduce_abelian::<_,K,R,ValSpine<_,R,_,_>>("Count", |_k,s,t| t.push((s[0].1.clone(), R2::from(1i8)))) + self.reduce_abelian::<_,K,R,ValSpine>("Count", |_k,s,t| t.push((s[0].1.clone(), R2::from(1i8)))) .as_collection(|k,c| (k.clone(), c.clone())) } } @@ -755,7 +755,7 @@ mod history_replay { // loaded times by performing the lattice `join` with this value. // Load the batch contents. - let mut batch_replay = self.batch_history.replay_key(batch_cursor, batch_storage, key, |time| time.clone()); + let mut batch_replay = self.batch_history.replay_key(batch_cursor, batch_storage, key, |time| time.into_owned()); // We determine the meet of times we must reconsider (those from `batch` and `times`). This meet // can be used to advance other historical times, which may consolidate their representation. As @@ -791,16 +791,24 @@ mod history_replay { // Load the input and output histories. let mut input_replay = if let Some(meet) = meet.as_ref() { - self.input_history.replay_key(source_cursor, source_storage, key, |time| time.join(meet)) + self.input_history.replay_key(source_cursor, source_storage, key, |time| { + let mut time = time.into_owned(); + time.join_assign(meet); + time + }) } else { - self.input_history.replay_key(source_cursor, source_storage, key, |time| time.clone()) + self.input_history.replay_key(source_cursor, source_storage, key, |time| time.into_owned()) }; let mut output_replay = if let Some(meet) = meet.as_ref() { - self.output_history.replay_key(output_cursor, output_storage, key, |time| time.join(meet)) + self.output_history.replay_key(output_cursor, output_storage, key, |time| { + let mut time = time.into_owned(); + time.join_assign(meet); + time + }) } else { - self.output_history.replay_key(output_cursor, output_storage, key, |time| time.clone()) + self.output_history.replay_key(output_cursor, output_storage, key, |time| time.into_owned()) }; self.synth_times.clear(); diff --git a/src/operators/threshold.rs b/src/operators/threshold.rs index e05854694..8e8548ece 100644 --- a/src/operators/threshold.rs +++ b/src/operators/threshold.rs @@ -15,6 +15,7 @@ use crate::hashable::Hashable; use crate::collection::AsCollection; use crate::operators::arrange::{Arranged, ArrangeBySelf}; use crate::trace::{BatchReader, Cursor, TraceReader}; +use crate::trace::cursor::IntoOwned; /// Extension trait for the `distinct` differential dataflow method. pub trait ThresholdTotal where G::Timestamp: TotalOrder+Lattice+Ord { @@ -96,6 +97,7 @@ impl ThresholdTotal for Arranged where G: Scope, T1: for<'a> TraceReader=&'a K, Val<'a>=&'a ()>+Clone+'static, + for<'a> T1::Diff : Semigroup>, K: ExchangeData, T1::Time: TotalOrder, T1::Diff: ExchangeData, @@ -133,8 +135,8 @@ where trace_cursor.seek_key(&trace_storage, key); if trace_cursor.get_key(&trace_storage) == Some(key) { trace_cursor.map_times(&trace_storage, |_, diff| { - count.as_mut().map(|c| c.plus_equals(diff)); - if count.is_none() { count = Some(diff.clone()); } + count.as_mut().map(|c| c.plus_equals(&diff)); + if count.is_none() { count = Some(diff.into_owned()); } }); } @@ -146,23 +148,23 @@ where match &count { Some(old) => { let mut temp = old.clone(); - temp.plus_equals(diff); + temp.plus_equals(&diff); thresh(key, &temp, Some(old)) }, - None => { thresh(key, diff, None) }, + None => { thresh(key, &diff.into_owned(), None) }, }; // Either add or assign `diff` to `count`. if let Some(count) = &mut count { - count.plus_equals(diff); + count.plus_equals(&diff); } else { - count = Some(diff.clone()); + count = Some(diff.into_owned()); } if let Some(difference) = difference { if !difference.is_zero() { - session.give((key.clone(), time.clone(), difference)); + session.give((key.clone(), time.into_owned(), difference)); } } }); diff --git a/src/trace/cursor/cursor_list.rs b/src/trace/cursor/cursor_list.rs index f1b414590..f396b67a0 100644 --- a/src/trace/cursor/cursor_list.rs +++ b/src/trace/cursor/cursor_list.rs @@ -88,7 +88,9 @@ impl Cursor for CursorList { type Key<'a> = C::Key<'a>; type Val<'a> = C::Val<'a>; type Time = C::Time; + type TimeGat<'a> = C::TimeGat<'a>; type Diff = C::Diff; + type DiffGat<'a> = C::DiffGat<'a>; type Storage = Vec; @@ -113,7 +115,7 @@ impl Cursor for CursorList { self.cursors[self.min_val[0]].val(&storage[self.min_val[0]]) } #[inline] - fn map_times(&mut self, storage: &Vec, mut logic: L) { + fn map_times, Self::DiffGat<'_>)>(&mut self, storage: &Vec, mut logic: L) { for &index in self.min_val.iter() { self.cursors[index].map_times(&storage[index], |t,d| logic(t,d)); } diff --git a/src/trace/cursor/mod.rs b/src/trace/cursor/mod.rs index 27e24a13a..297f94247 100644 --- a/src/trace/cursor/mod.rs +++ b/src/trace/cursor/mod.rs @@ -27,7 +27,7 @@ pub trait IntoOwned<'a> { /// Conversion from an instance of this type to the owned type. fn into_owned(self) -> Self::Owned; /// Clones `self` onto an existing instance of the owned type. - fn clone_onto(&self, other: &mut Self::Owned); + fn clone_onto(self, other: &mut Self::Owned); /// Borrows an owned instance as oneself. fn borrow_as(owned: &'a Self::Owned) -> Self; } @@ -35,7 +35,7 @@ pub trait IntoOwned<'a> { impl<'a, T: ToOwned+?Sized> IntoOwned<'a> for &'a T { type Owned = T::Owned; fn into_owned(self) -> Self::Owned { self.to_owned() } - fn clone_onto(&self, other: &mut Self::Owned) { ::clone_into(self, other) } + fn clone_onto(self, other: &mut Self::Owned) { ::clone_into(self, other) } fn borrow_as(owned: &'a Self::Owned) -> Self { owned.borrow() } } @@ -48,8 +48,12 @@ pub trait Cursor { type Val<'a>: Copy + Clone + Ord; /// Timestamps associated with updates type Time: Timestamp + Lattice + Ord + Clone; - /// Associated update. - type Diff: Semigroup + ?Sized; + /// Borrowed form of timestamp. + type TimeGat<'a>: Copy + IntoOwned<'a, Owned = Self::Time>; + /// Owned form of update difference. + type Diff: Semigroup + 'static; + /// Borrowed form of update difference. + type DiffGat<'a> : Copy + IntoOwned<'a, Owned = Self::Diff>; /// Storage required by the cursor. type Storage; @@ -79,7 +83,7 @@ pub trait Cursor { /// Applies `logic` to each pair of time and difference. Intended for mutation of the /// closure's scope. - fn map_times(&mut self, storage: &Self::Storage, logic: L); + fn map_times, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, logic: L); /// Advances the cursor to the next key. fn step_key(&mut self, storage: &Self::Storage); @@ -109,7 +113,7 @@ pub trait Cursor { while self.val_valid(storage) { let mut kv_out = Vec::new(); self.map_times(storage, |ts, r| { - kv_out.push((ts.clone(), r.clone())); + kv_out.push((ts.into_owned(), r.into_owned())); }); out.push(((self.key(storage).into_owned(), self.val(storage).into_owned()), kv_out)); self.step_val(storage); diff --git a/src/trace/implementations/huffman_container.rs b/src/trace/implementations/huffman_container.rs index b069d4ce8..a58d6ba2c 100644 --- a/src/trace/implementations/huffman_container.rs +++ b/src/trace/implementations/huffman_container.rs @@ -50,6 +50,7 @@ impl PushInto> for HuffmanContainer { } impl BatchContainer for HuffmanContainer { + type Owned = Vec; type ReadItem<'a> = Wrapped<'a, B>; fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> { item } @@ -212,7 +213,7 @@ mod wrapper { Err(bytes) => bytes.to_vec(), } } - fn clone_onto(&self, other: &mut Self::Owned) { + fn clone_onto(self, other: &mut Self::Owned) { other.clear(); match self.decode() { Ok(decode) => other.extend(decode.cloned()), diff --git a/src/trace/implementations/mod.rs b/src/trace/implementations/mod.rs index 73609c641..83a89c088 100644 --- a/src/trace/implementations/mod.rs +++ b/src/trace/implementations/mod.rs @@ -93,10 +93,10 @@ pub trait Layout { type KeyContainer: BatchContainer + PushInto<::Key>; /// Container for update vals. type ValContainer: BatchContainer; - /// Container for update vals. - type UpdContainer: - PushInto<(::Time, ::Diff)> + - for<'a> BatchContainer = &'a (::Time, ::Diff)>; + /// Container for times. + type TimeContainer: BatchContainer::Time> + PushInto<::Time>; + /// Container for diffs. + type DiffContainer: BatchContainer::Diff> + PushInto<::Diff>; /// Container for offsets. type OffsetContainer: for<'a> BatchContainer = usize>; } @@ -113,7 +113,8 @@ where type Target = U; type KeyContainer = Vec; type ValContainer = Vec; - type UpdContainer = Vec<(U::Time, U::Diff)>; + type TimeContainer = Vec; + type DiffContainer = Vec; type OffsetContainer = OffsetList; } @@ -132,7 +133,8 @@ where type Target = U; type KeyContainer = TimelyStack; type ValContainer = TimelyStack; - type UpdContainer = TimelyStack<(U::Time, U::Diff)>; + type TimeContainer = TimelyStack; + type DiffContainer = TimelyStack; type OffsetContainer = OffsetList; } @@ -184,7 +186,8 @@ where type Target = Preferred; type KeyContainer = K::Container; type ValContainer = V::Container; - type UpdContainer = Vec<(T, D)>; + type TimeContainer = Vec; + type DiffContainer = Vec; type OffsetContainer = OffsetList; } @@ -294,8 +297,8 @@ impl<'a> IntoOwned<'a> for usize { self } - fn clone_onto(&self, other: &mut Self::Owned) { - *other = *self; + fn clone_onto(self, other: &mut Self::Owned) { + *other = self; } fn borrow_as(owned: &'a Self::Owned) -> Self { @@ -304,6 +307,7 @@ impl<'a> IntoOwned<'a> for usize { } impl BatchContainer for OffsetList { + type Owned = usize; type ReadItem<'a> = usize; fn copy(&mut self, item: Self::ReadItem<'_>) { @@ -442,11 +446,15 @@ pub mod containers { use timely::container::columnation::{Columnation, TimelyStack}; use timely::container::PushInto; + use crate::trace::IntoOwned; /// A general-purpose container resembling `Vec`. pub trait BatchContainer: 'static { + /// An owned instance of `Self::ReadItem<'_>`. + type Owned; + /// The type that can be read back out of the container. - type ReadItem<'a>: Copy + Ord; + type ReadItem<'a>: Copy + Ord + IntoOwned<'a, Owned = Self::Owned>; /// Push an item into this container fn push(&mut self, item: D) where Self: PushInto { @@ -532,6 +540,7 @@ pub mod containers { // All `T: Clone` also implement `ToOwned`, but without the constraint Rust // struggles to understand why the owned type must be `T` (i.e. the one blanket impl). impl BatchContainer for Vec { + type Owned = T; type ReadItem<'a> = &'a T; fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> { item } @@ -558,7 +567,8 @@ pub mod containers { // The `ToOwned` requirement exists to satisfy `self.reserve_items`, who must for now // be presented with the actual contained type, rather than a type that borrows into it. - impl BatchContainer for TimelyStack { + impl BatchContainer for TimelyStack { + type Owned = T; type ReadItem<'a> = &'a T; fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> { item } @@ -625,6 +635,7 @@ pub mod containers { where B: Ord + Clone + Sized + 'static, { + type Owned = Vec; type ReadItem<'a> = &'a [B]; fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> { item } diff --git a/src/trace/implementations/ord_neu.rs b/src/trace/implementations/ord_neu.rs index c7884e493..786ea2cf7 100644 --- a/src/trace/implementations/ord_neu.rs +++ b/src/trace/implementations/ord_neu.rs @@ -98,19 +98,21 @@ mod val_batch { /// /// The length of this list is one longer than `vals`, so that we can avoid bounds logic. pub vals_offs: L::OffsetContainer, - /// Concatenated ordered lists of updates, bracketed by offsets in `vals_offs`. - pub updates: L::UpdContainer, + /// Concatenated ordered lists of update times, bracketed by offsets in `vals_offs`. + pub times: L::TimeContainer, + /// Concatenated ordered lists of update diffs, bracketed by offsets in `vals_offs`. + pub diffs: L::DiffContainer, } impl OrdValStorage { /// Lower and upper bounds in `self.vals` corresponding to the key at `index`. fn values_for_key(&self, index: usize) -> (usize, usize) { - (self.keys_offs.index(index).into_owned(), self.keys_offs.index(index+1).into_owned()) + (self.keys_offs.index(index), self.keys_offs.index(index+1)) } /// Lower and upper bounds in `self.updates` corresponding to the value at `index`. fn updates_for_value(&self, index: usize) -> (usize, usize) { - let mut lower = self.vals_offs.index(index).into_owned(); - let upper = self.vals_offs.index(index+1).into_owned(); + let mut lower = self.vals_offs.index(index); + let upper = self.vals_offs.index(index+1); // We use equal lower and upper to encode "singleton update; just before here". // It should only apply when there is a prior element, so `lower` should be greater than zero. if lower == upper { @@ -143,7 +145,9 @@ mod val_batch { type Key<'a> = ::ReadItem<'a>; type Val<'a> = ::ReadItem<'a>; type Time = ::Time; + type TimeGat<'a> = ::ReadItem<'a>; type Diff = ::Diff; + type DiffGat<'a> = ::ReadItem<'a>; type Cursor = OrdValCursor; fn cursor(&self) -> Self::Cursor { @@ -192,6 +196,8 @@ mod val_batch { impl Merger> for OrdValMerger where OrdValBatch: Batch::Time>, + for<'a> ::ReadItem<'a> : IntoOwned<'a, Owned = ::Time>, + for<'a> ::ReadItem<'a> : IntoOwned<'a, Owned = ::Diff>, { fn new(batch1: &OrdValBatch, batch2: &OrdValBatch, compaction_frontier: AntichainRef<::Time>) -> Self { @@ -210,7 +216,8 @@ mod val_batch { keys_offs: L::OffsetContainer::with_capacity(batch1.keys_offs.len() + batch2.keys_offs.len()), vals: L::ValContainer::merge_capacity(&batch1.vals, &batch2.vals), vals_offs: L::OffsetContainer::with_capacity(batch1.vals_offs.len() + batch2.vals_offs.len()), - updates: L::UpdContainer::merge_capacity(&batch1.updates, &batch2.updates), + times: L::TimeContainer::merge_capacity(&batch1.times, &batch2.times), + diffs: L::DiffContainer::merge_capacity(&batch1.diffs, &batch2.diffs), }; // Mark explicit types because type inference fails to resolve it. @@ -230,7 +237,7 @@ mod val_batch { } fn done(self) -> OrdValBatch { OrdValBatch { - updates: self.result.updates.len() + self.singletons, + updates: self.result.times.len() + self.singletons, storage: self.result, description: self.description, } @@ -238,14 +245,14 @@ mod val_batch { fn work(&mut self, source1: &OrdValBatch, source2: &OrdValBatch, fuel: &mut isize) { // An (incomplete) indication of the amount of work we've done so far. - let starting_updates = self.result.updates.len(); + let starting_updates = self.result.times.len(); let mut effort = 0isize; // While both mergees are still active, perform single-key merges. while self.key_cursor1 < source1.storage.keys.len() && self.key_cursor2 < source2.storage.keys.len() && effort < *fuel { self.merge_key(&source1.storage, &source2.storage); // An (incomplete) accounting of the work we've done. - effort = (self.result.updates.len() - starting_updates) as isize; + effort = (self.result.times.len() - starting_updates) as isize; } // Merging is complete, and only copying remains. @@ -253,12 +260,12 @@ mod val_batch { while self.key_cursor1 < source1.storage.keys.len() && effort < *fuel { self.copy_key(&source1.storage, self.key_cursor1); self.key_cursor1 += 1; - effort = (self.result.updates.len() - starting_updates) as isize; + effort = (self.result.times.len() - starting_updates) as isize; } while self.key_cursor2 < source2.storage.keys.len() && effort < *fuel { self.copy_key(&source2.storage, self.key_cursor2); self.key_cursor2 += 1; - effort = (self.result.updates.len() - starting_updates) as isize; + effort = (self.result.times.len() - starting_updates) as isize; } *fuel -= effort; @@ -400,11 +407,12 @@ mod val_batch { let (lower, upper) = source.updates_for_value(index); for i in lower .. upper { // NB: Here is where we would need to look back if `lower == upper`. - let (time, diff) = &source.updates.index(i).into_owned(); + let time = source.times.index(i); + let diff = source.diffs.index(i); use crate::lattice::Lattice; - let mut new_time = time.clone(); + let mut new_time: ::Time = time.into_owned(); new_time.advance_by(self.description.since().borrow()); - self.update_stash.push((new_time, diff.clone())); + self.update_stash.push((new_time, diff.into_owned())); } } @@ -415,18 +423,25 @@ mod val_batch { if !self.update_stash.is_empty() { // If there is a single element, equal to a just-prior recorded update, // we push nothing and report an unincremented offset to encode this case. - if self.update_stash.len() == 1 && self.result.updates.last().map(|ud| self.update_stash.last().unwrap().eq(IntoOwned::borrow_as(ud))).unwrap_or(false) { - // Just clear out update_stash, as we won't drain it here. + let time_diff = self.result.times.last().zip(self.result.diffs.last()); + let last_eq = self.update_stash.last().zip(time_diff).map(|((t1, d1), (t2, d2))| { + let t1 = <::ReadItem<'_> as IntoOwned>::borrow_as(t1); + let d1 = <::ReadItem<'_> as IntoOwned>::borrow_as(d1); + t1.eq(&t2) && d1.eq(&d2) + }); + if self.update_stash.len() == 1 && last_eq.unwrap_or(false) { + // Just clear out update_stash, as we won't drain it here. self.update_stash.clear(); self.singletons += 1; } else { // Conventional; move `update_stash` into `updates`. - for item in self.update_stash.drain(..) { - self.result.updates.push(item); + for (time, diff) in self.update_stash.drain(..) { + self.result.times.push(time); + self.result.diffs.push(diff); } } - Some(self.result.updates.len()) + Some(self.result.times.len()) } else { None } @@ -448,16 +463,19 @@ mod val_batch { type Key<'a> = ::ReadItem<'a>; type Val<'a> = ::ReadItem<'a>; type Time = ::Time; + type TimeGat<'a> = ::ReadItem<'a>; type Diff = ::Diff; + type DiffGat<'a> = ::ReadItem<'a>; type Storage = OrdValBatch; fn key<'a>(&self, storage: &'a OrdValBatch) -> Self::Key<'a> { storage.storage.keys.index(self.key_cursor) } fn val<'a>(&self, storage: &'a OrdValBatch) -> Self::Val<'a> { storage.storage.vals.index(self.val_cursor) } - fn map_times(&mut self, storage: &OrdValBatch, mut logic: L2) { + fn map_times, Self::DiffGat<'_>)>(&mut self, storage: &OrdValBatch, mut logic: L2) { let (lower, upper) = storage.storage.updates_for_value(self.val_cursor); for index in lower .. upper { - let (time, diff) = &storage.storage.updates.index(index); + let time = storage.storage.times.index(index); + let diff = storage.storage.diffs.index(index); logic(time, diff); } } @@ -524,16 +542,20 @@ mod val_batch { /// to recover the singleton to push it into `updates` to join the second update. fn push_update(&mut self, time: ::Time, diff: ::Diff) { // If a just-pushed update exactly equals `(time, diff)` we can avoid pushing it. - if self.result.updates.last().map(|(t, d)| t == &time && d == &diff) == Some(true) { + if self.result.times.last().map(|t| t == <::ReadItem<'_> as IntoOwned>::borrow_as(&time)) == Some(true) && + self.result.diffs.last().map(|d| d == <::ReadItem<'_> as IntoOwned>::borrow_as(&diff)) == Some(true) + { assert!(self.singleton.is_none()); self.singleton = Some((time, diff)); } else { // If we have pushed a single element, we need to copy it out to meet this one. - if let Some(time_diff) = self.singleton.take() { - self.result.updates.push(time_diff); + if let Some((time, diff)) = self.singleton.take() { + self.result.times.push(time); + self.result.diffs.push(diff); } - self.result.updates.push((time, diff)); + self.result.times.push(time); + self.result.diffs.push(diff); } } } @@ -544,6 +566,8 @@ mod val_batch { CI: for<'a> BuilderInput::Time, Diff=::Diff>, for<'a> L::KeyContainer: PushInto>, for<'a> L::ValContainer: PushInto>, + for<'a> ::ReadItem<'a> : IntoOwned<'a, Owned = ::Time>, + for<'a> ::ReadItem<'a> : IntoOwned<'a, Owned = ::Diff>, { type Input = CI; @@ -558,7 +582,8 @@ mod val_batch { keys_offs: L::OffsetContainer::with_capacity(keys + 1), vals: L::ValContainer::with_capacity(vals), vals_offs: L::OffsetContainer::with_capacity(vals + 1), - updates: L::UpdContainer::with_capacity(upds), + times: L::TimeContainer::with_capacity(upds), + diffs: L::DiffContainer::with_capacity(upds), }, singleton: None, singletons: 0, @@ -577,14 +602,14 @@ mod val_batch { self.push_update(time, diff); } else { // New value; complete representation of prior value. - self.result.vals_offs.copy(self.result.updates.len()); + self.result.vals_offs.copy(self.result.times.len()); if self.singleton.take().is_some() { self.singletons += 1; } self.push_update(time, diff); self.result.vals.push(val); } } else { // New key; complete representation of prior key. - self.result.vals_offs.copy(self.result.updates.len()); + self.result.vals_offs.copy(self.result.times.len()); if self.singleton.take().is_some() { self.singletons += 1; } self.result.keys_offs.copy(self.result.vals.len()); self.push_update(time, diff); @@ -597,12 +622,12 @@ mod val_batch { #[inline(never)] fn done(mut self, lower: Antichain, upper: Antichain, since: Antichain) -> OrdValBatch { // Record the final offsets - self.result.vals_offs.copy(self.result.updates.len()); + self.result.vals_offs.copy(self.result.times.len()); // Remove any pending singleton, and if it was set increment our count. if self.singleton.take().is_some() { self.singletons += 1; } self.result.keys_offs.copy(self.result.vals.len()); OrdValBatch { - updates: self.result.updates.len() + self.singletons, + updates: self.result.times.len() + self.singletons, storage: self.result, description: Description::new(lower, upper, since), } @@ -638,15 +663,17 @@ mod key_batch { /// /// The length of this list is one longer than `keys`, so that we can avoid bounds logic. pub keys_offs: L::OffsetContainer, - /// Concatenated ordered lists of updates, bracketed by offsets in `vals_offs`. - pub updates: L::UpdContainer, + /// Concatenated ordered lists of update times, bracketed by offsets in `vals_offs`. + pub times: L::TimeContainer, + /// Concatenated ordered lists of update diffs, bracketed by offsets in `vals_offs`. + pub diffs: L::DiffContainer, } impl OrdKeyStorage { /// Lower and upper bounds in `self.vals` corresponding to the key at `index`. fn updates_for_key(&self, index: usize) -> (usize, usize) { - let mut lower = self.keys_offs.index(index).into_owned(); - let upper = self.keys_offs.index(index+1).into_owned(); + let mut lower = self.keys_offs.index(index); + let upper = self.keys_offs.index(index+1); // We use equal lower and upper to encode "singleton update; just before here". // It should only apply when there is a prior element, so `lower` should be greater than zero. if lower == upper { @@ -680,7 +707,9 @@ mod key_batch { type Key<'a> = ::ReadItem<'a>; type Val<'a> = &'a (); type Time = ::Time; + type TimeGat<'a> = ::ReadItem<'a>; type Diff = ::Diff; + type DiffGat<'a> = ::ReadItem<'a>; type Cursor = OrdKeyCursor; fn cursor(&self) -> Self::Cursor { @@ -728,7 +757,9 @@ mod key_batch { impl Merger> for OrdKeyMerger where - OrdKeyBatch: Batch::Time> + OrdKeyBatch: Batch::Time>, + for<'a> ::ReadItem<'a> : IntoOwned<'a, Owned = ::Time>, + for<'a> ::ReadItem<'a> : IntoOwned<'a, Owned = ::Diff>, { fn new(batch1: &OrdKeyBatch, batch2: &OrdKeyBatch, compaction_frontier: AntichainRef<::Time>) -> Self { @@ -745,7 +776,8 @@ mod key_batch { let mut storage = OrdKeyStorage { keys: L::KeyContainer::merge_capacity(&batch1.keys, &batch2.keys), keys_offs: L::OffsetContainer::with_capacity(batch1.keys_offs.len() + batch2.keys_offs.len()), - updates: L::UpdContainer::merge_capacity(&batch1.updates, &batch2.updates), + times: L::TimeContainer::merge_capacity(&batch1.times, &batch2.times), + diffs: L::DiffContainer::merge_capacity(&batch1.diffs, &batch2.diffs), }; let keys_offs: &mut L::OffsetContainer = &mut storage.keys_offs; @@ -762,7 +794,7 @@ mod key_batch { } fn done(self) -> OrdKeyBatch { OrdKeyBatch { - updates: self.result.updates.len() + self.singletons, + updates: self.result.times.len() + self.singletons, storage: self.result, description: self.description, } @@ -770,14 +802,14 @@ mod key_batch { fn work(&mut self, source1: &OrdKeyBatch, source2: &OrdKeyBatch, fuel: &mut isize) { // An (incomplete) indication of the amount of work we've done so far. - let starting_updates = self.result.updates.len(); + let starting_updates = self.result.times.len(); let mut effort = 0isize; // While both mergees are still active, perform single-key merges. while self.key_cursor1 < source1.storage.keys.len() && self.key_cursor2 < source2.storage.keys.len() && effort < *fuel { self.merge_key(&source1.storage, &source2.storage); // An (incomplete) accounting of the work we've done. - effort = (self.result.updates.len() - starting_updates) as isize; + effort = (self.result.times.len() - starting_updates) as isize; } // Merging is complete, and only copying remains. @@ -785,12 +817,12 @@ mod key_batch { while self.key_cursor1 < source1.storage.keys.len() && effort < *fuel { self.copy_key(&source1.storage, self.key_cursor1); self.key_cursor1 += 1; - effort = (self.result.updates.len() - starting_updates) as isize; + effort = (self.result.times.len() - starting_updates) as isize; } while self.key_cursor2 < source2.storage.keys.len() && effort < *fuel { self.copy_key(&source2.storage, self.key_cursor2); self.key_cursor2 += 1; - effort = (self.result.updates.len() - starting_updates) as isize; + effort = (self.result.times.len() - starting_updates) as isize; } *fuel -= effort; @@ -848,11 +880,12 @@ mod key_batch { let (lower, upper) = source.updates_for_key(index); for i in lower .. upper { // NB: Here is where we would need to look back if `lower == upper`. - let (time, diff) = &source.updates.index(i); + let time = source.times.index(i); + let diff = source.diffs.index(i); use crate::lattice::Lattice; - let mut new_time = time.clone(); + let mut new_time = time.into_owned(); new_time.advance_by(self.description.since().borrow()); - self.update_stash.push((new_time, diff.clone())); + self.update_stash.push((new_time, diff.into_owned())); } } @@ -863,18 +896,25 @@ mod key_batch { if !self.update_stash.is_empty() { // If there is a single element, equal to a just-prior recorded update, // we push nothing and report an unincremented offset to encode this case. - if self.update_stash.len() == 1 && self.update_stash.last() == self.result.updates.last() { + let time_diff = self.result.times.last().zip(self.result.diffs.last()); + let last_eq = self.update_stash.last().zip(time_diff).map(|((t1, d1), (t2, d2))| { + let t1 = <::ReadItem<'_> as IntoOwned>::borrow_as(t1); + let d1 = <::ReadItem<'_> as IntoOwned>::borrow_as(d1); + t1.eq(&t2) && d1.eq(&d2) + }); + if self.update_stash.len() == 1 && last_eq.unwrap_or(false) { // Just clear out update_stash, as we won't drain it here. self.update_stash.clear(); self.singletons += 1; } else { // Conventional; move `update_stash` into `updates`. - for item in self.update_stash.drain(..) { - self.result.updates.push(item); + for (time, diff) in self.update_stash.drain(..) { + self.result.times.push(time); + self.result.diffs.push(diff); } } - Some(self.result.updates.len()) + Some(self.result.times.len()) } else { None } @@ -892,20 +932,22 @@ mod key_batch { } impl Cursor for OrdKeyCursor { - type Key<'a> = ::ReadItem<'a>; type Val<'a> = &'a (); type Time = ::Time; + type TimeGat<'a> = ::ReadItem<'a>; type Diff = ::Diff; + type DiffGat<'a> = ::ReadItem<'a>; type Storage = OrdKeyBatch; fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { storage.storage.keys.index(self.key_cursor) } fn val<'a>(&self, _storage: &'a Self::Storage) -> &'a () { &() } - fn map_times(&mut self, storage: &Self::Storage, mut logic: L2) { + fn map_times, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, mut logic: L2) { let (lower, upper) = storage.storage.updates_for_key(self.key_cursor); for index in lower .. upper { - let (time, diff) = &storage.storage.updates.index(index); + let time = storage.storage.times.index(index); + let diff = storage.storage.diffs.index(index); logic(time, diff); } } @@ -967,16 +1009,20 @@ mod key_batch { /// to recover the singleton to push it into `updates` to join the second update. fn push_update(&mut self, time: ::Time, diff: ::Diff) { // If a just-pushed update exactly equals `(time, diff)` we can avoid pushing it. - if self.result.updates.last().map(|(t, d)| t == &time && d == &diff) == Some(true) { + let t1 = <::ReadItem<'_> as IntoOwned>::borrow_as(&time); + let d1 = <::ReadItem<'_> as IntoOwned>::borrow_as(&diff); + if self.result.times.last().map(|t| t == t1).unwrap_or(false) && self.result.diffs.last().map(|d| d == d1).unwrap_or(false) { assert!(self.singleton.is_none()); self.singleton = Some((time, diff)); } else { // If we have pushed a single element, we need to copy it out to meet this one. - if let Some(time_diff) = self.singleton.take() { - self.result.updates.push(time_diff); + if let Some((time, diff)) = self.singleton.take() { + self.result.times.push(time); + self.result.diffs.push(diff); } - self.result.updates.push((time, diff)); + self.result.times.push(time); + self.result.diffs.push(diff); } } } @@ -986,6 +1032,8 @@ mod key_batch { L: Layout, CI: for<'a> BuilderInput::Time, Diff=::Diff>, for<'a> L::KeyContainer: PushInto>, + for<'a> ::ReadItem<'a> : IntoOwned<'a, Owned = ::Time>, + for<'a> ::ReadItem<'a> : IntoOwned<'a, Owned = ::Diff>, { type Input = CI; @@ -998,7 +1046,8 @@ mod key_batch { result: OrdKeyStorage { keys: L::KeyContainer::with_capacity(keys), keys_offs: L::OffsetContainer::with_capacity(keys + 1), - updates: L::UpdContainer::with_capacity(upds), + times: L::TimeContainer::with_capacity(upds), + diffs: L::DiffContainer::with_capacity(upds), }, singleton: None, singletons: 0, @@ -1015,7 +1064,7 @@ mod key_batch { self.push_update(time, diff); } else { // New key; complete representation of prior key. - self.result.keys_offs.copy(self.result.updates.len()); + self.result.keys_offs.copy(self.result.times.len()); // Remove any pending singleton, and if it was set increment our count. if self.singleton.take().is_some() { self.singletons += 1; } self.push_update(time, diff); @@ -1027,11 +1076,11 @@ mod key_batch { #[inline(never)] fn done(mut self, lower: Antichain, upper: Antichain, since: Antichain) -> OrdKeyBatch { // Record the final offsets - self.result.keys_offs.copy(self.result.updates.len()); + self.result.keys_offs.copy(self.result.times.len()); // Remove any pending singleton, and if it was set increment our count. if self.singleton.take().is_some() { self.singletons += 1; } OrdKeyBatch { - updates: self.result.updates.len() + self.singletons, + updates: self.result.times.len() + self.singletons, storage: self.result, description: Description::new(lower, upper, since), } diff --git a/src/trace/implementations/rhh.rs b/src/trace/implementations/rhh.rs index ef857cac7..a2fb6241a 100644 --- a/src/trace/implementations/rhh.rs +++ b/src/trace/implementations/rhh.rs @@ -143,8 +143,10 @@ mod val_batch { /// /// The length of this list is one longer than `vals`, so that we can avoid bounds logic. pub vals_offs: L::OffsetContainer, - /// Concatenated ordered lists of updates, bracketed by offsets in `vals_offs`. - pub updates: L::UpdContainer, + /// Concatenated ordered lists of update times, bracketed by offsets in `vals_offs`. + pub times: L::TimeContainer, + /// Concatenated ordered lists of update diffs, bracketed by offsets in `vals_offs`. + pub diffs: L::DiffContainer, } impl RhhValStorage @@ -154,16 +156,16 @@ mod val_batch { { /// Lower and upper bounds in `self.vals` corresponding to the key at `index`. fn values_for_key(&self, index: usize) -> (usize, usize) { - let lower = self.keys_offs.index(index).into_owned(); - let upper = self.keys_offs.index(index+1).into_owned(); + let lower = self.keys_offs.index(index); + let upper = self.keys_offs.index(index+1); // Looking up values for an invalid key indicates something is wrong. assert!(lower < upper, "{:?} v {:?} at {:?}", lower, upper, index); (lower, upper) } /// Lower and upper bounds in `self.updates` corresponding to the value at `index`. fn updates_for_value(&self, index: usize) -> (usize, usize) { - let mut lower = self.vals_offs.index(index).into_owned(); - let upper = self.vals_offs.index(index+1).into_owned(); + let mut lower = self.vals_offs.index(index); + let upper = self.vals_offs.index(index+1); // We use equal lower and upper to encode "singleton update; just before here". // It should only apply when there is a prior element, so `lower` should be greater than zero. if lower == upper { @@ -188,7 +190,7 @@ mod val_batch { // push additional blank entries in. while self.keys.len() < desired { // We insert a default (dummy) key and repeat the offset to indicate this. - let current_offset = self.keys_offs.index(self.keys.len()).into_owned(); + let current_offset = self.keys_offs.index(self.keys.len()); self.keys.push(Default::default()); self.keys_offs.copy(current_offset); } @@ -272,7 +274,9 @@ mod val_batch { type Key<'a> = ::ReadItem<'a>; type Val<'a> = ::ReadItem<'a>; type Time = ::Time; + type TimeGat<'a> = ::ReadItem<'a>; type Diff = ::Diff; + type DiffGat<'a> = ::ReadItem<'a>; type Cursor = RhhValCursor; fn cursor(&self) -> Self::Cursor { @@ -355,7 +359,8 @@ mod val_batch { keys_offs: L::OffsetContainer::with_capacity(batch1.keys_offs.len() + batch2.keys_offs.len()), vals: L::ValContainer::merge_capacity(&batch1.vals, &batch2.vals), vals_offs: L::OffsetContainer::with_capacity(batch1.vals_offs.len() + batch2.vals_offs.len()), - updates: L::UpdContainer::merge_capacity(&batch1.updates, &batch2.updates), + times: L::TimeContainer::merge_capacity(&batch1.times, &batch2.times), + diffs: L::DiffContainer::merge_capacity(&batch1.diffs, &batch2.diffs), key_count: 0, key_capacity: rhh_cap, divisor: RhhValStorage::::divisor_for_capacity(rhh_cap), @@ -378,7 +383,7 @@ mod val_batch { } fn done(self) -> RhhValBatch { RhhValBatch { - updates: self.result.updates.len() + self.singletons, + updates: self.result.times.len() + self.singletons, storage: self.result, description: self.description, } @@ -386,7 +391,7 @@ mod val_batch { fn work(&mut self, source1: &RhhValBatch, source2: &RhhValBatch, fuel: &mut isize) { // An (incomplete) indication of the amount of work we've done so far. - let starting_updates = self.result.updates.len(); + let starting_updates = self.result.times.len(); let mut effort = 0isize; source1.storage.advance_to_live_key(&mut self.key_cursor1); @@ -398,7 +403,7 @@ mod val_batch { source1.storage.advance_to_live_key(&mut self.key_cursor1); source2.storage.advance_to_live_key(&mut self.key_cursor2); // An (incomplete) accounting of the work we've done. - effort = (self.result.updates.len() - starting_updates) as isize; + effort = (self.result.times.len() - starting_updates) as isize; } // Merging is complete, and only copying remains. @@ -407,13 +412,13 @@ mod val_batch { self.copy_key(&source1.storage, self.key_cursor1); self.key_cursor1 += 1; source1.storage.advance_to_live_key(&mut self.key_cursor1); - effort = (self.result.updates.len() - starting_updates) as isize; + effort = (self.result.times.len() - starting_updates) as isize; } while self.key_cursor2 < source2.storage.keys.len() && effort < *fuel { self.copy_key(&source2.storage, self.key_cursor2); self.key_cursor2 += 1; source2.storage.advance_to_live_key(&mut self.key_cursor2); - effort = (self.result.updates.len() - starting_updates) as isize; + effort = (self.result.times.len() - starting_updates) as isize; } *fuel -= effort; @@ -558,11 +563,12 @@ mod val_batch { let (lower, upper) = source.updates_for_value(index); for i in lower .. upper { // NB: Here is where we would need to look back if `lower == upper`. - let (time, diff) = &source.updates.index(i); - let mut new_time = time.clone(); + let time = source.times.index(i); + let diff = source.diffs.index(i); + let mut new_time = time.into_owned(); use crate::lattice::Lattice; new_time.advance_by(self.description.since().borrow()); - self.update_stash.push((new_time, diff.clone())); + self.update_stash.push((new_time, diff.into_owned())); } } @@ -573,18 +579,25 @@ mod val_batch { if !self.update_stash.is_empty() { // If there is a single element, equal to a just-prior recorded update, // we push nothing and report an unincremented offset to encode this case. - if self.update_stash.len() == 1 && self.result.updates.last().map(|l| l.eq(IntoOwned::borrow_as(self.update_stash.last().unwrap()))).unwrap_or(false) { + let time_diff = self.result.times.last().zip(self.result.diffs.last()); + let last_eq = self.update_stash.last().zip(time_diff).map(|((t1, d1), (t2, d2))| { + let t1 = <::ReadItem<'_> as IntoOwned>::borrow_as(t1); + let d1 = <::ReadItem<'_> as IntoOwned>::borrow_as(d1); + t1.eq(&t2) && d1.eq(&d2) + }); + if self.update_stash.len() == 1 && last_eq.unwrap_or(false) { // Just clear out update_stash, as we won't drain it here. self.update_stash.clear(); self.singletons += 1; } else { // Conventional; move `update_stash` into `updates`. - for item in self.update_stash.drain(..) { - self.result.updates.push(item); + for (time, diff) in self.update_stash.drain(..) { + self.result.times.push(time); + self.result.diffs.push(diff); } } - Some(self.result.updates.len()) + Some(self.result.times.len()) } else { None } @@ -619,7 +632,9 @@ mod val_batch { type Key<'a> = ::ReadItem<'a>; type Val<'a> = ::ReadItem<'a>; type Time = ::Time; + type TimeGat<'a> = ::ReadItem<'a>; type Diff = ::Diff; + type DiffGat<'a> = ::ReadItem<'a>; type Storage = RhhValBatch; @@ -627,10 +642,11 @@ mod val_batch { storage.storage.keys.index(self.key_cursor) } fn val<'a>(&self, storage: &'a RhhValBatch) -> Self::Val<'a> { storage.storage.vals.index(self.val_cursor) } - fn map_times(&mut self, storage: &RhhValBatch, mut logic: L2) { + fn map_times, Self::DiffGat<'_>)>(&mut self, storage: &RhhValBatch, mut logic: L2) { let (lower, upper) = storage.storage.updates_for_value(self.val_cursor); for index in lower .. upper { - let (time, diff) = &storage.storage.updates.index(index); + let time = storage.storage.times.index(index); + let diff = storage.storage.diffs.index(index); logic(time, diff); } } @@ -722,16 +738,20 @@ mod val_batch { /// to recover the singleton to push it into `updates` to join the second update. fn push_update(&mut self, time: ::Time, diff: ::Diff) { // If a just-pushed update exactly equals `(time, diff)` we can avoid pushing it. - if self.result.updates.last().map(|(t, d)| t == &time && d == &diff) == Some(true) { + let t1 = <::ReadItem<'_> as IntoOwned>::borrow_as(&time); + let d1 = <::ReadItem<'_> as IntoOwned>::borrow_as(&diff); + if self.result.times.last().map(|t| t == t1).unwrap_or(false) && self.result.diffs.last().map(|d| d == d1).unwrap_or(false) { assert!(self.singleton.is_none()); self.singleton = Some((time, diff)); } else { // If we have pushed a single element, we need to copy it out to meet this one. - if let Some(time_diff) = self.singleton.take() { - self.result.updates.push(time_diff); + if let Some((time, diff)) = self.singleton.take() { + self.result.times.push(time); + self.result.diffs.push(diff); } - self.result.updates.push((time, diff)); + self.result.times.push(time); + self.result.diffs.push(diff); } } } @@ -764,7 +784,8 @@ mod val_batch { keys_offs: L::OffsetContainer::with_capacity(keys + 1), vals: L::ValContainer::with_capacity(vals), vals_offs: L::OffsetContainer::with_capacity(vals + 1), - updates: L::UpdContainer::with_capacity(upds), + times: L::TimeContainer::with_capacity(upds), + diffs: L::DiffContainer::with_capacity(upds), key_count: 0, key_capacity: rhh_capacity, divisor, @@ -786,14 +807,14 @@ mod val_batch { self.push_update(time, diff); } else { // New value; complete representation of prior value. - self.result.vals_offs.copy(self.result.updates.len()); + self.result.vals_offs.copy(self.result.times.len()); if self.singleton.take().is_some() { self.singletons += 1; } self.push_update(time, diff); self.result.vals.push(val); } } else { // New key; complete representation of prior key. - self.result.vals_offs.copy(self.result.updates.len()); + self.result.vals_offs.copy(self.result.times.len()); if self.singleton.take().is_some() { self.singletons += 1; } self.result.keys_offs.copy(self.result.vals.len()); self.push_update(time, diff); @@ -807,12 +828,12 @@ mod val_batch { #[inline(never)] fn done(mut self, lower: Antichain, upper: Antichain, since: Antichain) -> RhhValBatch { // Record the final offsets - self.result.vals_offs.copy(self.result.updates.len()); + self.result.vals_offs.copy(self.result.times.len()); // Remove any pending singleton, and if it was set increment our count. if self.singleton.take().is_some() { self.singletons += 1; } self.result.keys_offs.copy(self.result.vals.len()); RhhValBatch { - updates: self.result.updates.len() + self.singletons, + updates: self.result.times.len() + self.singletons, storage: self.result, description: Description::new(lower, upper, since), } diff --git a/src/trace/implementations/spine_fueled.rs b/src/trace/implementations/spine_fueled.rs index 215fe791d..eb6b44585 100644 --- a/src/trace/implementations/spine_fueled.rs +++ b/src/trace/implementations/spine_fueled.rs @@ -112,7 +112,9 @@ where type Key<'a> = B::Key<'a>; type Val<'a> = B::Val<'a>; type Time = B::Time; + type TimeGat<'a> = B::TimeGat<'a>; type Diff = B::Diff; + type DiffGat<'a> = B::DiffGat<'a>; type Batch = B; type Storage = Vec; diff --git a/src/trace/mod.rs b/src/trace/mod.rs index 1f513333f..46d3dd428 100644 --- a/src/trace/mod.rs +++ b/src/trace/mod.rs @@ -57,17 +57,21 @@ pub trait TraceReader { type Val<'a>: Copy + Clone; /// Timestamps associated with updates type Time: Timestamp + Lattice + Ord + Clone; - /// Associated update. + /// Borrowed form of timestamp. + type TimeGat<'a>: Copy + IntoOwned<'a, Owned = Self::Time>; + /// Owned form of update difference. type Diff: Semigroup + 'static; + /// Borrowed form of update difference. + type DiffGat<'a> : Copy + IntoOwned<'a, Owned = Self::Diff>; /// The type of an immutable collection of updates. - type Batch: for<'a> BatchReader = Self::Key<'a>, Val<'a> = Self::Val<'a>, Time = Self::Time, Diff = Self::Diff>+Clone+'static; + type Batch: for<'a> BatchReader = Self::Key<'a>, Val<'a> = Self::Val<'a>, Time = Self::Time, TimeGat<'a> = Self::TimeGat<'a>, Diff = Self::Diff, DiffGat<'a> = Self::DiffGat<'a>>+Clone+'static; /// Storage type for `Self::Cursor`. Likely related to `Self::Batch`. type Storage; /// The type used to enumerate the collections contents. - type Cursor: for<'a> Cursor = Self::Key<'a>, Val<'a> = Self::Val<'a>, Time = Self::Time, Diff = Self::Diff>; + type Cursor: for<'a> Cursor = Self::Key<'a>, Val<'a> = Self::Val<'a>, Time = Self::Time, TimeGat<'a> = Self::TimeGat<'a>, Diff = Self::Diff, DiffGat<'a> = Self::DiffGat<'a>>; /// Provides a cursor over updates contained in the trace. fn cursor(&mut self) -> (Self::Cursor, Self::Storage) { @@ -261,11 +265,15 @@ where type Val<'a>: Copy + Clone; /// Timestamps associated with updates type Time: Timestamp + Lattice + Ord + Clone; - /// Associated update. - type Diff: Semigroup; + /// Borrowed form of timestamp. + type TimeGat<'a>: Copy + IntoOwned<'a, Owned = Self::Time>; + /// Owned form of update difference. + type Diff: Semigroup + 'static; + /// Borrowed form of update difference. + type DiffGat<'a> : Copy + IntoOwned<'a, Owned = Self::Diff>; /// The type used to enumerate the batch's contents. - type Cursor: for<'a> Cursor = Self::Key<'a>, Val<'a> = Self::Val<'a>, Time = Self::Time, Diff = Self::Diff>; + type Cursor: for<'a> Cursor = Self::Key<'a>, Val<'a> = Self::Val<'a>, Time = Self::Time, TimeGat<'a> = Self::TimeGat<'a>, Diff = Self::Diff, DiffGat<'a> = Self::DiffGat<'a>>; /// Acquires a cursor to the batch's contents. fn cursor(&self) -> Self::Cursor; /// The number of updates in the batch. @@ -371,7 +379,9 @@ pub mod rc_blanket_impls { type Key<'a> = B::Key<'a>; type Val<'a> = B::Val<'a>; type Time = B::Time; + type TimeGat<'a> = B::TimeGat<'a>; type Diff = B::Diff; + type DiffGat<'a> = B::DiffGat<'a>; /// The type used to enumerate the batch's contents. type Cursor = RcBatchCursor; @@ -404,7 +414,9 @@ pub mod rc_blanket_impls { type Key<'a> = C::Key<'a>; type Val<'a> = C::Val<'a>; type Time = C::Time; + type TimeGat<'a> = C::TimeGat<'a>; type Diff = C::Diff; + type DiffGat<'a> = C::DiffGat<'a>; type Storage = Rc; @@ -415,7 +427,7 @@ pub mod rc_blanket_impls { #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { self.cursor.val(storage) } #[inline] - fn map_times(&mut self, storage: &Self::Storage, logic: L) { + fn map_times, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, logic: L) { self.cursor.map_times(storage, logic) } @@ -472,7 +484,9 @@ pub mod abomonated_blanket_impls { type Key<'a> = B::Key<'a>; type Val<'a> = B::Val<'a>; type Time = B::Time; + type TimeGat<'a> = B::TimeGat<'a>; type Diff = B::Diff; + type DiffGat<'a> = B::DiffGat<'a>; /// The type used to enumerate the batch's contents. type Cursor = AbomonatedBatchCursor; @@ -505,7 +519,9 @@ pub mod abomonated_blanket_impls { type Key<'a> = C::Key<'a>; type Val<'a> = C::Val<'a>; type Time = C::Time; + type TimeGat<'a> = C::TimeGat<'a>; type Diff = C::Diff; + type DiffGat<'a> = C::DiffGat<'a>; type Storage = Abomonated>; @@ -516,7 +532,7 @@ pub mod abomonated_blanket_impls { #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { self.cursor.val(storage) } #[inline] - fn map_times(&mut self, storage: &Self::Storage, logic: L) { + fn map_times, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, logic: L) { self.cursor.map_times(storage, logic) } diff --git a/src/trace/wrappers/enter.rs b/src/trace/wrappers/enter.rs index 64a21af99..4f8280c70 100644 --- a/src/trace/wrappers/enter.rs +++ b/src/trace/wrappers/enter.rs @@ -34,7 +34,9 @@ where type Key<'a> = Tr::Key<'a>; type Val<'a> = Tr::Val<'a>; type Time = TInner; + type TimeGat<'a> = &'a TInner; type Diff = Tr::Diff; + type DiffGat<'a> = Tr::DiffGat<'a>; type Batch = BatchEnter; type Storage = Tr::Storage; @@ -116,7 +118,9 @@ where type Key<'a> = B::Key<'a>; type Val<'a> = B::Val<'a>; type Time = TInner; + type TimeGat<'a> = &'a TInner; type Diff = B::Diff; + type DiffGat<'a> = B::DiffGat<'a>; type Cursor = BatchCursorEnter; @@ -168,7 +172,9 @@ where type Key<'a> = C::Key<'a>; type Val<'a> = C::Val<'a>; type Time = TInner; + type TimeGat<'a> = &'a TInner; type Diff = C::Diff; + type DiffGat<'a> = C::DiffGat<'a>; type Storage = C::Storage; @@ -179,9 +185,10 @@ where #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { self.cursor.val(storage) } #[inline] - fn map_times(&mut self, storage: &Self::Storage, mut logic: L) { + fn map_times)>(&mut self, storage: &Self::Storage, mut logic: L) { + use crate::trace::cursor::IntoOwned; self.cursor.map_times(storage, |time, diff| { - logic(&TInner::to_inner(time.clone()), diff) + logic(&TInner::to_inner(time.into_owned()), diff) }) } @@ -219,7 +226,9 @@ where type Key<'a> = C::Key<'a>; type Val<'a> = C::Val<'a>; type Time = TInner; + type TimeGat<'a> = &'a TInner; type Diff = C::Diff; + type DiffGat<'a> = C::DiffGat<'a>; type Storage = BatchEnter; @@ -230,9 +239,10 @@ where #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { self.cursor.val(&storage.batch) } #[inline] - fn map_times(&mut self, storage: &Self::Storage, mut logic: L) { + fn map_times)>(&mut self, storage: &Self::Storage, mut logic: L) { + use crate::trace::cursor::IntoOwned; self.cursor.map_times(&storage.batch, |time, diff| { - logic(&TInner::to_inner(time.clone()), diff) + logic(&TInner::to_inner(time.into_owned()), diff) }) } diff --git a/src/trace/wrappers/enter_at.rs b/src/trace/wrappers/enter_at.rs index f5a1b3990..bde170090 100644 --- a/src/trace/wrappers/enter_at.rs +++ b/src/trace/wrappers/enter_at.rs @@ -46,13 +46,15 @@ where Tr::Batch: Clone, TInner: Refines+Lattice, F: 'static, - F: FnMut(Tr::Key<'_>, Tr::Val<'_>, &Tr::Time)->TInner+Clone, + F: FnMut(Tr::Key<'_>, Tr::Val<'_>, Tr::TimeGat<'_>)->TInner+Clone, G: FnMut(&TInner)->Tr::Time+Clone+'static, { type Key<'a> = Tr::Key<'a>; type Val<'a> = Tr::Val<'a>; type Time = TInner; + type TimeGat<'a> = &'a TInner; type Diff = Tr::Diff; + type DiffGat<'a> = Tr::DiffGat<'a>; type Batch = BatchEnter; type Storage = Tr::Storage; @@ -134,12 +136,14 @@ impl BatchReader for BatchEnter where B: BatchReader, TInner: Refines+Lattice, - F: FnMut(B::Key<'_>, ::Val<'_>, &B::Time)->TInner+Clone, + F: FnMut(B::Key<'_>, ::Val<'_>, B::TimeGat<'_>)->TInner+Clone, { type Key<'a> = B::Key<'a>; type Val<'a> = B::Val<'a>; type Time = TInner; + type TimeGat<'a> = &'a TInner; type Diff = B::Diff; + type DiffGat<'a> = B::DiffGat<'a>; type Cursor = BatchCursorEnter; @@ -190,12 +194,14 @@ impl Cursor for CursorEnter where C: Cursor, TInner: Refines+Lattice, - F: FnMut(C::Key<'_>, C::Val<'_>, &C::Time)->TInner, + F: FnMut(C::Key<'_>, C::Val<'_>, C::TimeGat<'_>)->TInner, { type Key<'a> = C::Key<'a>; type Val<'a> = C::Val<'a>; type Time = TInner; + type TimeGat<'a> = &'a TInner; type Diff = C::Diff; + type DiffGat<'a> = C::DiffGat<'a>; type Storage = C::Storage; @@ -206,7 +212,7 @@ where #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { self.cursor.val(storage) } #[inline] - fn map_times(&mut self, storage: &Self::Storage, mut logic: L) { + fn map_times)>(&mut self, storage: &Self::Storage, mut logic: L) { let key = self.key(storage); let val = self.val(storage); let logic2 = &mut self.logic; @@ -247,12 +253,14 @@ impl BatchCursorEnter { impl Cursor for BatchCursorEnter where TInner: Refines+Lattice, - F: FnMut(C::Key<'_>, C::Val<'_>, &C::Time)->TInner, + F: FnMut(C::Key<'_>, C::Val<'_>, C::TimeGat<'_>)->TInner, { type Key<'a> = C::Key<'a>; type Val<'a> = C::Val<'a>; type Time = TInner; + type TimeGat<'a> = &'a TInner; type Diff = C::Diff; + type DiffGat<'a> = C::DiffGat<'a>; type Storage = BatchEnter; @@ -263,7 +271,7 @@ where #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { self.cursor.val(&storage.batch) } #[inline] - fn map_times(&mut self, storage: &Self::Storage, mut logic: L) { + fn map_times)>(&mut self, storage: &Self::Storage, mut logic: L) { let key = self.key(storage); let val = self.val(storage); let logic2 = &mut self.logic; diff --git a/src/trace/wrappers/filter.rs b/src/trace/wrappers/filter.rs index dbac502e9..f43ff0dc1 100644 --- a/src/trace/wrappers/filter.rs +++ b/src/trace/wrappers/filter.rs @@ -33,7 +33,9 @@ where type Key<'a> = Tr::Key<'a>; type Val<'a> = Tr::Val<'a>; type Time = Tr::Time; + type TimeGat<'a> = Tr::TimeGat<'a>; type Diff = Tr::Diff; + type DiffGat<'a> = Tr::DiffGat<'a>; type Batch = BatchFilter; type Storage = Tr::Storage; @@ -85,7 +87,9 @@ where type Key<'a> = B::Key<'a>; type Val<'a> = B::Val<'a>; type Time = B::Time; + type TimeGat<'a> = B::TimeGat<'a>; type Diff = B::Diff; + type DiffGat<'a> = B::DiffGat<'a>; type Cursor = BatchCursorFilter; @@ -132,7 +136,9 @@ where type Key<'a> = C::Key<'a>; type Val<'a> = C::Val<'a>; type Time = C::Time; + type TimeGat<'a> = C::TimeGat<'a>; type Diff = C::Diff; + type DiffGat<'a> = C::DiffGat<'a>; type Storage = C::Storage; @@ -143,7 +149,7 @@ where #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { self.cursor.val(storage) } #[inline] - fn map_times(&mut self, storage: &Self::Storage, logic: L) { + fn map_times, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, logic: L) { let key = self.key(storage); let val = self.val(storage); if (self.logic)(key, val) { @@ -185,7 +191,9 @@ where type Key<'a> = C::Key<'a>; type Val<'a> = C::Val<'a>; type Time = C::Time; + type TimeGat<'a> = C::TimeGat<'a>; type Diff = C::Diff; + type DiffGat<'a> = C::DiffGat<'a>; type Storage = BatchFilter; @@ -196,7 +204,7 @@ where #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { self.cursor.val(&storage.batch) } #[inline] - fn map_times(&mut self, storage: &Self::Storage, logic: L) { + fn map_times, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, logic: L) { let key = self.key(storage); let val = self.val(storage); if (self.logic)(key, val) { diff --git a/src/trace/wrappers/freeze.rs b/src/trace/wrappers/freeze.rs index 07006d5f9..201d8dc92 100644 --- a/src/trace/wrappers/freeze.rs +++ b/src/trace/wrappers/freeze.rs @@ -26,6 +26,7 @@ use timely::progress::frontier::AntichainRef; use crate::operators::arrange::Arranged; use crate::trace::{TraceReader, BatchReader, Description}; use crate::trace::cursor::Cursor; +use crate::trace::cursor::IntoOwned; /// Freezes updates to an arrangement using a supplied function. /// @@ -36,7 +37,7 @@ pub fn freeze(arranged: &Arranged, func: F) -> Arranged, T: TraceReader+Clone, - F: Fn(&G::Timestamp)->Option+'static, + F: Fn(T::TimeGat<'_>)->Option+'static, { let func1 = Rc::new(func); let func2 = func1.clone(); @@ -50,7 +51,7 @@ where pub struct TraceFreeze where Tr: TraceReader, - F: Fn(&Tr::Time)->Option, + F: Fn(Tr::TimeGat<'_>)->Option, { trace: Tr, func: Rc, @@ -59,7 +60,7 @@ where impl Clone for TraceFreeze where Tr: TraceReader+Clone, - F: Fn(&Tr::Time)->Option, + F: Fn(Tr::TimeGat<'_>)->Option, { fn clone(&self) -> Self { TraceFreeze { @@ -73,12 +74,14 @@ impl TraceReader for TraceFreeze where Tr: TraceReader, Tr::Batch: Clone, - F: Fn(&Tr::Time)->Option+'static, + F: Fn(Tr::TimeGat<'_>)->Option+'static, { type Key<'a> = Tr::Key<'a>; type Val<'a> = Tr::Val<'a>; type Time = Tr::Time; + type TimeGat<'a> = Tr::TimeGat<'a>; type Diff = Tr::Diff; + type DiffGat<'a> = Tr::DiffGat<'a>; type Batch = BatchFreeze; type Storage = Tr::Storage; @@ -108,7 +111,7 @@ impl TraceFreeze where Tr: TraceReader, Tr::Batch: Clone, - F: Fn(&Tr::Time)->Option, + F: Fn(Tr::TimeGat<'_>)->Option, { /// Makes a new trace wrapper pub fn make_from(trace: Tr, func: Rc) -> Self { @@ -135,12 +138,14 @@ impl Clone for BatchFreeze { impl BatchReader for BatchFreeze where B: BatchReader, - F: Fn(&B::Time)->Option, + F: Fn(B::TimeGat<'_>)->Option, { type Key<'a> = B::Key<'a>; type Val<'a> = B::Val<'a>; type Time = B::Time; + type TimeGat<'a> = B::TimeGat<'a>; type Diff = B::Diff; + type DiffGat<'a> = B::DiffGat<'a>; type Cursor = BatchCursorFreeze; @@ -154,7 +159,7 @@ where impl BatchFreeze where B: BatchReader, - F: Fn(&B::Time)->Option + F: Fn(B::TimeGat<'_>)->Option { /// Makes a new batch wrapper pub fn make_from(batch: B, func: Rc) -> Self { @@ -177,12 +182,14 @@ impl CursorFreeze { impl Cursor for CursorFreeze where C: Cursor, - F: Fn(&C::Time)->Option, + F: Fn(C::TimeGat<'_>)->Option, { type Key<'a> = C::Key<'a>; type Val<'a> = C::Val<'a>; type Time = C::Time; + type TimeGat<'a> = C::TimeGat<'a>; type Diff = C::Diff; + type DiffGat<'a> = C::DiffGat<'a>; type Storage = C::Storage; @@ -192,11 +199,11 @@ where #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { self.cursor.key(storage) } #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { self.cursor.val(storage) } - #[inline] fn map_times(&mut self, storage: &Self::Storage, mut logic: L) { + #[inline] fn map_times, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, mut logic: L) { let func = &self.func; self.cursor.map_times(storage, |time, diff| { if let Some(time) = func(time) { - logic(&time, diff); + logic( as IntoOwned>::borrow_as(&time), diff); } }) } @@ -227,12 +234,14 @@ impl BatchCursorFreeze { // impl, B: BatchReader, F> Cursor for BatchCursorFreeze impl Cursor for BatchCursorFreeze where - F: Fn(&C::Time)->Option, + F: Fn(C::TimeGat<'_>)->Option, { type Key<'a> = C::Key<'a>; type Val<'a> = C::Val<'a>; type Time = C::Time; + type TimeGat<'a> = C::TimeGat<'a>; type Diff = C::Diff; + type DiffGat<'a> = C::DiffGat<'a>; type Storage = BatchFreeze; @@ -242,11 +251,11 @@ where #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { self.cursor.key(&storage.batch) } #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { self.cursor.val(&storage.batch) } - #[inline] fn map_times(&mut self, storage: &Self::Storage, mut logic: L) { + #[inline] fn map_times, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, mut logic: L) { let func = &self.func; self.cursor.map_times(&storage.batch, |time, diff| { if let Some(time) = func(time) { - logic(&time, diff); + logic( as IntoOwned>::borrow_as(&time), diff); } }) } diff --git a/src/trace/wrappers/frontier.rs b/src/trace/wrappers/frontier.rs index 727e8ca1c..8379b8bf6 100644 --- a/src/trace/wrappers/frontier.rs +++ b/src/trace/wrappers/frontier.rs @@ -35,7 +35,9 @@ impl TraceReader for TraceFrontier { type Key<'a> = Tr::Key<'a>; type Val<'a> = Tr::Val<'a>; type Time = Tr::Time; + type TimeGat<'a> = Tr::TimeGat<'a>; type Diff = Tr::Diff; + type DiffGat<'a> = Tr::DiffGat<'a>; type Batch = BatchFrontier; type Storage = Tr::Storage; @@ -84,7 +86,9 @@ impl BatchReader for BatchFrontier { type Key<'a> = B::Key<'a>; type Val<'a> = B::Val<'a>; type Time = B::Time; + type TimeGat<'a> = B::TimeGat<'a>; type Diff = B::Diff; + type DiffGat<'a> = B::DiffGat<'a>; type Cursor = BatchCursorFrontier; @@ -127,7 +131,9 @@ impl Cursor for CursorFrontier { type Key<'a> = C::Key<'a>; type Val<'a> = C::Val<'a>; type Time = C::Time; + type TimeGat<'a> = C::TimeGat<'a>; type Diff = C::Diff; + type DiffGat<'a> = C::DiffGat<'a>; type Storage = C::Storage; @@ -138,15 +144,16 @@ impl Cursor for CursorFrontier { #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { self.cursor.val(storage) } #[inline] - fn map_times(&mut self, storage: &Self::Storage, mut logic: L) { + fn map_times, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, mut logic: L) { let since = self.since.borrow(); let until = self.until.borrow(); let mut temp: C::Time = ::minimum(); self.cursor.map_times(storage, |time, diff| { - temp.clone_from(time); + use crate::trace::cursor::IntoOwned; + time.clone_onto(&mut temp); temp.advance_by(since); if !until.less_equal(&temp) { - logic(&temp, diff); + logic( as IntoOwned>::borrow_as(&temp), diff); } }) } @@ -187,7 +194,9 @@ where type Key<'a> = C::Key<'a>; type Val<'a> = C::Val<'a>; type Time = C::Time; + type TimeGat<'a> = C::TimeGat<'a>; type Diff = C::Diff; + type DiffGat<'a> = C::DiffGat<'a>; type Storage = BatchFrontier; @@ -198,15 +207,16 @@ where #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { self.cursor.val(&storage.batch) } #[inline] - fn map_times(&mut self, storage: &Self::Storage, mut logic: L) { + fn map_times, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, mut logic: L) { let since = self.since.borrow(); let until = self.until.borrow(); let mut temp: C::Time = ::minimum(); self.cursor.map_times(&storage.batch, |time, diff| { - temp.clone_from(time); + use crate::trace::cursor::IntoOwned; + time.clone_onto(&mut temp); temp.advance_by(since); if !until.less_equal(&temp) { - logic(&temp, diff); + logic( as IntoOwned>::borrow_as(&temp), diff); } }) } diff --git a/src/trace/wrappers/rc.rs b/src/trace/wrappers/rc.rs index d94dcf264..4a877031a 100644 --- a/src/trace/wrappers/rc.rs +++ b/src/trace/wrappers/rc.rs @@ -82,7 +82,9 @@ impl TraceReader for TraceRc { type Key<'a> = Tr::Key<'a>; type Val<'a> = Tr::Val<'a>; type Time = Tr::Time; + type TimeGat<'a> = Tr::TimeGat<'a>; type Diff = Tr::Diff; + type DiffGat<'a> = Tr::DiffGat<'a>; type Batch = Tr::Batch; type Storage = Tr::Storage;