From 7a755e048befb1f5a56eae58c476247c9712a821 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Thu, 23 Nov 2023 12:30:54 -0500 Subject: [PATCH 1/2] Relocate BatchContainer, and experiment with ToOwned --- src/trace/implementations/mod.rs | 238 ++++++++++++++++++++++++++- src/trace/implementations/ord.rs | 4 +- src/trace/implementations/ord_neu.rs | 2 +- src/trace/layers/mod.rs | 151 ----------------- src/trace/layers/ordered.rs | 15 +- src/trace/layers/ordered_leaf.rs | 7 +- 6 files changed, 254 insertions(+), 163 deletions(-) diff --git a/src/trace/implementations/mod.rs b/src/trace/implementations/mod.rs index 59937caf0..e947ad473 100644 --- a/src/trace/implementations/mod.rs +++ b/src/trace/implementations/mod.rs @@ -55,7 +55,6 @@ pub use self::ord::OrdKeySpine as KeySpine; use timely::container::columnation::{Columnation, TimelyStack}; use lattice::Lattice; use difference::Semigroup; -use trace::layers::BatchContainer; use trace::layers::ordered::OrdOffset; /// A type that names constituent update types. @@ -167,3 +166,240 @@ impl RetainFrom for TimelyStack { }) } } + +pub use self::containers::{BatchContainer, SliceContainer}; + +/// Containers for data that resemble `Vec`, with leaner implementations. +pub mod containers { + + use timely::container::columnation::{Columnation, TimelyStack}; + + use std::borrow::{Borrow, ToOwned}; + + /// A general-purpose container resembling `Vec`. + pub trait BatchContainer: Default { + /// The type of contained item. + /// + /// The container only supplies references to the item, so it needn't be sized. + type Item: ?Sized; + /// Inserts an owned item. + fn push(&mut self, item: ::Owned) where Self::Item: ToOwned; + /// Inserts a borrowed item. + fn copy(&mut self, item: &Self::Item); + /// Extends from a slice of items. + fn copy_slice(&mut self, slice: &[::Owned]) where Self::Item: ToOwned; + /// Extends from a range of items in another`Self`. + fn copy_range(&mut self, other: &Self, start: usize, end: usize); + /// Creates a new container with sufficient capacity. + fn with_capacity(size: usize) -> Self; + /// Reserves additional capacity. + fn reserve(&mut self, additional: usize); + /// Creates a new container with sufficient capacity. + fn merge_capacity(cont1: &Self, cont2: &Self) -> Self; + + /// Reference to the element at this position. + fn index(&self, index: usize) -> &Self::Item; + /// Number of contained elements + fn len(&self) -> usize; + /// Returns the last item if the container is non-empty. + fn last(&self) -> Option<&Self::Item> { + if self.len() > 0 { + Some(self.index(self.len()-1)) + } + else { + None + } + } + + /// Reports the number of elements satisfing the predicate. + /// + /// This methods *relies strongly* on the assumption that the predicate + /// stays false once it becomes false, a joint property of the predicate + /// and the layout of `Self. This allows `advance` to use exponential search to + /// count the number of elements in time logarithmic in the result. + fn advancebool>(&self, start: usize, end: usize, function: F) -> usize { + + let small_limit = 8; + + // Exponential seach if the answer isn't within `small_limit`. + if end > start + small_limit && function(self.index(start + small_limit)) { + + // start with no advance + let mut index = small_limit + 1; + if start + index < end && function(self.index(start + index)) { + + // advance in exponentially growing steps. + let mut step = 1; + while start + index + step < end && function(self.index(start + index + step)) { + index += step; + step = step << 1; + } + + // advance in exponentially shrinking steps. + step = step >> 1; + while step > 0 { + if start + index + step < end && function(self.index(start + index + step)) { + index += step; + } + step = step >> 1; + } + + index += 1; + } + + index + } + else { + let limit = std::cmp::min(end, start + small_limit); + (start .. limit).filter(|x| function(self.index(*x))).count() + } + } + } + + // All `T: Clone` also implement `ToOwned`, but without the constraint Rust + // struggles to understand why the owned type must be `T` (i.e. the one blanket impl). + impl> BatchContainer for Vec { + type Item = T; + fn push(&mut self, item: T) { + self.push(item); + } + fn copy(&mut self, item: &T) { + self.push(item.clone()); + } + fn copy_slice(&mut self, slice: &[T]) where T: Sized { + self.extend_from_slice(slice); + } + fn copy_range(&mut self, other: &Self, start: usize, end: usize) { + self.extend_from_slice(&other[start .. end]); + } + fn with_capacity(size: usize) -> Self { + Vec::with_capacity(size) + } + fn reserve(&mut self, additional: usize) { + self.reserve(additional); + } + fn merge_capacity(cont1: &Self, cont2: &Self) -> Self { + Vec::with_capacity(cont1.len() + cont2.len()) + } + fn index(&self, index: usize) -> &Self::Item { + &self[index] + } + fn len(&self) -> usize { + self[..].len() + } + } + + // The `ToOwned` requirement exists to satisfy `self.reserve_items`, who must for now + // be presented with the actual contained type, rather than a type that borrows into it. + impl> BatchContainer for TimelyStack { + type Item = T; + fn push(&mut self, item: ::Owned) where Self::Item: ToOwned { + self.copy(item.borrow()); + } + fn copy(&mut self, item: &T) { + self.copy(item); + } + fn copy_slice(&mut self, slice: &[::Owned]) where Self::Item: ToOwned { + self.reserve_items(slice.iter()); + for item in slice.iter() { + self.copy(item); + } + } + fn copy_range(&mut self, other: &Self, start: usize, end: usize) { + let slice = &other[start .. end]; + self.reserve_items(slice.iter()); + for item in slice.iter() { + self.copy(item); + } + } + fn with_capacity(size: usize) -> Self { + Self::with_capacity(size) + } + fn reserve(&mut self, _additional: usize) { + } + fn merge_capacity(cont1: &Self, cont2: &Self) -> Self { + let mut new = Self::default(); + new.reserve_regions(std::iter::once(cont1).chain(std::iter::once(cont2))); + new + } + fn index(&self, index: usize) -> &Self::Item { + &self[index] + } + fn len(&self) -> usize { + self[..].len() + } + } + + /// A container that accepts slices `[B::Item]`. + pub struct SliceContainer { + /// Offsets that bound each contained slice. + /// + /// The length will be one greater than the number of contained slices, + /// starting with zero and ending with `self.inner.len()`. + pub offsets: Vec, + /// An inner container for sequences of `B` that dereferences to a slice. + pub inner: Vec, + } + + impl BatchContainer for SliceContainer + where + B: Clone + Sized, + [B]: ToOwned>, + { + type Item = [B]; + fn push(&mut self, item: Vec) where Self::Item: ToOwned { + for x in item.into_iter() { + self.inner.push(x); + } + self.offsets.push(self.inner.len()); + } + fn copy(&mut self, item: &Self::Item) { + for x in item.iter() { + self.inner.copy(x); + } + self.offsets.push(self.inner.len()); + } + fn copy_slice(&mut self, slice: &[Vec]) where Self::Item: ToOwned { + for item in slice { + self.copy(item); + } + } + fn copy_range(&mut self, other: &Self, start: usize, end: usize) { + for index in start .. end { + self.copy(other.index(index)); + } + } + fn with_capacity(size: usize) -> Self { + Self { + offsets: Vec::with_capacity(size), + inner: Vec::with_capacity(size), + } + } + fn reserve(&mut self, _additional: usize) { + } + fn merge_capacity(cont1: &Self, cont2: &Self) -> Self { + Self { + offsets: Vec::with_capacity(cont1.offsets.len() + cont2.offsets.len()), + inner: Vec::with_capacity(cont1.inner.len() + cont2.inner.len()), + } + } + fn index(&self, index: usize) -> &Self::Item { + let lower = self.offsets[index]; + let upper = self.offsets[index+1]; + &self.inner[lower .. upper] + } + fn len(&self) -> usize { + self.offsets.len() - 1 + } + } + + /// Default implementation introduces a first offset. + impl Default for SliceContainer { + fn default() -> Self { + Self { + offsets: vec![0], + inner: Default::default(), + } + } + } +} diff --git a/src/trace/implementations/ord.rs b/src/trace/implementations/ord.rs index 79574c746..704b9213b 100644 --- a/src/trace/implementations/ord.rs +++ b/src/trace/implementations/ord.rs @@ -16,7 +16,7 @@ use timely::progress::{Antichain, frontier::AntichainRef}; use lattice::Lattice; -use trace::layers::{Trie, TupleBuilder, BatchContainer}; +use trace::layers::{Trie, TupleBuilder}; use trace::layers::Builder as TrieBuilder; use trace::layers::Cursor as TrieCursor; use trace::layers::ordered::{OrderedLayer, OrderedBuilder, OrderedCursor}; @@ -32,7 +32,7 @@ use super::merge_batcher::MergeBatcher; use abomonation::abomonated::Abomonated; use trace::implementations::merge_batcher_col::ColumnatedMergeBatcher; -use trace::implementations::RetainFrom; +use trace::implementations::{BatchContainer, RetainFrom}; use super::{Update, Layout, Vector, TStack}; diff --git a/src/trace/implementations/ord_neu.rs b/src/trace/implementations/ord_neu.rs index df22f714b..173d44063 100644 --- a/src/trace/implementations/ord_neu.rs +++ b/src/trace/implementations/ord_neu.rs @@ -44,7 +44,7 @@ mod val_batch { use timely::progress::{Antichain, frontier::AntichainRef}; use trace::{Batch, BatchReader, Builder, Cursor, Description, Merger}; - use trace::layers::BatchContainer; + use trace::implementations::BatchContainer; use super::{Layout, Update}; diff --git a/src/trace/layers/mod.rs b/src/trace/layers/mod.rs index aa214e1d0..ff023c621 100644 --- a/src/trace/layers/mod.rs +++ b/src/trace/layers/mod.rs @@ -4,9 +4,6 @@ //! in the next layer. Similarly, ranges of elements in the layer itself may correspond //! to single elements in the layer above. -use timely::container::columnation::TimelyStack; -use timely::container::columnation::Columnation; - pub mod ordered; pub mod ordered_leaf; // pub mod hashed; @@ -108,151 +105,3 @@ pub trait Cursor { /// Repositions the cursor to a different range of values. fn reposition(&mut self, storage: &Storage, lower: usize, upper: usize); } - -/// A general-purpose container resembling `Vec`. -pub trait BatchContainer: Default { - /// The type of contained item. - type Item; - /// Inserts an owned item. - fn push(&mut self, item: Self::Item); - /// Inserts a borrowed item. - fn copy(&mut self, item: &Self::Item); - /// Extends from a slice of items. - fn copy_slice(&mut self, slice: &[Self::Item]); - /// Extends from a range of items in another`Self`. - fn copy_range(&mut self, other: &Self, start: usize, end: usize); - /// Creates a new container with sufficient capacity. - fn with_capacity(size: usize) -> Self; - /// Reserves additional capacity. - fn reserve(&mut self, additional: usize); - /// Creates a new container with sufficient capacity. - fn merge_capacity(cont1: &Self, cont2: &Self) -> Self; - - /// Reference to the element at this position. - fn index(&self, index: usize) -> &Self::Item; - /// Number of contained elements - fn len(&self) -> usize; - /// Returns the last item if the container is non-empty. - fn last(&self) -> Option<&Self::Item> { - if self.len() > 0 { - Some(self.index(self.len()-1)) - } - else { - None - } - } - - /// Reports the number of elements satisfing the predicate. - /// - /// This methods *relies strongly* on the assumption that the predicate - /// stays false once it becomes false, a joint property of the predicate - /// and the layout of `Self. This allows `advance` to use exponential search to - /// count the number of elements in time logarithmic in the result. - fn advancebool>(&self, start: usize, end: usize, function: F) -> usize { - - let small_limit = 8; - - // Exponential seach if the answer isn't within `small_limit`. - if end > start + small_limit && function(self.index(start + small_limit)) { - - // start with no advance - let mut index = small_limit + 1; - if start + index < end && function(self.index(start + index)) { - - // advance in exponentially growing steps. - let mut step = 1; - while start + index + step < end && function(self.index(start + index + step)) { - index += step; - step = step << 1; - } - - // advance in exponentially shrinking steps. - step = step >> 1; - while step > 0 { - if start + index + step < end && function(self.index(start + index + step)) { - index += step; - } - step = step >> 1; - } - - index += 1; - } - - index - } - else { - let limit = std::cmp::min(end, start + small_limit); - (start .. limit).filter(|x| function(self.index(*x))).count() - } - } -} - -impl BatchContainer for Vec { - type Item = T; - fn push(&mut self, item: T) { - self.push(item); - } - fn copy(&mut self, item: &T) { - self.push(item.clone()); - } - fn copy_slice(&mut self, slice: &[T]) { - self.extend_from_slice(slice); - } - fn copy_range(&mut self, other: &Self, start: usize, end: usize) { - self.extend_from_slice(&other[start .. end]); - } - fn with_capacity(size: usize) -> Self { - Vec::with_capacity(size) - } - fn reserve(&mut self, additional: usize) { - self.reserve(additional); - } - fn merge_capacity(cont1: &Self, cont2: &Self) -> Self { - Vec::with_capacity(cont1.len() + cont2.len()) - } - fn index(&self, index: usize) -> &Self::Item { - &self[index] - } - fn len(&self) -> usize { - self[..].len() - } -} - -impl BatchContainer for TimelyStack { - type Item = T; - fn push(&mut self, item: T) { - self.copy(&item); - } - fn copy(&mut self, item: &T) { - self.copy(item); - } - fn copy_slice(&mut self, slice: &[T]) { - self.reserve_items(slice.iter()); - for item in slice.iter() { - self.copy(item); - } - } - fn copy_range(&mut self, other: &Self, start: usize, end: usize) { - let slice = &other[start .. end]; - self.reserve_items(slice.iter()); - for item in slice.iter() { - self.copy(item); - } - } - fn with_capacity(size: usize) -> Self { - Self::with_capacity(size) - } - fn reserve(&mut self, _additional: usize) { - } - fn merge_capacity(cont1: &Self, cont2: &Self) -> Self { - let mut new = Self::default(); - new.reserve_regions(std::iter::once(cont1).chain(std::iter::once(cont2))); - new - } - fn index(&self, index: usize) -> &Self::Item { - &self[index] - } - fn len(&self) -> usize { - self[..].len() - } -} diff --git a/src/trace/layers/ordered.rs b/src/trace/layers/ordered.rs index dd0cb6442..6313c00ff 100644 --- a/src/trace/layers/ordered.rs +++ b/src/trace/layers/ordered.rs @@ -1,10 +1,13 @@ //! Implementation using ordered keys and exponential search. -use super::{Trie, Cursor, Builder, MergeBuilder, TupleBuilder, BatchContainer}; use std::convert::{TryFrom, TryInto}; use std::fmt::Debug; use std::ops::{Sub,Add}; +use trace::implementations::BatchContainer; + +use super::{Trie, Cursor, Builder, MergeBuilder, TupleBuilder}; + /// Trait for types used as offsets into an ordered layer. /// This is usually `usize`, but `u32` can also be used in applications /// where huge batches do not occur to reduce metadata size. @@ -39,7 +42,7 @@ where impl Trie for OrderedLayer where - K: Ord, + K: Ord+Clone, C: BatchContainer, L: Trie, O: OrdOffset @@ -90,7 +93,7 @@ where impl Builder for OrderedBuilder where - K: Ord, + K: Ord+Clone, C: BatchContainer, L: Builder, O: OrdOffset @@ -114,7 +117,7 @@ where impl MergeBuilder for OrderedBuilder where - K: Ord, + K: Ord+Clone, C: BatchContainer, L: MergeBuilder, O: OrdOffset @@ -161,7 +164,7 @@ where impl OrderedBuilder where - K: Ord, + K: Ord+Clone, C: BatchContainer, L: MergeBuilder, O: OrdOffset @@ -209,7 +212,7 @@ where impl TupleBuilder for OrderedBuilder where - K: Ord, + K: Ord+Clone, C: BatchContainer, L: TupleBuilder, O: OrdOffset diff --git a/src/trace/layers/ordered_leaf.rs b/src/trace/layers/ordered_leaf.rs index 39aec7592..a354caa0a 100644 --- a/src/trace/layers/ordered_leaf.rs +++ b/src/trace/layers/ordered_leaf.rs @@ -1,9 +1,12 @@ //! Implementation using ordered keys and exponential search. +use std::ops::Deref; + use ::difference::Semigroup; +use trace::implementations::BatchContainer; + +use super::{Trie, Cursor, Builder, MergeBuilder, TupleBuilder}; -use super::{Trie, Cursor, Builder, MergeBuilder, TupleBuilder, BatchContainer}; -use std::ops::Deref; /// A layer of unordered values. #[derive(Debug, Eq, PartialEq, Clone, Abomonation)] From 584d71341cf41abcdd006ebc3a8d51fc07a6d99c Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Thu, 23 Nov 2023 12:33:32 -0500 Subject: [PATCH 2/2] Relocate OrdOffset --- src/trace/implementations/mod.rs | 15 ++++++++++++++- src/trace/layers/ordered.rs | 15 +-------------- 2 files changed, 15 insertions(+), 15 deletions(-) diff --git a/src/trace/implementations/mod.rs b/src/trace/implementations/mod.rs index e947ad473..0135fd701 100644 --- a/src/trace/implementations/mod.rs +++ b/src/trace/implementations/mod.rs @@ -52,10 +52,12 @@ pub mod ord_neu; pub use self::ord::OrdValSpine as ValSpine; pub use self::ord::OrdKeySpine as KeySpine; +use std::ops::{Add, Sub}; +use std::convert::{TryInto, TryFrom}; + use timely::container::columnation::{Columnation, TimelyStack}; use lattice::Lattice; use difference::Semigroup; -use trace::layers::ordered::OrdOffset; /// A type that names constituent update types. pub trait Update { @@ -167,6 +169,17 @@ impl RetainFrom for TimelyStack { } } +/// Trait for types used as offsets into an ordered layer. +/// This is usually `usize`, but `u32` can also be used in applications +/// where huge batches do not occur to reduce metadata size. +pub trait OrdOffset: Copy + PartialEq + Add + Sub + TryFrom + TryInto +{} + +impl OrdOffset for O +where + O: Copy + PartialEq + Add + Sub + TryFrom + TryInto, +{} + pub use self::containers::{BatchContainer, SliceContainer}; /// Containers for data that resemble `Vec`, with leaner implementations. diff --git a/src/trace/layers/ordered.rs b/src/trace/layers/ordered.rs index 6313c00ff..2b59bb5ba 100644 --- a/src/trace/layers/ordered.rs +++ b/src/trace/layers/ordered.rs @@ -1,24 +1,11 @@ //! Implementation using ordered keys and exponential search. -use std::convert::{TryFrom, TryInto}; use std::fmt::Debug; -use std::ops::{Sub,Add}; -use trace::implementations::BatchContainer; +use trace::implementations::{BatchContainer, OrdOffset}; use super::{Trie, Cursor, Builder, MergeBuilder, TupleBuilder}; -/// Trait for types used as offsets into an ordered layer. -/// This is usually `usize`, but `u32` can also be used in applications -/// where huge batches do not occur to reduce metadata size. -pub trait OrdOffset: Copy + PartialEq + Add + Sub + TryFrom + TryInto -{} - -impl OrdOffset for O -where - O: Copy + PartialEq + Add + Sub + TryFrom + TryInto, -{} - /// A level of the trie, with keys and offsets into a lower layer. /// /// In this representation, the values for `keys[i]` are found at `vals[offs[i] .. offs[i+1]]`.