From e7ee3597bbfcd7e972a501d0eb92679665c1dee7 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Sat, 11 Nov 2023 04:41:42 -0500 Subject: [PATCH 1/6] Organize generic arguments as layouts --- src/trace/implementations/ord.rs | 310 +++++++++++++++---------------- src/trace/layers/ordered.rs | 56 +++--- 2 files changed, 182 insertions(+), 184 deletions(-) diff --git a/src/trace/implementations/ord.rs b/src/trace/implementations/ord.rs index 5c995e048..a8cea33f6 100644 --- a/src/trace/implementations/ord.rs +++ b/src/trace/implementations/ord.rs @@ -37,11 +37,91 @@ use super::merge_batcher::MergeBatcher; use abomonation::abomonated::Abomonated; +/// A type that names constituent update types. +pub trait Update { + /// Key by which data are grouped. + type Key: Ord+Clone; + /// Values associated with the key. + type Val: Ord+Clone; + /// Time at which updates occur. + type Time: Ord+Lattice+timely::progress::Timestamp+Clone; + /// Way in which updates occur. + type Diff: Semigroup+Clone; +} + +impl Update for ((K, V), T, R) +where + K: Ord+Clone, + V: Ord+Clone, + T: Ord+Lattice+timely::progress::Timestamp+Clone, + R: Semigroup+Clone, +{ + type Key = K; + type Val = V; + type Time = T; + type Diff = R; +} + +/// A type with opinions on how updates should be laid out. +pub trait Layout { + /// The represented update. + type Target: Update; + /// Offsets to use from keys into vals. + type KeyOffset: OrdOffset; + /// Offsets to use from vals into updates. + type ValOffset: OrdOffset; + /// Container for updates themselves. + type UpdateContainer: BatchContainer+Deref+RetainFrom; + /// Container for update keys. + type KeyContainer: + BatchContainer::Key> + +Deref::Key]> + +RetainFrom<::Key>; + /// Container for update vals. + type ValContainer: + BatchContainer::Val> + +Deref::Val]> + +RetainFrom<::Val>; +} + +/// A layout that uses vectors +pub struct Vector { + phantom: std::marker::PhantomData<(U, O)>, +} + +impl Layout for Vector { + type Target = U; + type KeyOffset = O; + type ValOffset = O; + type UpdateContainer = Vec; + type KeyContainer = Vec; + type ValContainer = Vec; +} + +/// A layout based on timely stacks +pub struct TStack { + phantom: std::marker::PhantomData<(U, O)>, +} + +impl Layout for TStack +where + U::Key: Columnation, + U::Val: Columnation, +{ + type Target = U; + type KeyOffset = O; + type ValOffset = O; + type UpdateContainer = Vec; + type KeyContainer = TimelyStack; + type ValContainer = TimelyStack; +} + + /// A trace implementation using a spine of ordered lists. -pub type OrdValSpine = Spine>>; +pub type OrdValSpine = Spine>>>; /// A trace implementation using a spine of abomonated ordered lists. -pub type OrdValSpineAbom = Spine, Vec>>>; +pub type OrdValSpineAbom = Spine>, Vec>>>; /// A trace implementation for empty values using a spine of ordered lists. pub type OrdKeySpine = Spine>>; @@ -50,7 +130,7 @@ pub type OrdKeySpine = Spine>>; pub type OrdKeySpineAbom = Spine, Vec>>>; /// A trace implementation backed by columnar storage. -pub type ColValSpine = Spine, TimelyStack>>>; +pub type ColValSpine = Spine>>>; /// A trace implementation backed by columnar storage. pub type ColKeySpine = Spine>>>; @@ -86,78 +166,50 @@ impl RetainFrom for TimelyStack { } /// An immutable collection of update tuples, from a contiguous interval of logical times. -#[derive(Debug, Abomonation)] -pub struct OrdValBatch, CV=Vec> -where - K: Ord+Clone, - V: Ord+Clone, - T: Clone+Lattice, - R: Clone, - O: OrdOffset, >::Error: Debug, >::Error: Debug, - CK: BatchContainer+Deref+RetainFrom, - CV: BatchContainer+Deref+RetainFrom, -{ +#[derive(Abomonation)] +pub struct OrdValBatch { /// Where all the dataz is. - pub layer: OrderedLayer, O, CV>, O, CK>, + pub layer: KVTDLayer, /// Description of the update times this layer represents. - pub desc: Description, + pub desc: Description<::Time>, } -impl BatchReader for OrdValBatch -where - K: Ord+Clone+'static, - V: Ord+Clone+'static, - T: Lattice+Ord+Clone+'static, - R: Semigroup, - O: OrdOffset, >::Error: Debug, >::Error: Debug, - CK: BatchContainer+Deref+RetainFrom, - CV: BatchContainer+Deref+RetainFrom, -{ - type Key = K; - type Val = V; - type Time = T; - type R = R; - - type Cursor = OrdValCursor; +// Type aliases to make certain types readable. +type TDLayer = OrderedLeaf<<::Target as Update>::Time, <::Target as Update>::Diff>; +type VTDLayer = OrderedLayer<<::Target as Update>::Val, TDLayer, ::ValOffset, ::ValContainer>; +type KVTDLayer = OrderedLayer<<::Target as Update>::Key, VTDLayer, ::KeyOffset, ::KeyContainer>; +type TDBuilder = OrderedLeafBuilder<<::Target as Update>::Time, <::Target as Update>::Diff>; +type VTDBuilder = OrderedBuilder<<::Target as Update>::Val, TDBuilder, ::ValOffset, ::ValContainer>; +type KVTDBuilder = OrderedBuilder<<::Target as Update>::Key, VTDBuilder, ::KeyOffset, ::KeyContainer>; + +impl BatchReader for OrdValBatch { + type Key = ::Key; + type Val = ::Val; + type Time = ::Time; + type R = ::Diff; + + type Cursor = OrdValCursor; fn cursor(&self) -> Self::Cursor { OrdValCursor { cursor: self.layer.cursor(), phantom: std::marker::PhantomData } } - fn len(&self) -> usize { , O, CV>, O, CK> as Trie>::tuples(&self.layer) } - fn description(&self) -> &Description { &self.desc } + fn len(&self) -> usize { as Trie>::tuples(&self.layer) } + fn description(&self) -> &Description<::Time> { &self.desc } } -impl Batch for OrdValBatch -where - K: Ord+Clone+'static, - V: Ord+Clone+'static, - T: Lattice+timely::progress::Timestamp+Ord+Clone+::std::fmt::Debug+'static, - R: Semigroup, - O: OrdOffset, >::Error: Debug, >::Error: Debug, - CK: BatchContainer+Deref+RetainFrom, - CV: BatchContainer+Deref+RetainFrom, -{ +impl Batch for OrdValBatch { type Batcher = MergeBatcher; - type Builder = OrdValBuilder; - type Merger = OrdValMerger; + type Builder = OrdValBuilder; + type Merger = OrdValMerger; - fn begin_merge(&self, other: &Self, compaction_frontier: Option>) -> Self::Merger { + fn begin_merge(&self, other: &Self, compaction_frontier: Option::Time>>) -> Self::Merger { OrdValMerger::new(self, other, compaction_frontier) } } -impl OrdValBatch -where - K: Ord+Clone+'static, - V: Ord+Clone+'static, - T: Lattice+Ord+Clone+::std::fmt::Debug+'static, - R: Semigroup, - O: OrdOffset, >::Error: Debug, >::Error: Debug, - CK: BatchContainer+Deref+RetainFrom, - CV: BatchContainer+Deref+RetainFrom, -{ - fn advance_builder_from(layer: &mut OrderedBuilder, O, CV>, O, CK>, frontier: AntichainRef, key_pos: usize) { +impl OrdValBatch { + fn advance_builder_from(layer: &mut KVTDBuilder, frontier: AntichainRef<::Time>, key_pos: usize) { let key_start = key_pos; - let val_start: usize = layer.offs[key_pos].try_into().unwrap(); - let time_start: usize = layer.vals.offs[val_start].try_into().unwrap(); + let val_start: usize = layer.offs[key_pos].try_into().ok().unwrap(); + let time_start: usize = layer.vals.offs[val_start].try_into().ok().unwrap(); // We have unique ownership of the batch, and can advance times in place. // We must still sort, collapse, and remove empty updates. @@ -180,10 +232,10 @@ where // we will change batch.layer.vals.offs[i] in this iteration, from `write_position`'s // initial value. - let lower: usize = layer.vals.offs[i].try_into().unwrap(); - let upper: usize = layer.vals.offs[i+1].try_into().unwrap(); + let lower: usize = layer.vals.offs[i].try_into().ok().unwrap(); + let upper: usize = layer.vals.offs[i+1].try_into().ok().unwrap(); - layer.vals.offs[i] = O::try_from(write_position).unwrap(); + layer.vals.offs[i] = L::ValOffset::try_from(write_position).ok().unwrap(); let updates = &mut layer.vals.vals.vals[..]; @@ -196,7 +248,7 @@ where } } layer.vals.vals.vals.truncate(write_position); - layer.vals.offs[layer.vals.keys.len()] = O::try_from(write_position).unwrap(); + layer.vals.offs[layer.vals.keys.len()] = L::ValOffset::try_from(write_position).ok().unwrap(); // 3. Remove values with empty histories. In addition, we need to update offsets // in `layer.offs` to correctly reference the potentially moved values. @@ -206,12 +258,12 @@ where let keys_off = &mut layer.offs; layer.vals.keys.retain_from(val_start, |index, _item| { // As we pass each key offset, record its new position. - if index == keys_off[keys_pos].try_into().unwrap() { - keys_off[keys_pos] = O::try_from(write_position).unwrap(); + if index == keys_off[keys_pos].try_into().ok().unwrap() { + keys_off[keys_pos] = L::KeyOffset::try_from(write_position).ok().unwrap(); keys_pos += 1; } - let lower = vals_off[index].try_into().unwrap(); - let upper = vals_off[index+1].try_into().unwrap(); + let lower = vals_off[index].try_into().ok().unwrap(); + let upper = vals_off[index+1].try_into().ok().unwrap(); if lower < upper { vals_off[write_position+1] = vals_off[index+1]; write_position += 1; @@ -221,14 +273,14 @@ where }); debug_assert_eq!(write_position, layer.vals.keys.len()); layer.vals.offs.truncate(write_position + 1); - layer.offs[layer.keys.len()] = O::try_from(write_position).unwrap(); + layer.offs[layer.keys.len()] = L::KeyOffset::try_from(write_position).ok().unwrap(); // 4. Remove empty keys. let offs = &mut layer.offs; let mut write_position = key_start; layer.keys.retain_from(key_start, |index, _item| { - let lower = offs[index].try_into().unwrap(); - let upper = offs[index+1].try_into().unwrap(); + let lower = offs[index].try_into().ok().unwrap(); + let upper = offs[index+1].try_into().ok().unwrap(); if lower < upper { offs[write_position+1] = offs[index+1]; write_position += 1; @@ -242,16 +294,7 @@ where } /// State for an in-progress merge. -pub struct OrdValMerger, CV=Vec> -where - K: Ord+Clone+'static, - V: Ord+Clone+'static, - T: Lattice+Ord+Clone+::std::fmt::Debug+'static, - R: Semigroup, - O: OrdOffset, >::Error: Debug, >::Error: Debug, - CK: BatchContainer+Deref+RetainFrom, - CV: BatchContainer+Deref+RetainFrom, -{ +pub struct OrdValMerger { // first batch, and position therein. lower1: usize, upper1: usize, @@ -259,22 +302,13 @@ where lower2: usize, upper2: usize, // result that we are currently assembling. - result: , O, CV>, O, CK> as Trie>::MergeBuilder, - description: Description, + result: as Trie>::MergeBuilder, + description: Description<::Time>, should_compact: bool, } -impl Merger> for OrdValMerger -where - K: Ord+Clone+'static, - V: Ord+Clone+'static, - T: Lattice+timely::progress::Timestamp+Ord+Clone+::std::fmt::Debug+'static, - R: Semigroup, - O: OrdOffset, >::Error: Debug, >::Error: Debug, - CK: BatchContainer+Deref+RetainFrom, - CV: BatchContainer+Deref+RetainFrom, -{ - fn new(batch1: &OrdValBatch, batch2: &OrdValBatch, compaction_frontier: Option>) -> Self { +impl Merger> for OrdValMerger { + fn new(batch1: &OrdValBatch, batch2: &OrdValBatch, compaction_frontier: Option::Time>>) -> Self { assert!(batch1.upper() == batch2.lower()); @@ -290,12 +324,12 @@ where upper1: batch1.layer.keys(), lower2: 0, upper2: batch2.layer.keys(), - result: <, O, CV>, O, CK> as Trie>::MergeBuilder as MergeBuilder>::with_capacity(&batch1.layer, &batch2.layer), + result: < as Trie>::MergeBuilder as MergeBuilder>::with_capacity(&batch1.layer, &batch2.layer), description: description, should_compact: compaction_frontier.is_some(), } } - fn done(self) -> OrdValBatch { + fn done(self) -> OrdValBatch { assert!(self.lower1 == self.upper1); assert!(self.lower2 == self.upper2); @@ -305,7 +339,7 @@ where desc: self.description, } } - fn work(&mut self, source1: &OrdValBatch, source2: &OrdValBatch, fuel: &mut isize) { + fn work(&mut self, source1: &OrdValBatch, source2: &OrdValBatch, fuel: &mut isize) { let starting_updates = self.result.vals.vals.vals.len(); let mut effort = 0isize; @@ -344,7 +378,7 @@ where // if we are supplied a frontier, we should compact. if self.should_compact { - OrdValBatch::::advance_builder_from(&mut self.result, self.description.since().borrow(), initial_key_pos); + OrdValBatch::::advance_builder_from(&mut self.result, self.description.since().borrow(), initial_key_pos); } *fuel -= effort; @@ -356,40 +390,22 @@ where } /// A cursor for navigating a single layer. -#[derive(Debug)] -pub struct OrdValCursor, CV=Vec> -where - V: Ord+Clone, - T: Lattice+Ord+Clone, - R: Semigroup, - O: OrdOffset, >::Error: Debug, >::Error: Debug, - CK: BatchContainer+Deref+RetainFrom, - CV: BatchContainer+Deref+RetainFrom, -{ - phantom: std::marker::PhantomData<(K, CK, CV)>, - cursor: OrderedCursor, O, CV>>, +pub struct OrdValCursor { + phantom: std::marker::PhantomData, + cursor: OrderedCursor>, } -impl Cursor for OrdValCursor -where - K: Ord+Clone, - V: Ord+Clone, - T: Lattice+Ord+Clone, - R: Semigroup, - O: OrdOffset, >::Error: Debug, >::Error: Debug, - CK: BatchContainer+Deref+RetainFrom, - CV: BatchContainer+Deref+RetainFrom, -{ - type Key = K; - type Val = V; - type Time = T; - type R = R; +impl Cursor for OrdValCursor { + type Key = ::Key; + type Val = ::Val; + type Time = ::Time; + type R = ::Diff; - type Storage = OrdValBatch; + type Storage = OrdValBatch; - fn key<'a>(&self, storage: &'a Self::Storage) -> &'a K { &self.cursor.key(&storage.layer) } - fn val<'a>(&self, storage: &'a Self::Storage) -> &'a V { &self.cursor.child.key(&storage.layer.vals) } - fn map_times(&mut self, storage: &Self::Storage, mut logic: L) { + fn key<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Key { &self.cursor.key(&storage.layer) } + fn val<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Val { &self.cursor.child.key(&storage.layer.vals) } + fn map_times(&mut self, storage: &Self::Storage, mut logic: L2) { self.cursor.child.child.rewind(&storage.layer.vals.vals); while self.cursor.child.child.valid(&storage.layer.vals.vals) { logic(&self.cursor.child.child.key(&storage.layer.vals.vals).0, &self.cursor.child.child.key(&storage.layer.vals.vals).1); @@ -399,57 +415,39 @@ where fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.valid(&storage.layer) } fn val_valid(&self, storage: &Self::Storage) -> bool { self.cursor.child.valid(&storage.layer.vals) } fn step_key(&mut self, storage: &Self::Storage){ self.cursor.step(&storage.layer); } - fn seek_key(&mut self, storage: &Self::Storage, key: &K) { self.cursor.seek(&storage.layer, key); } + fn seek_key(&mut self, storage: &Self::Storage, key: &Self::Key) { self.cursor.seek(&storage.layer, key); } fn step_val(&mut self, storage: &Self::Storage) { self.cursor.child.step(&storage.layer.vals); } - fn seek_val(&mut self, storage: &Self::Storage, val: &V) { self.cursor.child.seek(&storage.layer.vals, val); } + fn seek_val(&mut self, storage: &Self::Storage, val: &Self::Val) { self.cursor.child.seek(&storage.layer.vals, val); } fn rewind_keys(&mut self, storage: &Self::Storage) { self.cursor.rewind(&storage.layer); } fn rewind_vals(&mut self, storage: &Self::Storage) { self.cursor.child.rewind(&storage.layer.vals); } } - /// A builder for creating layers from unsorted update tuples. -pub struct OrdValBuilder, CV=Vec> -where - K: Ord+Clone, - V: Ord+Clone, - T: Ord+Clone+Lattice, - R: Clone+Semigroup, - O: OrdOffset, >::Error: Debug, >::Error: Debug, - CK: BatchContainer+Deref+RetainFrom, - CV: BatchContainer+Deref+RetainFrom, -{ - builder: OrderedBuilder, O, CV>, O, CK>, +pub struct OrdValBuilder { + builder: KVTDBuilder, } -impl Builder> for OrdValBuilder -where - K: Ord+Clone+'static, - V: Ord+Clone+'static, - T: Lattice+timely::progress::Timestamp+Ord+Clone+::std::fmt::Debug+'static, - R: Semigroup, - O: OrdOffset, >::Error: Debug, >::Error: Debug, - CK: BatchContainer+Deref+RetainFrom, - CV: BatchContainer+Deref+RetainFrom, -{ + +impl Builder> for OrdValBuilder { fn new() -> Self { OrdValBuilder { - builder: OrderedBuilder::, O, CV>, O, CK>::new() + builder: OrderedBuilder::<::Key, VTDBuilder, L::KeyOffset, L::KeyContainer>::new() } } fn with_capacity(cap: usize) -> Self { OrdValBuilder { - builder: , O, CV>, O, CK> as TupleBuilder>::with_capacity(cap) + builder: ::Key, VTDBuilder, L::KeyOffset, L::KeyContainer> as TupleBuilder>::with_capacity(cap) } } #[inline] - fn push(&mut self, (key, val, time, diff): (K, V, T, R)) { + fn push(&mut self, (key, val, time, diff): (::Key, ::Val, ::Time, ::Diff)) { self.builder.push_tuple((key, (val, (time, diff)))); } #[inline(never)] - fn done(self, lower: Antichain, upper: Antichain, since: Antichain) -> OrdValBatch { + fn done(self, lower: Antichain<::Time>, upper: Antichain<::Time>, since: Antichain<::Time>) -> OrdValBatch { OrdValBatch { layer: self.builder.done(), desc: Description::new(lower, upper, since) diff --git a/src/trace/layers/ordered.rs b/src/trace/layers/ordered.rs index e2d34a8bd..da2c4ef8b 100644 --- a/src/trace/layers/ordered.rs +++ b/src/trace/layers/ordered.rs @@ -24,7 +24,7 @@ pub struct OrderedLayer> where K: Ord, C: BatchContainer+Deref, - O: OrdOffset, >::Error: Debug, >::Error: Debug + O: OrdOffset { /// The keys of the layer. pub keys: C, @@ -42,7 +42,7 @@ where K: Ord+Clone, C: BatchContainer+Deref, L: Trie, - O: OrdOffset, >::Error: Debug, >::Error: Debug + O: OrdOffset { type Item = (K, L::Item); type Cursor = OrderedCursor; @@ -59,7 +59,7 @@ where let child_upper = self.offs[lower + 1]; OrderedCursor { bounds: (lower, upper), - child: self.vals.cursor_from(child_lower.try_into().unwrap(), child_upper.try_into().unwrap()), + child: self.vals.cursor_from(child_lower.try_into().ok().unwrap(), child_upper.try_into().ok().unwrap()), pos: lower, } } @@ -78,7 +78,7 @@ pub struct OrderedBuilder> where K: Ord+Clone, C: BatchContainer+Deref, - O: OrdOffset, >::Error: Debug, >::Error: Debug + O: OrdOffset { /// Keys pub keys: C, @@ -93,16 +93,16 @@ where K: Ord+Clone, C: BatchContainer+Deref, L: Builder, - O: OrdOffset, >::Error: Debug, >::Error: Debug + O: OrdOffset { type Trie = OrderedLayer; fn boundary(&mut self) -> usize { - self.offs[self.keys.len()] = O::try_from(self.vals.boundary()).unwrap(); + self.offs[self.keys.len()] = O::try_from(self.vals.boundary()).ok().unwrap(); self.keys.len() } fn done(mut self) -> Self::Trie { - if self.keys.len() > 0 && self.offs[self.keys.len()].try_into().unwrap() == 0 { - self.offs[self.keys.len()] = O::try_from(self.vals.boundary()).unwrap(); + if self.keys.len() > 0 && self.offs[self.keys.len()].try_into().ok().unwrap() == 0 { + self.offs[self.keys.len()] = O::try_from(self.vals.boundary()).ok().unwrap(); } OrderedLayer { keys: self.keys, @@ -117,11 +117,11 @@ where K: Ord+Clone, C: BatchContainer+Deref, L: MergeBuilder, - O: OrdOffset, >::Error: Debug, >::Error: Debug + O: OrdOffset { fn with_capacity(other1: &Self::Trie, other2: &Self::Trie) -> Self { let mut offs = Vec::with_capacity(other1.keys() + other2.keys() + 1); - offs.push(O::try_from(0 as usize).unwrap()); + offs.push(O::try_from(0 as usize).ok().unwrap()); OrderedBuilder { keys: C::merge_capacity(&other1.keys, &other2.keys), offs: offs, @@ -132,13 +132,13 @@ where fn copy_range(&mut self, other: &Self::Trie, lower: usize, upper: usize) { debug_assert!(lower < upper); let other_basis = other.offs[lower]; - let self_basis = self.offs.last().map(|&x| x).unwrap_or(O::try_from(0).unwrap()); + let self_basis = self.offs.last().map(|&x| x).unwrap_or(O::try_from(0).ok().unwrap()); self.keys.copy_slice(&other.keys[lower .. upper]); for index in lower .. upper { self.offs.push((other.offs[index + 1] + self_basis) - other_basis); } - self.vals.copy_range(&other.vals, other_basis.try_into().unwrap(), other.offs[upper].try_into().unwrap()); + self.vals.copy_range(&other.vals, other_basis.try_into().ok().unwrap(), other.offs[upper].try_into().ok().unwrap()); } fn push_merge(&mut self, other1: (&Self::Trie, usize, usize), other2: (&Self::Trie, usize, usize)) -> usize { @@ -164,7 +164,7 @@ where K: Ord+Clone, C: BatchContainer+Deref, L: MergeBuilder, - O: OrdOffset, >::Error: Debug, >::Error: Debug + O: OrdOffset { /// Performs one step of merging. #[inline] @@ -185,12 +185,12 @@ where let lower = self.vals.boundary(); // record vals_length so we can tell if anything was pushed. let upper = self.vals.push_merge( - (&trie1.vals, trie1.offs[*lower1].try_into().unwrap(), trie1.offs[*lower1 + 1].try_into().unwrap()), - (&trie2.vals, trie2.offs[*lower2].try_into().unwrap(), trie2.offs[*lower2 + 1].try_into().unwrap()) + (&trie1.vals, trie1.offs[*lower1].try_into().ok().unwrap(), trie1.offs[*lower1 + 1].try_into().ok().unwrap()), + (&trie2.vals, trie2.offs[*lower2].try_into().ok().unwrap(), trie2.offs[*lower2 + 1].try_into().ok().unwrap()) ); if upper > lower { self.keys.copy(&trie1.keys[*lower1]); - self.offs.push(O::try_from(upper).unwrap()); + self.offs.push(O::try_from(upper).ok().unwrap()); } *lower1 += 1; @@ -212,13 +212,13 @@ where K: Ord+Clone, C: BatchContainer+Deref, L: TupleBuilder, - O: OrdOffset, >::Error: Debug, >::Error: Debug + O: OrdOffset { type Item = (K, L::Item); - fn new() -> Self { OrderedBuilder { keys: C::default(), offs: vec![O::try_from(0).unwrap()], vals: L::new() } } + fn new() -> Self { OrderedBuilder { keys: C::default(), offs: vec![O::try_from(0).ok().unwrap()], vals: L::new() } } fn with_capacity(cap: usize) -> Self { let mut offs = Vec::with_capacity(cap + 1); - offs.push(O::try_from(0).unwrap()); + offs.push(O::try_from(0).ok().unwrap()); OrderedBuilder{ keys: C::with_capacity(cap), offs: offs, @@ -229,12 +229,12 @@ where fn push_tuple(&mut self, (key, val): (K, L::Item)) { // if first element, prior element finish, or different element, need to push and maybe punctuate. - if self.keys.len() == 0 || self.offs[self.keys.len()].try_into().unwrap() != 0 || self.keys[self.keys.len()-1] != key { - if self.keys.len() > 0 && self.offs[self.keys.len()].try_into().unwrap() == 0 { - self.offs[self.keys.len()] = O::try_from(self.vals.boundary()).unwrap(); + if self.keys.len() == 0 || self.offs[self.keys.len()].try_into().ok().unwrap() != 0 || self.keys[self.keys.len()-1] != key { + if self.keys.len() > 0 && self.offs[self.keys.len()].try_into().ok().unwrap() == 0 { + self.offs[self.keys.len()] = O::try_from(self.vals.boundary()).ok().unwrap(); } self.keys.push(key); - self.offs.push(O::try_from(0).unwrap()); // <-- indicates "unfinished". + self.offs.push(O::try_from(0).ok().unwrap()); // <-- indicates "unfinished". } self.vals.push_tuple(val); } @@ -254,14 +254,14 @@ where K: Ord+Clone, C: BatchContainer+Deref, L: Trie, - O: OrdOffset, >::Error: Debug, >::Error: Debug + O: OrdOffset { type Key = K; fn key<'a>(&self, storage: &'a OrderedLayer) -> &'a Self::Key { &storage.keys[self.pos] } fn step(&mut self, storage: &OrderedLayer) { self.pos += 1; if self.valid(storage) { - self.child.reposition(&storage.vals, storage.offs[self.pos].try_into().unwrap(), storage.offs[self.pos + 1].try_into().unwrap()); + self.child.reposition(&storage.vals, storage.offs[self.pos].try_into().ok().unwrap(), storage.offs[self.pos + 1].try_into().ok().unwrap()); } else { self.pos = self.bounds.1; @@ -270,7 +270,7 @@ where fn seek(&mut self, storage: &OrderedLayer, key: &Self::Key) { self.pos += advance(&storage.keys[self.pos .. self.bounds.1], |k| k.lt(key)); if self.valid(storage) { - self.child.reposition(&storage.vals, storage.offs[self.pos].try_into().unwrap(), storage.offs[self.pos + 1].try_into().unwrap()); + self.child.reposition(&storage.vals, storage.offs[self.pos].try_into().ok().unwrap(), storage.offs[self.pos + 1].try_into().ok().unwrap()); } } // fn size(&self) -> usize { self.bounds.1 - self.bounds.0 } @@ -278,14 +278,14 @@ where fn rewind(&mut self, storage: &OrderedLayer) { self.pos = self.bounds.0; if self.valid(storage) { - self.child.reposition(&storage.vals, storage.offs[self.pos].try_into().unwrap(), storage.offs[self.pos + 1].try_into().unwrap()); + self.child.reposition(&storage.vals, storage.offs[self.pos].try_into().ok().unwrap(), storage.offs[self.pos + 1].try_into().ok().unwrap()); } } fn reposition(&mut self, storage: &OrderedLayer, lower: usize, upper: usize) { self.pos = lower; self.bounds = (lower, upper); if self.valid(storage) { - self.child.reposition(&storage.vals, storage.offs[self.pos].try_into().unwrap(), storage.offs[self.pos + 1].try_into().unwrap()); + self.child.reposition(&storage.vals, storage.offs[self.pos].try_into().ok().unwrap(), storage.offs[self.pos + 1].try_into().ok().unwrap()); } } } From 44c7d5ac2f7c8ca0a22d509eb8bd9ede8c61eb1b Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Sat, 11 Nov 2023 04:47:58 -0500 Subject: [PATCH 2/6] update tests --- src/trace/implementations/ord.rs | 4 ++-- tests/trace.rs | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/trace/implementations/ord.rs b/src/trace/implementations/ord.rs index a8cea33f6..e0a85eff2 100644 --- a/src/trace/implementations/ord.rs +++ b/src/trace/implementations/ord.rs @@ -85,7 +85,7 @@ pub trait Layout { } /// A layout that uses vectors -pub struct Vector { +pub struct Vector { phantom: std::marker::PhantomData<(U, O)>, } @@ -99,7 +99,7 @@ impl Layout for Vector { } /// A layout based on timely stacks -pub struct TStack { +pub struct TStack { phantom: std::marker::PhantomData<(U, O)>, } diff --git a/tests/trace.rs b/tests/trace.rs index d00c4497e..1d830e094 100644 --- a/tests/trace.rs +++ b/tests/trace.rs @@ -6,16 +6,16 @@ use std::rc::Rc; use timely::dataflow::operators::generic::OperatorInfo; use timely::progress::{Antichain, frontier::AntichainRef}; -use differential_dataflow::trace::implementations::ord::OrdValBatch; +use differential_dataflow::trace::implementations::ord::{OrdValBatch, Vector}; use differential_dataflow::trace::{Trace, TraceReader, Batch, Batcher}; use differential_dataflow::trace::cursor::Cursor; use differential_dataflow::trace::implementations::spine_fueled::Spine; -pub type OrdValSpine = Spine>>; +pub type OrdValSpine = Spine>>>; type IntegerTrace = OrdValSpine; -fn get_trace() -> Spine>> { +fn get_trace() -> Spine>>> { let op_info = OperatorInfo::new(0, 0, &[]); let mut trace = IntegerTrace::new(op_info, None, None); { From a8201fbde5c6b7c63d2b9871ae28680b159049f6 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Sat, 11 Nov 2023 04:54:13 -0500 Subject: [PATCH 3/6] Further consolidation --- src/trace/implementations/ord.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/trace/implementations/ord.rs b/src/trace/implementations/ord.rs index e0a85eff2..678782ccf 100644 --- a/src/trace/implementations/ord.rs +++ b/src/trace/implementations/ord.rs @@ -432,12 +432,12 @@ impl Builder> for OrdValBuilder { fn new() -> Self { OrdValBuilder { - builder: OrderedBuilder::<::Key, VTDBuilder, L::KeyOffset, L::KeyContainer>::new() + builder: >::new() } } fn with_capacity(cap: usize) -> Self { OrdValBuilder { - builder: ::Key, VTDBuilder, L::KeyOffset, L::KeyContainer> as TupleBuilder>::with_capacity(cap) + builder: as TupleBuilder>::with_capacity(cap) } } From 344e034fb0c036161ce7d4b66946b03c887f79ae Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Sat, 11 Nov 2023 09:37:43 -0500 Subject: [PATCH 4/6] Key layouts --- src/trace/implementations/ord.rs | 179 ++++++++++--------------------- 1 file changed, 59 insertions(+), 120 deletions(-) diff --git a/src/trace/implementations/ord.rs b/src/trace/implementations/ord.rs index 678782ccf..c29f3df52 100644 --- a/src/trace/implementations/ord.rs +++ b/src/trace/implementations/ord.rs @@ -118,21 +118,21 @@ where /// A trace implementation using a spine of ordered lists. -pub type OrdValSpine = Spine>>>; +pub type OrdValSpine = Spine>>>; /// A trace implementation using a spine of abomonated ordered lists. -pub type OrdValSpineAbom = Spine>, Vec>>>; +pub type OrdValSpineAbom = Spine>, Vec>>>; /// A trace implementation for empty values using a spine of ordered lists. -pub type OrdKeySpine = Spine>>; +pub type OrdKeySpine = Spine>>>; /// A trace implementation for empty values using a spine of abomonated ordered lists. -pub type OrdKeySpineAbom = Spine, Vec>>>; +pub type OrdKeySpineAbom = Spine>, Vec>>>; /// A trace implementation backed by columnar storage. -pub type ColValSpine = Spine>>>; +pub type ColValSpine = Spine>>>; /// A trace implementation backed by columnar storage. -pub type ColKeySpine = Spine>>>; +pub type ColKeySpine = Spine>>>; /// A container that can retain/discard from some offset onward. @@ -177,9 +177,11 @@ pub struct OrdValBatch { // Type aliases to make certain types readable. type TDLayer = OrderedLeaf<<::Target as Update>::Time, <::Target as Update>::Diff>; type VTDLayer = OrderedLayer<<::Target as Update>::Val, TDLayer, ::ValOffset, ::ValContainer>; +type KTDLayer = OrderedLayer<<::Target as Update>::Key, TDLayer, ::KeyOffset, ::KeyContainer>; type KVTDLayer = OrderedLayer<<::Target as Update>::Key, VTDLayer, ::KeyOffset, ::KeyContainer>; type TDBuilder = OrderedLeafBuilder<<::Target as Update>::Time, <::Target as Update>::Diff>; type VTDBuilder = OrderedBuilder<<::Target as Update>::Val, TDBuilder, ::ValOffset, ::ValContainer>; +type KTDBuilder = OrderedBuilder<<::Target as Update>::Key, TDBuilder, ::KeyOffset, ::KeyContainer>; type KVTDBuilder = OrderedBuilder<<::Target as Update>::Key, VTDBuilder, ::KeyOffset, ::KeyContainer>; impl BatchReader for OrdValBatch { @@ -459,35 +461,21 @@ impl Builder> for OrdValBuilder { /// An immutable collection of update tuples, from a contiguous interval of logical times. -#[derive(Debug, Abomonation)] -pub struct OrdKeyBatch> -where - K: Ord+Clone, - T: Clone+Lattice, - R: Clone, - O: OrdOffset, >::Error: Debug, >::Error: Debug, - CK: BatchContainer+Deref+RetainFrom, -{ +#[derive(Abomonation)] +pub struct OrdKeyBatch { /// Where all the dataz is. - pub layer: OrderedLayer, O, CK>, + pub layer: KTDLayer, /// Description of the update times this layer represents. - pub desc: Description, + pub desc: Description<::Time>, } -impl BatchReader for OrdKeyBatch -where - K: Ord+Clone+'static, - T: Lattice+Ord+Clone+'static, - R: Clone+Semigroup, - O: OrdOffset, >::Error: Debug, >::Error: Debug, - CK: BatchContainer+Deref+RetainFrom, -{ - type Key = K; +impl BatchReader for OrdKeyBatch { + type Key = ::Key; type Val = (); - type Time = T; - type R = R; + type Time = ::Time; + type R = ::Diff; - type Cursor = OrdKeyCursor; + type Cursor = OrdKeyCursor; fn cursor(&self) -> Self::Cursor { OrdKeyCursor { valid: true, @@ -495,39 +483,25 @@ where phantom: PhantomData } } - fn len(&self) -> usize { , O, CK> as Trie>::tuples(&self.layer) } - fn description(&self) -> &Description { &self.desc } + fn len(&self) -> usize { as Trie>::tuples(&self.layer) } + fn description(&self) -> &Description<::Time> { &self.desc } } -impl Batch for OrdKeyBatch -where - K: Ord+Clone+'static, - T: Lattice+timely::progress::Timestamp+Ord+Clone+'static, - R: Semigroup, - O: OrdOffset, >::Error: Debug, >::Error: Debug, - CK: BatchContainer+Deref+RetainFrom, -{ +impl Batch for OrdKeyBatch { type Batcher = MergeBatcher; - type Builder = OrdKeyBuilder; - type Merger = OrdKeyMerger; + type Builder = OrdKeyBuilder; + type Merger = OrdKeyMerger; - fn begin_merge(&self, other: &Self, compaction_frontier: Option>) -> Self::Merger { + fn begin_merge(&self, other: &Self, compaction_frontier: Option::Time>>) -> Self::Merger { OrdKeyMerger::new(self, other, compaction_frontier) } } -impl OrdKeyBatch -where - K: Ord+Clone+'static, - T: Lattice+Ord+Clone+'static, - R: Semigroup, - O: OrdOffset, >::Error: Debug, >::Error: Debug, - CK: BatchContainer+Deref+RetainFrom, -{ - fn advance_builder_from(layer: &mut OrderedBuilder, O, CK>, frontier: AntichainRef, key_pos: usize) { +impl OrdKeyBatch { + fn advance_builder_from(layer: &mut KTDBuilder, frontier: AntichainRef<::Time>, key_pos: usize) { let key_start = key_pos; - let time_start: usize = layer.offs[key_pos].try_into().unwrap(); + let time_start: usize = layer.offs[key_pos].try_into().ok().unwrap(); // We will zip through the time leaves, calling advance on each, // then zip through the value layer, sorting and collapsing each, @@ -550,10 +524,10 @@ where // we will change batch.layer.vals.offs[i] in this iteration, from `write_position`'s // initial value. - let lower: usize = layer.offs[i].try_into().unwrap(); - let upper: usize = layer.offs[i+1].try_into().unwrap(); + let lower: usize = layer.offs[i].try_into().ok().unwrap(); + let upper: usize = layer.offs[i+1].try_into().ok().unwrap(); - layer.offs[i] = O::try_from(write_position).unwrap(); + layer.offs[i] = L::KeyOffset::try_from(write_position).ok().unwrap(); let updates = &mut layer.vals.vals[..]; @@ -566,14 +540,14 @@ where } } layer.vals.vals.truncate(write_position); - layer.offs[layer.keys.len()] = O::try_from(write_position).unwrap(); + layer.offs[layer.keys.len()] = L::KeyOffset::try_from(write_position).ok().unwrap(); // 4. Remove empty keys. let offs = &mut layer.offs; let mut write_position = key_start; layer.keys.retain_from(key_start, |index, _item| { - let lower = offs[index].try_into().unwrap(); - let upper = offs[index+1].try_into().unwrap(); + let lower = offs[index].try_into().ok().unwrap(); + let upper = offs[index+1].try_into().ok().unwrap(); if lower < upper { offs[write_position+1] = offs[index+1]; write_position += 1; @@ -587,14 +561,7 @@ where } /// State for an in-progress merge. -pub struct OrdKeyMerger> -where - K: Ord+Clone+'static, - T: Lattice+Ord+Clone+'static, - R: Semigroup, - O: OrdOffset, >::Error: Debug, >::Error: Debug, - CK: BatchContainer+Deref+RetainFrom, -{ +pub struct OrdKeyMerger { // first batch, and position therein. lower1: usize, upper1: usize, @@ -602,20 +569,13 @@ where lower2: usize, upper2: usize, // result that we are currently assembling. - result: , O, CK> as Trie>::MergeBuilder, - description: Description, + result: as Trie>::MergeBuilder, + description: Description<::Time>, should_compact: bool, } -impl Merger> for OrdKeyMerger -where - K: Ord+Clone+'static, - T: Lattice+timely::progress::Timestamp+Ord+Clone+'static, - R: Semigroup, - O: OrdOffset, >::Error: Debug, >::Error: Debug, - CK: BatchContainer+Deref+RetainFrom, -{ - fn new(batch1: &OrdKeyBatch, batch2: &OrdKeyBatch, compaction_frontier: Option>) -> Self { +impl Merger> for OrdKeyMerger { + fn new(batch1: &OrdKeyBatch, batch2: &OrdKeyBatch, compaction_frontier: Option::Time>>) -> Self { assert!(batch1.upper() == batch2.lower()); @@ -631,12 +591,12 @@ where upper1: batch1.layer.keys(), lower2: 0, upper2: batch2.layer.keys(), - result: <, O, CK> as Trie>::MergeBuilder as MergeBuilder>::with_capacity(&batch1.layer, &batch2.layer), + result: < as Trie>::MergeBuilder as MergeBuilder>::with_capacity(&batch1.layer, &batch2.layer), description: description, should_compact: compaction_frontier.is_some(), } } - fn done(self) -> OrdKeyBatch { + fn done(self) -> OrdKeyBatch { assert!(self.lower1 == self.upper1); assert!(self.lower2 == self.upper2); @@ -646,7 +606,7 @@ where desc: self.description, } } - fn work(&mut self, source1: &OrdKeyBatch, source2: &OrdKeyBatch, fuel: &mut isize) { + fn work(&mut self, source1: &OrdKeyBatch, source2: &OrdKeyBatch, fuel: &mut isize) { let starting_updates = self.result.vals.vals.len(); let mut effort = 0isize; @@ -691,7 +651,7 @@ where // if we are supplied a frontier, we should compact. if self.should_compact { - OrdKeyBatch::::advance_builder_from(&mut self.result, self.description.since().borrow(), initial_key_pos); + OrdKeyBatch::::advance_builder_from(&mut self.result, self.description.since().borrow(), initial_key_pos); } *fuel -= effort; @@ -705,30 +665,23 @@ where /// A cursor for navigating a single layer. #[derive(Debug)] -pub struct OrdKeyCursor> { +pub struct OrdKeyCursor { valid: bool, - cursor: OrderedCursor>, - phantom: PhantomData<(K, O, CK)>, + cursor: OrderedCursor::Time, ::Diff>>, + phantom: PhantomData, } -impl Cursor for OrdKeyCursor -where - K: Ord+Clone, - T: Lattice+Ord+Clone, - R: Semigroup, - O: OrdOffset, >::Error: Debug, >::Error: Debug, - CK: BatchContainer+Deref+RetainFrom, -{ - type Key = K; +impl Cursor for OrdKeyCursor { + type Key = ::Key; type Val = (); - type Time = T; - type R = R; + type Time = ::Time; + type R = ::Diff; - type Storage = OrdKeyBatch; + type Storage = OrdKeyBatch; - fn key<'a>(&self, storage: &'a Self::Storage) -> &'a K { &self.cursor.key(&storage.layer) } + fn key<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Key { &self.cursor.key(&storage.layer) } fn val<'a>(&self, _storage: &'a Self::Storage) -> &'a () { &() } - fn map_times(&mut self, storage: &Self::Storage, mut logic: L) { + fn map_times(&mut self, storage: &Self::Storage, mut logic: L2) { self.cursor.child.rewind(&storage.layer.vals); while self.cursor.child.valid(&storage.layer.vals) { logic(&self.cursor.child.key(&storage.layer.vals).0, &self.cursor.child.key(&storage.layer.vals).1); @@ -738,7 +691,7 @@ where fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.valid(&storage.layer) } fn val_valid(&self, _storage: &Self::Storage) -> bool { self.valid } fn step_key(&mut self, storage: &Self::Storage){ self.cursor.step(&storage.layer); self.valid = true; } - fn seek_key(&mut self, storage: &Self::Storage, key: &K) { self.cursor.seek(&storage.layer, key); self.valid = true; } + fn seek_key(&mut self, storage: &Self::Storage, key: &Self::Key) { self.cursor.seek(&storage.layer, key); self.valid = true; } fn step_val(&mut self, _storage: &Self::Storage) { self.valid = false; } fn seek_val(&mut self, _storage: &Self::Storage, _val: &()) { } fn rewind_keys(&mut self, storage: &Self::Storage) { self.cursor.rewind(&storage.layer); self.valid = true; } @@ -747,45 +700,31 @@ where /// A builder for creating layers from unsorted update tuples. -pub struct OrdKeyBuilder> -where - K: Ord+Clone, - T: Ord+Clone+Lattice, - R: Clone+Semigroup, - O: OrdOffset, >::Error: Debug, >::Error: Debug, - CK: BatchContainer+Deref+RetainFrom, -{ - builder: OrderedBuilder, O, CK>, +pub struct OrdKeyBuilder { + builder: KTDBuilder, } -impl Builder> for OrdKeyBuilder -where - K: Ord+Clone+'static, - T: Lattice+timely::progress::Timestamp+Ord+Clone+'static, - R: Semigroup, - O: OrdOffset, >::Error: Debug, >::Error: Debug, - CK: BatchContainer+Deref+RetainFrom, -{ +impl Builder> for OrdKeyBuilder { fn new() -> Self { OrdKeyBuilder { - builder: OrderedBuilder::, O, CK>::new() + builder: >::new() } } fn with_capacity(cap: usize) -> Self { OrdKeyBuilder { - builder: , O, CK> as TupleBuilder>::with_capacity(cap) + builder: as TupleBuilder>::with_capacity(cap) } } #[inline] - fn push(&mut self, (key, _, time, diff): (K, (), T, R)) { + fn push(&mut self, (key, _, time, diff): (::Key, (), ::Time, ::Diff)) { self.builder.push_tuple((key, (time, diff))); } #[inline(never)] - fn done(self, lower: Antichain, upper: Antichain, since: Antichain) -> OrdKeyBatch { + fn done(self, lower: Antichain<::Time>, upper: Antichain<::Time>, since: Antichain<::Time>) -> OrdKeyBatch { OrdKeyBatch { layer: self.builder.done(), desc: Description::new(lower, upper, since) From 6ea9d1da9763ae021cc3def1c6b2bf304149c073 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Sat, 11 Nov 2023 09:38:04 -0500 Subject: [PATCH 5/6] Remove spurious bound --- src/trace/layers/ordered.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/trace/layers/ordered.rs b/src/trace/layers/ordered.rs index da2c4ef8b..87f4e5352 100644 --- a/src/trace/layers/ordered.rs +++ b/src/trace/layers/ordered.rs @@ -39,7 +39,7 @@ where impl Trie for OrderedLayer where - K: Ord+Clone, + K: Ord, C: BatchContainer+Deref, L: Trie, O: OrdOffset @@ -76,7 +76,7 @@ where /// Assembles a layer of this pub struct OrderedBuilder> where - K: Ord+Clone, + K: Ord, C: BatchContainer+Deref, O: OrdOffset { @@ -90,7 +90,7 @@ where impl Builder for OrderedBuilder where - K: Ord+Clone, + K: Ord, C: BatchContainer+Deref, L: Builder, O: OrdOffset @@ -114,7 +114,7 @@ where impl MergeBuilder for OrderedBuilder where - K: Ord+Clone, + K: Ord, C: BatchContainer+Deref, L: MergeBuilder, O: OrdOffset @@ -161,7 +161,7 @@ where impl OrderedBuilder where - K: Ord+Clone, + K: Ord, C: BatchContainer+Deref, L: MergeBuilder, O: OrdOffset @@ -209,7 +209,7 @@ where impl TupleBuilder for OrderedBuilder where - K: Ord+Clone, + K: Ord, C: BatchContainer+Deref, L: TupleBuilder, O: OrdOffset @@ -251,7 +251,7 @@ pub struct OrderedCursor { impl Cursor> for OrderedCursor where - K: Ord+Clone, + K: Ord, C: BatchContainer+Deref, L: Trie, O: OrdOffset From b5a245ee5a654d1132bc683fe816117e7881dccd Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Sun, 12 Nov 2023 15:00:17 -0500 Subject: [PATCH 6/6] remove premature update container --- src/trace/implementations/ord.rs | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/trace/implementations/ord.rs b/src/trace/implementations/ord.rs index c29f3df52..989c274af 100644 --- a/src/trace/implementations/ord.rs +++ b/src/trace/implementations/ord.rs @@ -70,8 +70,6 @@ pub trait Layout { type KeyOffset: OrdOffset; /// Offsets to use from vals into updates. type ValOffset: OrdOffset; - /// Container for updates themselves. - type UpdateContainer: BatchContainer+Deref+RetainFrom; /// Container for update keys. type KeyContainer: BatchContainer::Key> @@ -93,7 +91,6 @@ impl Layout for Vector { type Target = U; type KeyOffset = O; type ValOffset = O; - type UpdateContainer = Vec; type KeyContainer = Vec; type ValContainer = Vec; } @@ -111,7 +108,6 @@ where type Target = U; type KeyOffset = O; type ValOffset = O; - type UpdateContainer = Vec; type KeyContainer = TimelyStack; type ValContainer = TimelyStack; }