diff --git a/src/trace/implementations/ord.rs b/src/trace/implementations/ord.rs index 5c995e048..989c274af 100644 --- a/src/trace/implementations/ord.rs +++ b/src/trace/implementations/ord.rs @@ -37,22 +37,98 @@ use super::merge_batcher::MergeBatcher; use abomonation::abomonated::Abomonated; +/// A type that names constituent update types. +pub trait Update { + /// Key by which data are grouped. + type Key: Ord+Clone; + /// Values associated with the key. + type Val: Ord+Clone; + /// Time at which updates occur. + type Time: Ord+Lattice+timely::progress::Timestamp+Clone; + /// Way in which updates occur. + type Diff: Semigroup+Clone; +} + +impl Update for ((K, V), T, R) +where + K: Ord+Clone, + V: Ord+Clone, + T: Ord+Lattice+timely::progress::Timestamp+Clone, + R: Semigroup+Clone, +{ + type Key = K; + type Val = V; + type Time = T; + type Diff = R; +} + +/// A type with opinions on how updates should be laid out. +pub trait Layout { + /// The represented update. + type Target: Update; + /// Offsets to use from keys into vals. + type KeyOffset: OrdOffset; + /// Offsets to use from vals into updates. + type ValOffset: OrdOffset; + /// Container for update keys. + type KeyContainer: + BatchContainer::Key> + +Deref::Key]> + +RetainFrom<::Key>; + /// Container for update vals. + type ValContainer: + BatchContainer::Val> + +Deref::Val]> + +RetainFrom<::Val>; +} + +/// A layout that uses vectors +pub struct Vector { + phantom: std::marker::PhantomData<(U, O)>, +} + +impl Layout for Vector { + type Target = U; + type KeyOffset = O; + type ValOffset = O; + type KeyContainer = Vec; + type ValContainer = Vec; +} + +/// A layout based on timely stacks +pub struct TStack { + phantom: std::marker::PhantomData<(U, O)>, +} + +impl Layout for TStack +where + U::Key: Columnation, + U::Val: Columnation, +{ + type Target = U; + type KeyOffset = O; + type ValOffset = O; + type KeyContainer = TimelyStack; + type ValContainer = TimelyStack; +} + + /// A trace implementation using a spine of ordered lists. -pub type OrdValSpine = Spine>>; +pub type OrdValSpine = Spine>>>; /// A trace implementation using a spine of abomonated ordered lists. -pub type OrdValSpineAbom = Spine, Vec>>>; +pub type OrdValSpineAbom = Spine>, Vec>>>; /// A trace implementation for empty values using a spine of ordered lists. -pub type OrdKeySpine = Spine>>; +pub type OrdKeySpine = Spine>>>; /// A trace implementation for empty values using a spine of abomonated ordered lists. -pub type OrdKeySpineAbom = Spine, Vec>>>; +pub type OrdKeySpineAbom = Spine>, Vec>>>; /// A trace implementation backed by columnar storage. -pub type ColValSpine = Spine, TimelyStack>>>; +pub type ColValSpine = Spine>>>; /// A trace implementation backed by columnar storage. -pub type ColKeySpine = Spine>>>; +pub type ColKeySpine = Spine>>>; /// A container that can retain/discard from some offset onward. @@ -86,78 +162,52 @@ impl RetainFrom for TimelyStack { } /// An immutable collection of update tuples, from a contiguous interval of logical times. -#[derive(Debug, Abomonation)] -pub struct OrdValBatch, CV=Vec> -where - K: Ord+Clone, - V: Ord+Clone, - T: Clone+Lattice, - R: Clone, - O: OrdOffset, >::Error: Debug, >::Error: Debug, - CK: BatchContainer+Deref+RetainFrom, - CV: BatchContainer+Deref+RetainFrom, -{ +#[derive(Abomonation)] +pub struct OrdValBatch { /// Where all the dataz is. - pub layer: OrderedLayer, O, CV>, O, CK>, + pub layer: KVTDLayer, /// Description of the update times this layer represents. - pub desc: Description, + pub desc: Description<::Time>, } -impl BatchReader for OrdValBatch -where - K: Ord+Clone+'static, - V: Ord+Clone+'static, - T: Lattice+Ord+Clone+'static, - R: Semigroup, - O: OrdOffset, >::Error: Debug, >::Error: Debug, - CK: BatchContainer+Deref+RetainFrom, - CV: BatchContainer+Deref+RetainFrom, -{ - type Key = K; - type Val = V; - type Time = T; - type R = R; - - type Cursor = OrdValCursor; +// Type aliases to make certain types readable. +type TDLayer = OrderedLeaf<<::Target as Update>::Time, <::Target as Update>::Diff>; +type VTDLayer = OrderedLayer<<::Target as Update>::Val, TDLayer, ::ValOffset, ::ValContainer>; +type KTDLayer = OrderedLayer<<::Target as Update>::Key, TDLayer, ::KeyOffset, ::KeyContainer>; +type KVTDLayer = OrderedLayer<<::Target as Update>::Key, VTDLayer, ::KeyOffset, ::KeyContainer>; +type TDBuilder = OrderedLeafBuilder<<::Target as Update>::Time, <::Target as Update>::Diff>; +type VTDBuilder = OrderedBuilder<<::Target as Update>::Val, TDBuilder, ::ValOffset, ::ValContainer>; +type KTDBuilder = OrderedBuilder<<::Target as Update>::Key, TDBuilder, ::KeyOffset, ::KeyContainer>; +type KVTDBuilder = OrderedBuilder<<::Target as Update>::Key, VTDBuilder, ::KeyOffset, ::KeyContainer>; + +impl BatchReader for OrdValBatch { + type Key = ::Key; + type Val = ::Val; + type Time = ::Time; + type R = ::Diff; + + type Cursor = OrdValCursor; fn cursor(&self) -> Self::Cursor { OrdValCursor { cursor: self.layer.cursor(), phantom: std::marker::PhantomData } } - fn len(&self) -> usize { , O, CV>, O, CK> as Trie>::tuples(&self.layer) } - fn description(&self) -> &Description { &self.desc } + fn len(&self) -> usize { as Trie>::tuples(&self.layer) } + fn description(&self) -> &Description<::Time> { &self.desc } } -impl Batch for OrdValBatch -where - K: Ord+Clone+'static, - V: Ord+Clone+'static, - T: Lattice+timely::progress::Timestamp+Ord+Clone+::std::fmt::Debug+'static, - R: Semigroup, - O: OrdOffset, >::Error: Debug, >::Error: Debug, - CK: BatchContainer+Deref+RetainFrom, - CV: BatchContainer+Deref+RetainFrom, -{ +impl Batch for OrdValBatch { type Batcher = MergeBatcher; - type Builder = OrdValBuilder; - type Merger = OrdValMerger; + type Builder = OrdValBuilder; + type Merger = OrdValMerger; - fn begin_merge(&self, other: &Self, compaction_frontier: Option>) -> Self::Merger { + fn begin_merge(&self, other: &Self, compaction_frontier: Option::Time>>) -> Self::Merger { OrdValMerger::new(self, other, compaction_frontier) } } -impl OrdValBatch -where - K: Ord+Clone+'static, - V: Ord+Clone+'static, - T: Lattice+Ord+Clone+::std::fmt::Debug+'static, - R: Semigroup, - O: OrdOffset, >::Error: Debug, >::Error: Debug, - CK: BatchContainer+Deref+RetainFrom, - CV: BatchContainer+Deref+RetainFrom, -{ - fn advance_builder_from(layer: &mut OrderedBuilder, O, CV>, O, CK>, frontier: AntichainRef, key_pos: usize) { +impl OrdValBatch { + fn advance_builder_from(layer: &mut KVTDBuilder, frontier: AntichainRef<::Time>, key_pos: usize) { let key_start = key_pos; - let val_start: usize = layer.offs[key_pos].try_into().unwrap(); - let time_start: usize = layer.vals.offs[val_start].try_into().unwrap(); + let val_start: usize = layer.offs[key_pos].try_into().ok().unwrap(); + let time_start: usize = layer.vals.offs[val_start].try_into().ok().unwrap(); // We have unique ownership of the batch, and can advance times in place. // We must still sort, collapse, and remove empty updates. @@ -180,10 +230,10 @@ where // we will change batch.layer.vals.offs[i] in this iteration, from `write_position`'s // initial value. - let lower: usize = layer.vals.offs[i].try_into().unwrap(); - let upper: usize = layer.vals.offs[i+1].try_into().unwrap(); + let lower: usize = layer.vals.offs[i].try_into().ok().unwrap(); + let upper: usize = layer.vals.offs[i+1].try_into().ok().unwrap(); - layer.vals.offs[i] = O::try_from(write_position).unwrap(); + layer.vals.offs[i] = L::ValOffset::try_from(write_position).ok().unwrap(); let updates = &mut layer.vals.vals.vals[..]; @@ -196,7 +246,7 @@ where } } layer.vals.vals.vals.truncate(write_position); - layer.vals.offs[layer.vals.keys.len()] = O::try_from(write_position).unwrap(); + layer.vals.offs[layer.vals.keys.len()] = L::ValOffset::try_from(write_position).ok().unwrap(); // 3. Remove values with empty histories. In addition, we need to update offsets // in `layer.offs` to correctly reference the potentially moved values. @@ -206,12 +256,12 @@ where let keys_off = &mut layer.offs; layer.vals.keys.retain_from(val_start, |index, _item| { // As we pass each key offset, record its new position. - if index == keys_off[keys_pos].try_into().unwrap() { - keys_off[keys_pos] = O::try_from(write_position).unwrap(); + if index == keys_off[keys_pos].try_into().ok().unwrap() { + keys_off[keys_pos] = L::KeyOffset::try_from(write_position).ok().unwrap(); keys_pos += 1; } - let lower = vals_off[index].try_into().unwrap(); - let upper = vals_off[index+1].try_into().unwrap(); + let lower = vals_off[index].try_into().ok().unwrap(); + let upper = vals_off[index+1].try_into().ok().unwrap(); if lower < upper { vals_off[write_position+1] = vals_off[index+1]; write_position += 1; @@ -221,14 +271,14 @@ where }); debug_assert_eq!(write_position, layer.vals.keys.len()); layer.vals.offs.truncate(write_position + 1); - layer.offs[layer.keys.len()] = O::try_from(write_position).unwrap(); + layer.offs[layer.keys.len()] = L::KeyOffset::try_from(write_position).ok().unwrap(); // 4. Remove empty keys. let offs = &mut layer.offs; let mut write_position = key_start; layer.keys.retain_from(key_start, |index, _item| { - let lower = offs[index].try_into().unwrap(); - let upper = offs[index+1].try_into().unwrap(); + let lower = offs[index].try_into().ok().unwrap(); + let upper = offs[index+1].try_into().ok().unwrap(); if lower < upper { offs[write_position+1] = offs[index+1]; write_position += 1; @@ -242,16 +292,7 @@ where } /// State for an in-progress merge. -pub struct OrdValMerger, CV=Vec> -where - K: Ord+Clone+'static, - V: Ord+Clone+'static, - T: Lattice+Ord+Clone+::std::fmt::Debug+'static, - R: Semigroup, - O: OrdOffset, >::Error: Debug, >::Error: Debug, - CK: BatchContainer+Deref+RetainFrom, - CV: BatchContainer+Deref+RetainFrom, -{ +pub struct OrdValMerger { // first batch, and position therein. lower1: usize, upper1: usize, @@ -259,22 +300,13 @@ where lower2: usize, upper2: usize, // result that we are currently assembling. - result: , O, CV>, O, CK> as Trie>::MergeBuilder, - description: Description, + result: as Trie>::MergeBuilder, + description: Description<::Time>, should_compact: bool, } -impl Merger> for OrdValMerger -where - K: Ord+Clone+'static, - V: Ord+Clone+'static, - T: Lattice+timely::progress::Timestamp+Ord+Clone+::std::fmt::Debug+'static, - R: Semigroup, - O: OrdOffset, >::Error: Debug, >::Error: Debug, - CK: BatchContainer+Deref+RetainFrom, - CV: BatchContainer+Deref+RetainFrom, -{ - fn new(batch1: &OrdValBatch, batch2: &OrdValBatch, compaction_frontier: Option>) -> Self { +impl Merger> for OrdValMerger { + fn new(batch1: &OrdValBatch, batch2: &OrdValBatch, compaction_frontier: Option::Time>>) -> Self { assert!(batch1.upper() == batch2.lower()); @@ -290,12 +322,12 @@ where upper1: batch1.layer.keys(), lower2: 0, upper2: batch2.layer.keys(), - result: <, O, CV>, O, CK> as Trie>::MergeBuilder as MergeBuilder>::with_capacity(&batch1.layer, &batch2.layer), + result: < as Trie>::MergeBuilder as MergeBuilder>::with_capacity(&batch1.layer, &batch2.layer), description: description, should_compact: compaction_frontier.is_some(), } } - fn done(self) -> OrdValBatch { + fn done(self) -> OrdValBatch { assert!(self.lower1 == self.upper1); assert!(self.lower2 == self.upper2); @@ -305,7 +337,7 @@ where desc: self.description, } } - fn work(&mut self, source1: &OrdValBatch, source2: &OrdValBatch, fuel: &mut isize) { + fn work(&mut self, source1: &OrdValBatch, source2: &OrdValBatch, fuel: &mut isize) { let starting_updates = self.result.vals.vals.vals.len(); let mut effort = 0isize; @@ -344,7 +376,7 @@ where // if we are supplied a frontier, we should compact. if self.should_compact { - OrdValBatch::::advance_builder_from(&mut self.result, self.description.since().borrow(), initial_key_pos); + OrdValBatch::::advance_builder_from(&mut self.result, self.description.since().borrow(), initial_key_pos); } *fuel -= effort; @@ -356,40 +388,22 @@ where } /// A cursor for navigating a single layer. -#[derive(Debug)] -pub struct OrdValCursor, CV=Vec> -where - V: Ord+Clone, - T: Lattice+Ord+Clone, - R: Semigroup, - O: OrdOffset, >::Error: Debug, >::Error: Debug, - CK: BatchContainer+Deref+RetainFrom, - CV: BatchContainer+Deref+RetainFrom, -{ - phantom: std::marker::PhantomData<(K, CK, CV)>, - cursor: OrderedCursor, O, CV>>, +pub struct OrdValCursor { + phantom: std::marker::PhantomData, + cursor: OrderedCursor>, } -impl Cursor for OrdValCursor -where - K: Ord+Clone, - V: Ord+Clone, - T: Lattice+Ord+Clone, - R: Semigroup, - O: OrdOffset, >::Error: Debug, >::Error: Debug, - CK: BatchContainer+Deref+RetainFrom, - CV: BatchContainer+Deref+RetainFrom, -{ - type Key = K; - type Val = V; - type Time = T; - type R = R; +impl Cursor for OrdValCursor { + type Key = ::Key; + type Val = ::Val; + type Time = ::Time; + type R = ::Diff; - type Storage = OrdValBatch; + type Storage = OrdValBatch; - fn key<'a>(&self, storage: &'a Self::Storage) -> &'a K { &self.cursor.key(&storage.layer) } - fn val<'a>(&self, storage: &'a Self::Storage) -> &'a V { &self.cursor.child.key(&storage.layer.vals) } - fn map_times(&mut self, storage: &Self::Storage, mut logic: L) { + fn key<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Key { &self.cursor.key(&storage.layer) } + fn val<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Val { &self.cursor.child.key(&storage.layer.vals) } + fn map_times(&mut self, storage: &Self::Storage, mut logic: L2) { self.cursor.child.child.rewind(&storage.layer.vals.vals); while self.cursor.child.child.valid(&storage.layer.vals.vals) { logic(&self.cursor.child.child.key(&storage.layer.vals.vals).0, &self.cursor.child.child.key(&storage.layer.vals.vals).1); @@ -399,57 +413,39 @@ where fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.valid(&storage.layer) } fn val_valid(&self, storage: &Self::Storage) -> bool { self.cursor.child.valid(&storage.layer.vals) } fn step_key(&mut self, storage: &Self::Storage){ self.cursor.step(&storage.layer); } - fn seek_key(&mut self, storage: &Self::Storage, key: &K) { self.cursor.seek(&storage.layer, key); } + fn seek_key(&mut self, storage: &Self::Storage, key: &Self::Key) { self.cursor.seek(&storage.layer, key); } fn step_val(&mut self, storage: &Self::Storage) { self.cursor.child.step(&storage.layer.vals); } - fn seek_val(&mut self, storage: &Self::Storage, val: &V) { self.cursor.child.seek(&storage.layer.vals, val); } + fn seek_val(&mut self, storage: &Self::Storage, val: &Self::Val) { self.cursor.child.seek(&storage.layer.vals, val); } fn rewind_keys(&mut self, storage: &Self::Storage) { self.cursor.rewind(&storage.layer); } fn rewind_vals(&mut self, storage: &Self::Storage) { self.cursor.child.rewind(&storage.layer.vals); } } - /// A builder for creating layers from unsorted update tuples. -pub struct OrdValBuilder, CV=Vec> -where - K: Ord+Clone, - V: Ord+Clone, - T: Ord+Clone+Lattice, - R: Clone+Semigroup, - O: OrdOffset, >::Error: Debug, >::Error: Debug, - CK: BatchContainer+Deref+RetainFrom, - CV: BatchContainer+Deref+RetainFrom, -{ - builder: OrderedBuilder, O, CV>, O, CK>, +pub struct OrdValBuilder { + builder: KVTDBuilder, } -impl Builder> for OrdValBuilder -where - K: Ord+Clone+'static, - V: Ord+Clone+'static, - T: Lattice+timely::progress::Timestamp+Ord+Clone+::std::fmt::Debug+'static, - R: Semigroup, - O: OrdOffset, >::Error: Debug, >::Error: Debug, - CK: BatchContainer+Deref+RetainFrom, - CV: BatchContainer+Deref+RetainFrom, -{ + +impl Builder> for OrdValBuilder { fn new() -> Self { OrdValBuilder { - builder: OrderedBuilder::, O, CV>, O, CK>::new() + builder: >::new() } } fn with_capacity(cap: usize) -> Self { OrdValBuilder { - builder: , O, CV>, O, CK> as TupleBuilder>::with_capacity(cap) + builder: as TupleBuilder>::with_capacity(cap) } } #[inline] - fn push(&mut self, (key, val, time, diff): (K, V, T, R)) { + fn push(&mut self, (key, val, time, diff): (::Key, ::Val, ::Time, ::Diff)) { self.builder.push_tuple((key, (val, (time, diff)))); } #[inline(never)] - fn done(self, lower: Antichain, upper: Antichain, since: Antichain) -> OrdValBatch { + fn done(self, lower: Antichain<::Time>, upper: Antichain<::Time>, since: Antichain<::Time>) -> OrdValBatch { OrdValBatch { layer: self.builder.done(), desc: Description::new(lower, upper, since) @@ -461,35 +457,21 @@ where /// An immutable collection of update tuples, from a contiguous interval of logical times. -#[derive(Debug, Abomonation)] -pub struct OrdKeyBatch> -where - K: Ord+Clone, - T: Clone+Lattice, - R: Clone, - O: OrdOffset, >::Error: Debug, >::Error: Debug, - CK: BatchContainer+Deref+RetainFrom, -{ +#[derive(Abomonation)] +pub struct OrdKeyBatch { /// Where all the dataz is. - pub layer: OrderedLayer, O, CK>, + pub layer: KTDLayer, /// Description of the update times this layer represents. - pub desc: Description, + pub desc: Description<::Time>, } -impl BatchReader for OrdKeyBatch -where - K: Ord+Clone+'static, - T: Lattice+Ord+Clone+'static, - R: Clone+Semigroup, - O: OrdOffset, >::Error: Debug, >::Error: Debug, - CK: BatchContainer+Deref+RetainFrom, -{ - type Key = K; +impl BatchReader for OrdKeyBatch { + type Key = ::Key; type Val = (); - type Time = T; - type R = R; + type Time = ::Time; + type R = ::Diff; - type Cursor = OrdKeyCursor; + type Cursor = OrdKeyCursor; fn cursor(&self) -> Self::Cursor { OrdKeyCursor { valid: true, @@ -497,39 +479,25 @@ where phantom: PhantomData } } - fn len(&self) -> usize { , O, CK> as Trie>::tuples(&self.layer) } - fn description(&self) -> &Description { &self.desc } + fn len(&self) -> usize { as Trie>::tuples(&self.layer) } + fn description(&self) -> &Description<::Time> { &self.desc } } -impl Batch for OrdKeyBatch -where - K: Ord+Clone+'static, - T: Lattice+timely::progress::Timestamp+Ord+Clone+'static, - R: Semigroup, - O: OrdOffset, >::Error: Debug, >::Error: Debug, - CK: BatchContainer+Deref+RetainFrom, -{ +impl Batch for OrdKeyBatch { type Batcher = MergeBatcher; - type Builder = OrdKeyBuilder; - type Merger = OrdKeyMerger; + type Builder = OrdKeyBuilder; + type Merger = OrdKeyMerger; - fn begin_merge(&self, other: &Self, compaction_frontier: Option>) -> Self::Merger { + fn begin_merge(&self, other: &Self, compaction_frontier: Option::Time>>) -> Self::Merger { OrdKeyMerger::new(self, other, compaction_frontier) } } -impl OrdKeyBatch -where - K: Ord+Clone+'static, - T: Lattice+Ord+Clone+'static, - R: Semigroup, - O: OrdOffset, >::Error: Debug, >::Error: Debug, - CK: BatchContainer+Deref+RetainFrom, -{ - fn advance_builder_from(layer: &mut OrderedBuilder, O, CK>, frontier: AntichainRef, key_pos: usize) { +impl OrdKeyBatch { + fn advance_builder_from(layer: &mut KTDBuilder, frontier: AntichainRef<::Time>, key_pos: usize) { let key_start = key_pos; - let time_start: usize = layer.offs[key_pos].try_into().unwrap(); + let time_start: usize = layer.offs[key_pos].try_into().ok().unwrap(); // We will zip through the time leaves, calling advance on each, // then zip through the value layer, sorting and collapsing each, @@ -552,10 +520,10 @@ where // we will change batch.layer.vals.offs[i] in this iteration, from `write_position`'s // initial value. - let lower: usize = layer.offs[i].try_into().unwrap(); - let upper: usize = layer.offs[i+1].try_into().unwrap(); + let lower: usize = layer.offs[i].try_into().ok().unwrap(); + let upper: usize = layer.offs[i+1].try_into().ok().unwrap(); - layer.offs[i] = O::try_from(write_position).unwrap(); + layer.offs[i] = L::KeyOffset::try_from(write_position).ok().unwrap(); let updates = &mut layer.vals.vals[..]; @@ -568,14 +536,14 @@ where } } layer.vals.vals.truncate(write_position); - layer.offs[layer.keys.len()] = O::try_from(write_position).unwrap(); + layer.offs[layer.keys.len()] = L::KeyOffset::try_from(write_position).ok().unwrap(); // 4. Remove empty keys. let offs = &mut layer.offs; let mut write_position = key_start; layer.keys.retain_from(key_start, |index, _item| { - let lower = offs[index].try_into().unwrap(); - let upper = offs[index+1].try_into().unwrap(); + let lower = offs[index].try_into().ok().unwrap(); + let upper = offs[index+1].try_into().ok().unwrap(); if lower < upper { offs[write_position+1] = offs[index+1]; write_position += 1; @@ -589,14 +557,7 @@ where } /// State for an in-progress merge. -pub struct OrdKeyMerger> -where - K: Ord+Clone+'static, - T: Lattice+Ord+Clone+'static, - R: Semigroup, - O: OrdOffset, >::Error: Debug, >::Error: Debug, - CK: BatchContainer+Deref+RetainFrom, -{ +pub struct OrdKeyMerger { // first batch, and position therein. lower1: usize, upper1: usize, @@ -604,20 +565,13 @@ where lower2: usize, upper2: usize, // result that we are currently assembling. - result: , O, CK> as Trie>::MergeBuilder, - description: Description, + result: as Trie>::MergeBuilder, + description: Description<::Time>, should_compact: bool, } -impl Merger> for OrdKeyMerger -where - K: Ord+Clone+'static, - T: Lattice+timely::progress::Timestamp+Ord+Clone+'static, - R: Semigroup, - O: OrdOffset, >::Error: Debug, >::Error: Debug, - CK: BatchContainer+Deref+RetainFrom, -{ - fn new(batch1: &OrdKeyBatch, batch2: &OrdKeyBatch, compaction_frontier: Option>) -> Self { +impl Merger> for OrdKeyMerger { + fn new(batch1: &OrdKeyBatch, batch2: &OrdKeyBatch, compaction_frontier: Option::Time>>) -> Self { assert!(batch1.upper() == batch2.lower()); @@ -633,12 +587,12 @@ where upper1: batch1.layer.keys(), lower2: 0, upper2: batch2.layer.keys(), - result: <, O, CK> as Trie>::MergeBuilder as MergeBuilder>::with_capacity(&batch1.layer, &batch2.layer), + result: < as Trie>::MergeBuilder as MergeBuilder>::with_capacity(&batch1.layer, &batch2.layer), description: description, should_compact: compaction_frontier.is_some(), } } - fn done(self) -> OrdKeyBatch { + fn done(self) -> OrdKeyBatch { assert!(self.lower1 == self.upper1); assert!(self.lower2 == self.upper2); @@ -648,7 +602,7 @@ where desc: self.description, } } - fn work(&mut self, source1: &OrdKeyBatch, source2: &OrdKeyBatch, fuel: &mut isize) { + fn work(&mut self, source1: &OrdKeyBatch, source2: &OrdKeyBatch, fuel: &mut isize) { let starting_updates = self.result.vals.vals.len(); let mut effort = 0isize; @@ -693,7 +647,7 @@ where // if we are supplied a frontier, we should compact. if self.should_compact { - OrdKeyBatch::::advance_builder_from(&mut self.result, self.description.since().borrow(), initial_key_pos); + OrdKeyBatch::::advance_builder_from(&mut self.result, self.description.since().borrow(), initial_key_pos); } *fuel -= effort; @@ -707,30 +661,23 @@ where /// A cursor for navigating a single layer. #[derive(Debug)] -pub struct OrdKeyCursor> { +pub struct OrdKeyCursor { valid: bool, - cursor: OrderedCursor>, - phantom: PhantomData<(K, O, CK)>, + cursor: OrderedCursor::Time, ::Diff>>, + phantom: PhantomData, } -impl Cursor for OrdKeyCursor -where - K: Ord+Clone, - T: Lattice+Ord+Clone, - R: Semigroup, - O: OrdOffset, >::Error: Debug, >::Error: Debug, - CK: BatchContainer+Deref+RetainFrom, -{ - type Key = K; +impl Cursor for OrdKeyCursor { + type Key = ::Key; type Val = (); - type Time = T; - type R = R; + type Time = ::Time; + type R = ::Diff; - type Storage = OrdKeyBatch; + type Storage = OrdKeyBatch; - fn key<'a>(&self, storage: &'a Self::Storage) -> &'a K { &self.cursor.key(&storage.layer) } + fn key<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Key { &self.cursor.key(&storage.layer) } fn val<'a>(&self, _storage: &'a Self::Storage) -> &'a () { &() } - fn map_times(&mut self, storage: &Self::Storage, mut logic: L) { + fn map_times(&mut self, storage: &Self::Storage, mut logic: L2) { self.cursor.child.rewind(&storage.layer.vals); while self.cursor.child.valid(&storage.layer.vals) { logic(&self.cursor.child.key(&storage.layer.vals).0, &self.cursor.child.key(&storage.layer.vals).1); @@ -740,7 +687,7 @@ where fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.valid(&storage.layer) } fn val_valid(&self, _storage: &Self::Storage) -> bool { self.valid } fn step_key(&mut self, storage: &Self::Storage){ self.cursor.step(&storage.layer); self.valid = true; } - fn seek_key(&mut self, storage: &Self::Storage, key: &K) { self.cursor.seek(&storage.layer, key); self.valid = true; } + fn seek_key(&mut self, storage: &Self::Storage, key: &Self::Key) { self.cursor.seek(&storage.layer, key); self.valid = true; } fn step_val(&mut self, _storage: &Self::Storage) { self.valid = false; } fn seek_val(&mut self, _storage: &Self::Storage, _val: &()) { } fn rewind_keys(&mut self, storage: &Self::Storage) { self.cursor.rewind(&storage.layer); self.valid = true; } @@ -749,45 +696,31 @@ where /// A builder for creating layers from unsorted update tuples. -pub struct OrdKeyBuilder> -where - K: Ord+Clone, - T: Ord+Clone+Lattice, - R: Clone+Semigroup, - O: OrdOffset, >::Error: Debug, >::Error: Debug, - CK: BatchContainer+Deref+RetainFrom, -{ - builder: OrderedBuilder, O, CK>, +pub struct OrdKeyBuilder { + builder: KTDBuilder, } -impl Builder> for OrdKeyBuilder -where - K: Ord+Clone+'static, - T: Lattice+timely::progress::Timestamp+Ord+Clone+'static, - R: Semigroup, - O: OrdOffset, >::Error: Debug, >::Error: Debug, - CK: BatchContainer+Deref+RetainFrom, -{ +impl Builder> for OrdKeyBuilder { fn new() -> Self { OrdKeyBuilder { - builder: OrderedBuilder::, O, CK>::new() + builder: >::new() } } fn with_capacity(cap: usize) -> Self { OrdKeyBuilder { - builder: , O, CK> as TupleBuilder>::with_capacity(cap) + builder: as TupleBuilder>::with_capacity(cap) } } #[inline] - fn push(&mut self, (key, _, time, diff): (K, (), T, R)) { + fn push(&mut self, (key, _, time, diff): (::Key, (), ::Time, ::Diff)) { self.builder.push_tuple((key, (time, diff))); } #[inline(never)] - fn done(self, lower: Antichain, upper: Antichain, since: Antichain) -> OrdKeyBatch { + fn done(self, lower: Antichain<::Time>, upper: Antichain<::Time>, since: Antichain<::Time>) -> OrdKeyBatch { OrdKeyBatch { layer: self.builder.done(), desc: Description::new(lower, upper, since) diff --git a/src/trace/layers/ordered.rs b/src/trace/layers/ordered.rs index e2d34a8bd..87f4e5352 100644 --- a/src/trace/layers/ordered.rs +++ b/src/trace/layers/ordered.rs @@ -24,7 +24,7 @@ pub struct OrderedLayer> where K: Ord, C: BatchContainer+Deref, - O: OrdOffset, >::Error: Debug, >::Error: Debug + O: OrdOffset { /// The keys of the layer. pub keys: C, @@ -39,10 +39,10 @@ where impl Trie for OrderedLayer where - K: Ord+Clone, + K: Ord, C: BatchContainer+Deref, L: Trie, - O: OrdOffset, >::Error: Debug, >::Error: Debug + O: OrdOffset { type Item = (K, L::Item); type Cursor = OrderedCursor; @@ -59,7 +59,7 @@ where let child_upper = self.offs[lower + 1]; OrderedCursor { bounds: (lower, upper), - child: self.vals.cursor_from(child_lower.try_into().unwrap(), child_upper.try_into().unwrap()), + child: self.vals.cursor_from(child_lower.try_into().ok().unwrap(), child_upper.try_into().ok().unwrap()), pos: lower, } } @@ -76,9 +76,9 @@ where /// Assembles a layer of this pub struct OrderedBuilder> where - K: Ord+Clone, + K: Ord, C: BatchContainer+Deref, - O: OrdOffset, >::Error: Debug, >::Error: Debug + O: OrdOffset { /// Keys pub keys: C, @@ -90,19 +90,19 @@ where impl Builder for OrderedBuilder where - K: Ord+Clone, + K: Ord, C: BatchContainer+Deref, L: Builder, - O: OrdOffset, >::Error: Debug, >::Error: Debug + O: OrdOffset { type Trie = OrderedLayer; fn boundary(&mut self) -> usize { - self.offs[self.keys.len()] = O::try_from(self.vals.boundary()).unwrap(); + self.offs[self.keys.len()] = O::try_from(self.vals.boundary()).ok().unwrap(); self.keys.len() } fn done(mut self) -> Self::Trie { - if self.keys.len() > 0 && self.offs[self.keys.len()].try_into().unwrap() == 0 { - self.offs[self.keys.len()] = O::try_from(self.vals.boundary()).unwrap(); + if self.keys.len() > 0 && self.offs[self.keys.len()].try_into().ok().unwrap() == 0 { + self.offs[self.keys.len()] = O::try_from(self.vals.boundary()).ok().unwrap(); } OrderedLayer { keys: self.keys, @@ -114,14 +114,14 @@ where impl MergeBuilder for OrderedBuilder where - K: Ord+Clone, + K: Ord, C: BatchContainer+Deref, L: MergeBuilder, - O: OrdOffset, >::Error: Debug, >::Error: Debug + O: OrdOffset { fn with_capacity(other1: &Self::Trie, other2: &Self::Trie) -> Self { let mut offs = Vec::with_capacity(other1.keys() + other2.keys() + 1); - offs.push(O::try_from(0 as usize).unwrap()); + offs.push(O::try_from(0 as usize).ok().unwrap()); OrderedBuilder { keys: C::merge_capacity(&other1.keys, &other2.keys), offs: offs, @@ -132,13 +132,13 @@ where fn copy_range(&mut self, other: &Self::Trie, lower: usize, upper: usize) { debug_assert!(lower < upper); let other_basis = other.offs[lower]; - let self_basis = self.offs.last().map(|&x| x).unwrap_or(O::try_from(0).unwrap()); + let self_basis = self.offs.last().map(|&x| x).unwrap_or(O::try_from(0).ok().unwrap()); self.keys.copy_slice(&other.keys[lower .. upper]); for index in lower .. upper { self.offs.push((other.offs[index + 1] + self_basis) - other_basis); } - self.vals.copy_range(&other.vals, other_basis.try_into().unwrap(), other.offs[upper].try_into().unwrap()); + self.vals.copy_range(&other.vals, other_basis.try_into().ok().unwrap(), other.offs[upper].try_into().ok().unwrap()); } fn push_merge(&mut self, other1: (&Self::Trie, usize, usize), other2: (&Self::Trie, usize, usize)) -> usize { @@ -161,10 +161,10 @@ where impl OrderedBuilder where - K: Ord+Clone, + K: Ord, C: BatchContainer+Deref, L: MergeBuilder, - O: OrdOffset, >::Error: Debug, >::Error: Debug + O: OrdOffset { /// Performs one step of merging. #[inline] @@ -185,12 +185,12 @@ where let lower = self.vals.boundary(); // record vals_length so we can tell if anything was pushed. let upper = self.vals.push_merge( - (&trie1.vals, trie1.offs[*lower1].try_into().unwrap(), trie1.offs[*lower1 + 1].try_into().unwrap()), - (&trie2.vals, trie2.offs[*lower2].try_into().unwrap(), trie2.offs[*lower2 + 1].try_into().unwrap()) + (&trie1.vals, trie1.offs[*lower1].try_into().ok().unwrap(), trie1.offs[*lower1 + 1].try_into().ok().unwrap()), + (&trie2.vals, trie2.offs[*lower2].try_into().ok().unwrap(), trie2.offs[*lower2 + 1].try_into().ok().unwrap()) ); if upper > lower { self.keys.copy(&trie1.keys[*lower1]); - self.offs.push(O::try_from(upper).unwrap()); + self.offs.push(O::try_from(upper).ok().unwrap()); } *lower1 += 1; @@ -209,16 +209,16 @@ where impl TupleBuilder for OrderedBuilder where - K: Ord+Clone, + K: Ord, C: BatchContainer+Deref, L: TupleBuilder, - O: OrdOffset, >::Error: Debug, >::Error: Debug + O: OrdOffset { type Item = (K, L::Item); - fn new() -> Self { OrderedBuilder { keys: C::default(), offs: vec![O::try_from(0).unwrap()], vals: L::new() } } + fn new() -> Self { OrderedBuilder { keys: C::default(), offs: vec![O::try_from(0).ok().unwrap()], vals: L::new() } } fn with_capacity(cap: usize) -> Self { let mut offs = Vec::with_capacity(cap + 1); - offs.push(O::try_from(0).unwrap()); + offs.push(O::try_from(0).ok().unwrap()); OrderedBuilder{ keys: C::with_capacity(cap), offs: offs, @@ -229,12 +229,12 @@ where fn push_tuple(&mut self, (key, val): (K, L::Item)) { // if first element, prior element finish, or different element, need to push and maybe punctuate. - if self.keys.len() == 0 || self.offs[self.keys.len()].try_into().unwrap() != 0 || self.keys[self.keys.len()-1] != key { - if self.keys.len() > 0 && self.offs[self.keys.len()].try_into().unwrap() == 0 { - self.offs[self.keys.len()] = O::try_from(self.vals.boundary()).unwrap(); + if self.keys.len() == 0 || self.offs[self.keys.len()].try_into().ok().unwrap() != 0 || self.keys[self.keys.len()-1] != key { + if self.keys.len() > 0 && self.offs[self.keys.len()].try_into().ok().unwrap() == 0 { + self.offs[self.keys.len()] = O::try_from(self.vals.boundary()).ok().unwrap(); } self.keys.push(key); - self.offs.push(O::try_from(0).unwrap()); // <-- indicates "unfinished". + self.offs.push(O::try_from(0).ok().unwrap()); // <-- indicates "unfinished". } self.vals.push_tuple(val); } @@ -251,17 +251,17 @@ pub struct OrderedCursor { impl Cursor> for OrderedCursor where - K: Ord+Clone, + K: Ord, C: BatchContainer+Deref, L: Trie, - O: OrdOffset, >::Error: Debug, >::Error: Debug + O: OrdOffset { type Key = K; fn key<'a>(&self, storage: &'a OrderedLayer) -> &'a Self::Key { &storage.keys[self.pos] } fn step(&mut self, storage: &OrderedLayer) { self.pos += 1; if self.valid(storage) { - self.child.reposition(&storage.vals, storage.offs[self.pos].try_into().unwrap(), storage.offs[self.pos + 1].try_into().unwrap()); + self.child.reposition(&storage.vals, storage.offs[self.pos].try_into().ok().unwrap(), storage.offs[self.pos + 1].try_into().ok().unwrap()); } else { self.pos = self.bounds.1; @@ -270,7 +270,7 @@ where fn seek(&mut self, storage: &OrderedLayer, key: &Self::Key) { self.pos += advance(&storage.keys[self.pos .. self.bounds.1], |k| k.lt(key)); if self.valid(storage) { - self.child.reposition(&storage.vals, storage.offs[self.pos].try_into().unwrap(), storage.offs[self.pos + 1].try_into().unwrap()); + self.child.reposition(&storage.vals, storage.offs[self.pos].try_into().ok().unwrap(), storage.offs[self.pos + 1].try_into().ok().unwrap()); } } // fn size(&self) -> usize { self.bounds.1 - self.bounds.0 } @@ -278,14 +278,14 @@ where fn rewind(&mut self, storage: &OrderedLayer) { self.pos = self.bounds.0; if self.valid(storage) { - self.child.reposition(&storage.vals, storage.offs[self.pos].try_into().unwrap(), storage.offs[self.pos + 1].try_into().unwrap()); + self.child.reposition(&storage.vals, storage.offs[self.pos].try_into().ok().unwrap(), storage.offs[self.pos + 1].try_into().ok().unwrap()); } } fn reposition(&mut self, storage: &OrderedLayer, lower: usize, upper: usize) { self.pos = lower; self.bounds = (lower, upper); if self.valid(storage) { - self.child.reposition(&storage.vals, storage.offs[self.pos].try_into().unwrap(), storage.offs[self.pos + 1].try_into().unwrap()); + self.child.reposition(&storage.vals, storage.offs[self.pos].try_into().ok().unwrap(), storage.offs[self.pos + 1].try_into().ok().unwrap()); } } } diff --git a/tests/trace.rs b/tests/trace.rs index d00c4497e..1d830e094 100644 --- a/tests/trace.rs +++ b/tests/trace.rs @@ -6,16 +6,16 @@ use std::rc::Rc; use timely::dataflow::operators::generic::OperatorInfo; use timely::progress::{Antichain, frontier::AntichainRef}; -use differential_dataflow::trace::implementations::ord::OrdValBatch; +use differential_dataflow::trace::implementations::ord::{OrdValBatch, Vector}; use differential_dataflow::trace::{Trace, TraceReader, Batch, Batcher}; use differential_dataflow::trace::cursor::Cursor; use differential_dataflow::trace::implementations::spine_fueled::Spine; -pub type OrdValSpine = Spine>>; +pub type OrdValSpine = Spine>>>; type IntegerTrace = OrdValSpine; -fn get_trace() -> Spine>> { +fn get_trace() -> Spine>>> { let op_info = OperatorInfo::new(0, 0, &[]); let mut trace = IntegerTrace::new(op_info, None, None); {