From 56ba5277a47e7609b3370b24388e1d196b8fe32f Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Wed, 8 Dec 2021 13:30:47 +0100 Subject: [PATCH 1/4] Make collections generic over containers Signed-off-by: Moritz Hoffmann --- dogsdogsdogs/src/lib.rs | 10 +- src/algorithms/prefix_sum.rs | 6 +- src/collection.rs | 240 ++++++------ src/consolidation.rs | 90 +++++ src/lib.rs | 2 + src/operators/arrange/arrangement.rs | 48 +-- src/operators/consolidate.rs | 18 +- src/trace/implementations/merge_batcher.rs | 36 +- .../merge_batcher_columnation.rs | 357 ++++++++++++++++++ src/trace/implementations/mod.rs | 2 + src/trace/implementations/ord.rs | 225 ++++++----- src/trace/layers/mod.rs | 20 +- src/trace/mod.rs | 11 +- tests/trace.rs | 9 +- 14 files changed, 823 insertions(+), 251 deletions(-) create mode 100644 src/trace/implementations/merge_batcher_columnation.rs diff --git a/dogsdogsdogs/src/lib.rs b/dogsdogsdogs/src/lib.rs index 3e288c3d1..4bcf6313d 100644 --- a/dogsdogsdogs/src/lib.rs +++ b/dogsdogsdogs/src/lib.rs @@ -14,7 +14,7 @@ use timely::progress::Timestamp; use timely::dataflow::operators::Partition; use timely::dataflow::operators::Concatenate; -use differential_dataflow::{ExchangeData, Collection, AsCollection}; +use differential_dataflow::{Data, ExchangeData, Collection, AsCollection}; use differential_dataflow::operators::Threshold; use differential_dataflow::difference::{Monoid, Multiply}; use differential_dataflow::lattice::Lattice; @@ -33,9 +33,9 @@ pub mod operators; **/ pub trait PrefixExtender> { /// The required type of prefix to extend. - type Prefix; + type Prefix: Data; /// The type to be produced as extension. - type Extension; + type Extension: Data; /// Annotates prefixes with the number of extensions the relation would propose. fn count(&mut self, prefixes: &Collection, index: usize) -> Collection; /// Extends each prefix with corresponding extensions. @@ -92,11 +92,11 @@ where } } -pub trait ValidateExtensionMethod, P, E> { +pub trait ValidateExtensionMethod, P: Data, E: Data> { fn validate_using>(&self, extender: &mut PE) -> Collection; } -impl, P, E> ValidateExtensionMethod for Collection { +impl, P: Data, E: Data> ValidateExtensionMethod for Collection { fn validate_using>(&self, extender: &mut PE) -> Collection { extender.validate(self) } diff --git a/src/algorithms/prefix_sum.rs b/src/algorithms/prefix_sum.rs index 1b8b00c83..2d6d0f9c8 100644 --- a/src/algorithms/prefix_sum.rs +++ b/src/algorithms/prefix_sum.rs @@ -2,12 +2,12 @@ use timely::dataflow::Scope; -use ::{Collection, ExchangeData}; +use ::{Collection, Data, ExchangeData}; use ::lattice::Lattice; use ::operators::*; /// Extension trait for the prefix_sum method. -pub trait PrefixSum { +pub trait PrefixSum { /// Computes the prefix sum for each element in the collection. /// /// The prefix sum is data-parallel, in the sense that the sums are computed independently for @@ -150,4 +150,4 @@ where .distinct() }) .semijoin(&queries) -} \ No newline at end of file +} diff --git a/src/collection.rs b/src/collection.rs index eb82c5d0d..27631c0b9 100644 --- a/src/collection.rs +++ b/src/collection.rs @@ -14,14 +14,15 @@ use timely::Data; use timely::progress::Timestamp; use timely::order::Product; use timely::dataflow::scopes::{Child, child::Iterative}; -use timely::dataflow::{Scope, Stream}; +use timely::dataflow::{Scope, StreamCore}; use timely::dataflow::operators::*; use ::difference::{Semigroup, Abelian, Multiply}; use lattice::Lattice; use hashable::Hashable; +use TimelyContainer; -/// A mutable collection of values of type `D` +/// A mutable collection of values of type `D` within a container `C` /// /// The `Collection` type is the core abstraction in differential dataflow programs. As you write your /// differential dataflow computation, you write as if the collection is a static dataset to which you @@ -30,31 +31,140 @@ use hashable::Hashable; /// propagate changes through your functional computation and report the corresponding changes to the /// output collections. /// -/// Each collection has three generic parameters. The parameter `G` is for the scope in which the +/// Each collection has four generic parameters. The parameter `G` is for the scope in which the /// collection exists; as you write more complicated programs you may wish to introduce nested scopes /// (e.g. for iteration) and this parameter tracks the scope (for timely dataflow's benefit). The `D` /// parameter is the type of data in your collection, for example `String`, or `(u32, Vec>)`. /// The `R` parameter represents the types of changes that the data undergo, and is most commonly (and -/// defaults to) `isize`, representing changes to the occurrence count of each record. +/// defaults to) `isize`, representing changes to the occurrence count of each record. The `C` +/// parameter specifies the container type of the collection. +/// +/// Note that the default container type is `Vec<_>`. #[derive(Clone)] -pub struct Collection { +pub struct Collection::Timestamp, R)>> + where + G: Scope, + R: Semigroup, + C: TimelyContainer, +{ /// The underlying timely dataflow stream. /// /// This field is exposed to support direct timely dataflow manipulation when required, but it is /// not intended to be the idiomatic way to work with the collection. - pub inner: Stream + pub inner: StreamCore } -impl Collection where G::Timestamp: Data { +impl Collection + where + C: TimelyContainer, +{ /// Creates a new Collection from a timely dataflow stream. /// /// This method seems to be rarely used, with the `as_collection` method on streams being a more /// idiomatic approach to convert timely streams to collections. Also, the `input::Input` trait /// provides a `new_collection` method which will create a new collection for you without exposing /// the underlying timely stream at all. - pub fn new(stream: Stream) -> Collection { - Collection { inner: stream } + pub fn new(stream: StreamCore) -> Collection { + Self { inner: stream } + } + + /// Creates a new collection accumulating the contents of the two collections. + /// + /// Despite the name, differential dataflow collections are unordered. This method is so named because the + /// implementation is the concatenation of the stream of updates, but it corresponds to the addition of the + /// two collections. + /// + /// # Examples + /// + /// ``` + /// extern crate timely; + /// extern crate differential_dataflow; + /// + /// use differential_dataflow::input::Input; + /// + /// fn main() { + /// ::timely::example(|scope| { + /// + /// let data = scope.new_collection_from(1 .. 10).1; + /// + /// let odds = data.filter(|x| x % 2 == 1); + /// let evens = data.filter(|x| x % 2 == 0); + /// + /// odds.concat(&evens) + /// .assert_eq(&data); + /// }); + /// } + /// ``` + pub fn concat(&self, other: &Collection) -> Collection { + self.inner + .concat(&other.inner) + .as_collection() + } + + /// Creates a new collection accumulating the contents of the two collections. + /// + /// Despite the name, differential dataflow collections are unordered. This method is so named because the + /// implementation is the concatenation of the stream of updates, but it corresponds to the addition of the + /// two collections. + /// + /// # Examples + /// + /// ``` + /// extern crate timely; + /// extern crate differential_dataflow; + /// + /// use differential_dataflow::input::Input; + /// + /// fn main() { + /// ::timely::example(|scope| { + /// + /// let data = scope.new_collection_from(1 .. 10).1; + /// + /// let odds = data.filter(|x| x % 2 == 1); + /// let evens = data.filter(|x| x % 2 == 0); + /// + /// odds.concatenate(Some(evens)) + /// .assert_eq(&data); + /// }); + /// } + /// ``` + pub fn concatenate(&self, sources: I) -> Collection + where + I: IntoIterator> + { + self.inner + .concatenate(sources.into_iter().map(|x| x.inner)) + .as_collection() + } + + /// Attaches a timely dataflow probe to the output of a Collection. + /// + /// This probe is used to determine when the state of the Collection has stabilized and can + /// be read out. + pub fn probe(&self) -> probe::Handle { + self.inner + .probe() + } + + /// Attaches a timely dataflow probe to the output of a Collection. + /// + /// This probe is used to determine when the state of the Collection has stabilized and all updates observed. + /// In addition, a probe is also often use to limit the number of rounds of input in flight at any moment; a + /// computation can wait until the probe has caught up to the input before introducing more rounds of data, to + /// avoid swamping the system. + pub fn probe_with(&self, handle: &mut probe::Handle) -> Collection { + self.inner + .probe_with(handle) + .as_collection() + } + + /// The scope containing the underlying timely dataflow stream. + pub fn scope(&self) -> G { + self.inner.scope() } +} + +impl Collection where G::Timestamp: Data { /// Creates a new collection by applying the supplied function to each input element. /// /// # Examples @@ -166,73 +276,6 @@ impl Collection where G::Timestamp: Da .filter(move |&(ref data, _, _)| logic(data)) .as_collection() } - /// Creates a new collection accumulating the contents of the two collections. - /// - /// Despite the name, differential dataflow collections are unordered. This method is so named because the - /// implementation is the concatenation of the stream of updates, but it corresponds to the addition of the - /// two collections. - /// - /// # Examples - /// - /// ``` - /// extern crate timely; - /// extern crate differential_dataflow; - /// - /// use differential_dataflow::input::Input; - /// - /// fn main() { - /// ::timely::example(|scope| { - /// - /// let data = scope.new_collection_from(1 .. 10).1; - /// - /// let odds = data.filter(|x| x % 2 == 1); - /// let evens = data.filter(|x| x % 2 == 0); - /// - /// odds.concat(&evens) - /// .assert_eq(&data); - /// }); - /// } - /// ``` - pub fn concat(&self, other: &Collection) -> Collection { - self.inner - .concat(&other.inner) - .as_collection() - } - /// Creates a new collection accumulating the contents of the two collections. - /// - /// Despite the name, differential dataflow collections are unordered. This method is so named because the - /// implementation is the concatenation of the stream of updates, but it corresponds to the addition of the - /// two collections. - /// - /// # Examples - /// - /// ``` - /// extern crate timely; - /// extern crate differential_dataflow; - /// - /// use differential_dataflow::input::Input; - /// - /// fn main() { - /// ::timely::example(|scope| { - /// - /// let data = scope.new_collection_from(1 .. 10).1; - /// - /// let odds = data.filter(|x| x % 2 == 1); - /// let evens = data.filter(|x| x % 2 == 0); - /// - /// odds.concatenate(Some(evens)) - /// .assert_eq(&data); - /// }); - /// } - /// ``` - pub fn concatenate(&self, sources: I) -> Collection - where - I: IntoIterator> - { - self.inner - .concatenate(sources.into_iter().map(|x| x.inner)) - .as_collection() - } /// Replaces each record with another, with a new difference type. /// /// This method is most commonly used to take records containing aggregatable data (e.g. numbers to be summed) @@ -482,25 +525,6 @@ impl Collection where G::Timestamp: Da .inspect_batch(func) .as_collection() } - /// Attaches a timely dataflow probe to the output of a Collection. - /// - /// This probe is used to determine when the state of the Collection has stabilized and can - /// be read out. - pub fn probe(&self) -> probe::Handle { - self.inner - .probe() - } - /// Attaches a timely dataflow probe to the output of a Collection. - /// - /// This probe is used to determine when the state of the Collection has stabilized and all updates observed. - /// In addition, a probe is also often use to limit the number of rounds of input in flight at any moment; a - /// computation can wait until the probe has caught up to the input before introducing more rounds of data, to - /// avoid swamping the system. - pub fn probe_with(&self, handle: &mut probe::Handle) -> Collection { - self.inner - .probe_with(handle) - .as_collection() - } /// Assert if the collection is ever non-empty. /// @@ -534,11 +558,6 @@ impl Collection where G::Timestamp: Da self.consolidate() .inspect(|x| panic!("Assertion failed: non-empty collection: {:?}", x)); } - - /// The scope containing the underlying timely dataflow stream. - pub fn scope(&self) -> G { - self.inner.scope() - } } use timely::dataflow::scopes::ScopeParent; @@ -673,13 +692,21 @@ impl Collection where G::Timestamp: Data } /// Conversion to a differential dataflow Collection. -pub trait AsCollection { +pub trait AsCollection::Timestamp, R)>> + where + G: Scope, + R: Semigroup, + C: TimelyContainer, +{ /// Converts the type to a differential dataflow collection. - fn as_collection(&self) -> Collection; + fn as_collection(&self) -> Collection; } -impl AsCollection for Stream { - fn as_collection(&self) -> Collection { +impl AsCollection for StreamCore + where + C: TimelyContainer +{ + fn as_collection(&self) -> Collection { Collection::new(self.clone()) } } @@ -710,12 +737,13 @@ impl AsCollection for Stream(scope: &mut G, iterator: I) -> Collection +pub fn concatenate(scope: &mut G, iterator: I) -> Collection where G: Scope, D: Data, R: Semigroup, - I: IntoIterator>, + I: IntoIterator>, + C: TimelyContainer, { scope .concatenate(iterator.into_iter().map(|x| x.inner)) diff --git a/src/consolidation.rs b/src/consolidation.rs index b4087bcc9..78f386ea0 100644 --- a/src/consolidation.rs +++ b/src/consolidation.rs @@ -10,8 +10,32 @@ //! you need specific behavior, it may be best to defensively copy, paste, and maintain the //! specific behavior you require. +use timely::container::columnation::{Columnation, TimelyStack}; +use Data; use crate::difference::Semigroup; +/// Consolidate the contents of `self`. +/// +/// Consolidation takes a container of `(data, diff)`-pairs and accumulates a single `diff` +/// per unique `data` element. Elements where the `diff` accumulates to zero are dropped. +/// Implementations are may reorder the contents of `self`. +pub trait Consolidation { + /// Consolidate `self`. + fn consolidate(&mut self); +} + +impl Consolidation for Vec<(T, R)> { + fn consolidate(&mut self) { + consolidate(self); + } +} + +impl Consolidation for Vec<(D, T, R)> { + fn consolidate(&mut self) { + consolidate_updates(self); + } +} + /// Sorts and consolidates `vec`. /// /// This method will sort `vec` and then consolidate runs of more than one entry with @@ -145,6 +169,45 @@ pub fn consolidate_updates_slice(slice: &mut [(D, offset } +impl Consolidation for TimelyStack<(A, B, C)> { + fn consolidate(&mut self) { + if self.is_empty() { + return; + } + + { + // unsafe reasoning: `sort_unstable_by` does not expose mutable access to elements + let slice = unsafe { self.local() }; + slice.sort_unstable_by(|x, y| (&x.0, &x.1).cmp(&(&y.0, &y.1))); + } + + // Replace `self` with a new allocation. + // TODO: recycle the old `self` + let input = &std::mem::replace(self, TimelyStack::with_capacity(self.len()))[..]; + + let mut diff: Option = None; + for i in 0..(input.len()) { + // accumulate diff + if let Some(diff) = diff.as_mut() { + // we already have a diff, simply plus_equal it + diff.plus_equals(&input[i].2) + } else { + // last element was undefined or different, initialize new diff + diff = Some(input[i].2.clone()) + } + + // is the current element == next element (except diff)? + if i == input.len() - 1 || (input[i].0 != input[i+1].0 || input[i].1 != input[i+1].1) { + // element[i] != element[i+1] + // emit element[i] if accumulated diff != 0 + if !diff.as_ref().map(Semigroup::is_zero).unwrap_or(true) { + self.copy_destructured(&input[i].0, &input[i].1, &diff.take().unwrap()); + } + } + } + } +} + #[cfg(test)] mod tests { use super::*; @@ -211,4 +274,31 @@ mod tests { assert_eq!(input, output); } } + + #[test] + fn test_consolidate_updates_column_stack() { + let test_cases: Vec<(TimelyStack<_>, TimelyStack<_>)> = vec![ + ( + vec![("a".to_owned(), 1, -1), ("b".to_owned(), 1, -2), ("a".to_owned(), 1, 1)].iter().collect(), + vec![("b".to_owned(), 1, -2)].iter().collect(), + ), + ( + vec![("a".to_owned(), 1, -1), ("b".to_owned(), 1, 0), ("a".to_owned(), 1, 1)].iter().collect(), + vec![].iter().collect(), + ), + ( + vec![("a".to_owned(), 1, 0)].iter().collect(), + vec![].iter().collect(), + ), + ( + vec![("a".to_owned(), 1, 0), ("b".to_owned(), 1, 0)].iter().collect(), + vec![].iter().collect(), + ), + ]; + + for (mut input, output) in test_cases { + input.consolidate(); + assert_eq!(input, output); + } + } } diff --git a/src/lib.rs b/src/lib.rs index c6b10904a..7380f722b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -102,6 +102,8 @@ extern crate abomonation; extern crate serde_derive; extern crate serde; +pub use timely::Container as TimelyContainer; + pub mod hashable; pub mod operators; pub mod algorithms; diff --git a/src/operators/arrange/arrangement.rs b/src/operators/arrange/arrangement.rs index a2dd3e1b0..0dd983bd0 100644 --- a/src/operators/arrange/arrangement.rs +++ b/src/operators/arrange/arrangement.rs @@ -19,19 +19,21 @@ use timely::dataflow::operators::{Enter, Map}; use timely::order::{PartialOrder, TotalOrder}; -use timely::dataflow::{Scope, Stream}; +use timely::dataflow::{Scope, ScopeParent, Stream}; use timely::dataflow::operators::generic::Operator; -use timely::dataflow::channels::pact::{ParallelizationContract, Pipeline, Exchange}; +use timely::dataflow::channels::pact::{ParallelizationContract, Pipeline, Exchange, ParallelizationContractCore, ExchangeCore}; use timely::progress::Timestamp; use timely::progress::{Antichain, frontier::AntichainRef}; use timely::dataflow::operators::Capability; +use timely::container::PushPartitioned; -use ::{Data, ExchangeData, Collection, AsCollection, Hashable}; +use ::{Data, ExchangeData, Collection, AsCollection, Hashable, TimelyContainer}; use ::difference::Semigroup; use lattice::Lattice; use trace::{Trace, TraceReader, Batch, BatchReader, Batcher, Cursor}; use trace::implementations::ord::OrdValSpine as DefaultValTrace; use trace::implementations::ord::OrdKeySpine as DefaultKeyTrace; +use trace::layers::MergeContainer; use trace::wrappers::enter::{TraceEnter, BatchEnter}; use trace::wrappers::enter_at::TraceEnter as TraceEnterAt; @@ -439,11 +441,12 @@ where /// /// This trait is implemented for appropriately typed collections and all traces that might accommodate them, /// as well as by arranged data for their corresponding trace type. -pub trait Arrange +pub trait Arrange::Timestamp, R)>> where G::Timestamp: Lattice, K: Data, V: Data, + C: TimelyContainer, { /// Arranges a stream of `(Key, Val)` updates by `Key`. Accepts an empty instance of the trace type. /// @@ -455,8 +458,10 @@ where K: ExchangeData+Hashable, V: ExchangeData, R: ExchangeData, + C: timely::ExchangeData + PushPartitioned, Tr: Trace+TraceReader+'static, Tr::Batch: Batch, + ::Batcher: Batcher, { self.arrange_named("Arrange") } @@ -471,10 +476,12 @@ where K: ExchangeData+Hashable, V: ExchangeData, R: ExchangeData, + C: timely::ExchangeData + PushPartitioned, Tr: Trace+TraceReader+'static, Tr::Batch: Batch, + ::Batcher: Batcher, { - let exchange = Exchange::new(move |update: &((K,V),G::Timestamp,R)| (update.0).0.hashed().into()); + let exchange = ExchangeCore::new(move |update: &((K,V),G::Timestamp,R)| (update.0).0.hashed().into()); self.arrange_core(exchange, name) } @@ -485,46 +492,28 @@ where /// is the correct way to determine that times in the shared trace are committed. fn arrange_core(&self, pact: P, name: &str) -> Arranged> where - P: ParallelizationContract, + P: ParallelizationContractCore, Tr: Trace+TraceReader+'static, Tr::Batch: Batch, + ::Batcher: Batcher, ; } -impl Arrange for Collection +impl Arrange for Collection where G: Scope, G::Timestamp: Lattice+Ord, K: Data, V: Data, R: Semigroup, + C: Data + TimelyContainer + MergeContainer, { - fn arrange(&self) -> Arranged> - where - K: ExchangeData + Hashable, - V: ExchangeData, - R: ExchangeData, - Tr: Trace + TraceReader + 'static, Tr::Batch: Batch - { - self.arrange_named("Arrange") - } - - fn arrange_named(&self, name: &str) -> Arranged> - where - K: ExchangeData + Hashable, - V: ExchangeData, - R: ExchangeData, - Tr: Trace + TraceReader + 'static, Tr::Batch: Batch - { - let exchange = Exchange::new(move |update: &((K,V),G::Timestamp,R)| (update.0).0.hashed().into()); - self.arrange_core(exchange, name) - } - fn arrange_core(&self, pact: P, name: &str) -> Arranged> where - P: ParallelizationContract, + P: ParallelizationContractCore, Tr: Trace+TraceReader+'static, Tr::Batch: Batch, + ::Batcher: Batcher, { // The `Arrange` operator is tasked with reacting to an advancing input // frontier by producing the sequence of batches whose lower and upper @@ -692,6 +681,7 @@ where P: ParallelizationContract, Tr: Trace+TraceReader+'static, Tr::Batch: Batch, + ::Batcher: Batcher>, { self.map(|k| (k, ())) .arrange_core(pact, name) diff --git a/src/operators/consolidate.rs b/src/operators/consolidate.rs index 32b770c22..9740fb848 100644 --- a/src/operators/consolidate.rs +++ b/src/operators/consolidate.rs @@ -8,8 +8,9 @@ use timely::dataflow::Scope; -use ::{Collection, ExchangeData, Hashable}; +use ::{Collection, ExchangeData, Hashable, TimelyContainer}; use ::difference::Semigroup; +use consolidation::Consolidation; use Data; use lattice::Lattice; @@ -57,13 +58,22 @@ where where Tr: crate::trace::Trace+crate::trace::TraceReader+'static, Tr::Batch: crate::trace::Batch, + ::Batcher: crate::trace::Batcher>, { use operators::arrange::arrangement::Arrange; self.map(|k| (k, ())) .arrange_named::(name) .as_collection(|d: &D, _| d.clone()) } +} +impl Collection + where + C: Consolidation+TimelyContainer, + D: ExchangeData+Hashable, + R: ExchangeData+Semigroup, + G::Timestamp: ::lattice::Lattice+Ord, +{ /// Aggregates the weights of equal records. /// /// Unlike `consolidate`, this method does not exchange data and does not @@ -101,12 +111,12 @@ where self.inner .unary(Pipeline, "ConsolidateStream", |_cap, _info| { - let mut vector = Vec::new(); + let mut vector = C::default(); move |input, output| { input.for_each(|time, data| { data.swap(&mut vector); - crate::consolidation::consolidate_updates(&mut vector); - output.session(&time).give_vec(&mut vector); + vector.consolidate(); + output.session(&time).give_container(&mut vector); }) } }) diff --git a/src/trace/implementations/merge_batcher.rs b/src/trace/implementations/merge_batcher.rs index 6815e8072..2a10aa11a 100644 --- a/src/trace/implementations/merge_batcher.rs +++ b/src/trace/implementations/merge_batcher.rs @@ -7,9 +7,16 @@ use timely::progress::frontier::Antichain; use ::difference::Semigroup; +use consolidation::Consolidation; +use Data; use lattice::Lattice; +use trace::layers::{ContainerMergeBatcher, MergeContainer}; use trace::{Batch, Batcher, Builder}; +impl MergeContainer for Vec<(D, T, R)> { + type MergeBatcher = MergeSorter; +} + /// Creates batches from unordered tuples. pub struct MergeBatcher where B::Key: Ord, B::Val: Ord, B::Time: Ord, B::R: Semigroup { sorter: MergeSorter<(B::Key, B::Val), B::Time, B::R>, @@ -21,11 +28,12 @@ pub struct MergeBatcher where B::Key: Ord, B::Val: Ord, B::Time: Ord, impl Batcher for MergeBatcher where B: Batch, - B::Key: Ord+Clone, - B::Val: Ord+Clone, + B::Key: Data, + B::Val: Data, B::Time: Lattice+timely::progress::Timestamp+Ord+Clone, B::R: Semigroup, { + type Input = Vec<((B::Key, B::Val), B::Time, B::R)>; fn new() -> Self { MergeBatcher { sorter: MergeSorter::new(), @@ -178,7 +186,7 @@ impl MergeSorter { }; if batch.len() > 0 { - crate::consolidation::consolidate_updates(&mut batch); + batch.consolidate(); self.queue.push(vec![batch]); while self.queue.len() > 1 && (self.queue[self.queue.len()-1].len() >= self.queue[self.queue.len()-2].len() / 2) { let list1 = self.queue.pop().unwrap(); @@ -293,7 +301,27 @@ impl MergeSorter { } } -/// Reports the number of elements satisfing the predicate. +impl ContainerMergeBatcher> for MergeSorter { + fn empty(&mut self) -> Vec<(D, T, R)> { + MergeSorter::empty(self) + } + + fn push(&mut self, batch: &mut Vec<(D, T, R)>) { + MergeSorter::push(self, batch) + } + + fn finish_into(&mut self, target: &mut Vec>) { + MergeSorter::finish_into(self, target) + } +} + +impl Default for MergeSorter { + fn default() -> Self { + Self::new() + } +} + +/// Reports the number of elements satisfying the predicate. /// /// This methods *relies strongly* on the assumption that the predicate /// stays false once it becomes false, a joint property of the predicate diff --git a/src/trace/implementations/merge_batcher_columnation.rs b/src/trace/implementations/merge_batcher_columnation.rs new file mode 100644 index 000000000..abb8c1890 --- /dev/null +++ b/src/trace/implementations/merge_batcher_columnation.rs @@ -0,0 +1,357 @@ +//! A general purpose `Batcher` implementation based on radix sort for TimelyStack. + +use timely::Container; +use timely::communication::message::RefOrMut; +use timely::container::columnation::{Columnation, TimelyStack}; +use timely::progress::frontier::Antichain; + +use ::difference::Semigroup; + +use lattice::Lattice; +use trace::{Batch, Batcher, Builder}; + +impl MergeContainer for TimelyStack<(D, T, R)> { + type MergeBatcher = MergeSorterColumnation; +} + +/// Creates batches from unordered tuples. +pub struct TimelyStackMergeBatcher + where + B::Key: Data+Columnation, + B::Val: Data+Columnation, + B::Time: Lattice+timely::progress::Timestamp+Ord+Clone+Columnation, + B::R: Semigroup+Columnation, +{ + sorter: MergeSorterColumnation<(B::Key, B::Val), B::Time, B::R>, + lower: Antichain, + frontier: Antichain, + phantom: ::std::marker::PhantomData, +} + +impl Batcher for TimelyStackMergeBatcher +where + B::Key: Data+Columnation, + B::Val: Data+Columnation, + B::Time: Lattice+timely::progress::Timestamp+Ord+Clone+Columnation, + B::R: Semigroup+Columnation, +{ + type Input = TimelyStack<((B::Key, B::Val), B::Time, B::R)>; + fn new() -> Self { + TimelyStackMergeBatcher { + sorter: MergeSorterColumnation::new(), + frontier: Antichain::new(), + lower: Antichain::from_elem(::minimum()), + phantom: ::std::marker::PhantomData, + } + } + + #[inline(never)] + fn push_batch(&mut self, batch: RefOrMut) { + // `batch` is either a shared reference or an owned allocations. + match batch { + RefOrMut::Ref(reference) => { + // This is a moment at which we could capture the allocations backing + // `batch` into a different form of region, rather than just cloning. + let mut owned: Self::Input = self.sorter.empty(); + owned.clone_from(reference); + self.sorter.push(&mut owned); + }, + RefOrMut::Mut(reference) => { + self.sorter.push(reference); + } + } + } + + // Sealing a batch means finding those updates with times not greater or equal to any time + // in `upper`. All updates must have time greater or equal to the previously used `upper`, + // which we call `lower`, by assumption that after sealing a batcher we receive no more + // updates with times not greater or equal to `upper`. + #[inline(never)] + fn seal(&mut self, upper: Antichain) -> B { + + let mut builder = B::Builder::new(); + + let mut merged = Default::default(); + self.sorter.finish_into(&mut merged); + + let mut kept = Vec::new(); + let mut keep = TimelyStack::default(); + + self.frontier.clear(); + + // TODO: Re-use buffer, rather than dropping. + for mut buffer in merged.drain(..) { + for datum @ ((key, val), time, diff) in &buffer[..] { + if upper.less_equal(time) { + self.frontier.insert(time.clone()); + if keep.len() == keep.capacity() { + if keep.len() > 0 { + kept.push(keep); + keep = self.sorter.empty(); + } + } + keep.copy(datum); + } + else { + builder.push((key.clone(), val.clone(), time.clone(), diff.clone())); + } + } + buffer.clear(); + // Recycling buffer. + self.sorter.push(&mut buffer); + } + + // Finish the kept data. + if keep.len() > 0 { + kept.push(keep); + } + if kept.len() > 0 { + self.sorter.push_list(kept); + } + + // Drain buffers (fast reclaimation). + // TODO : This isn't obviously the best policy, but "safe" wrt footprint. + // In particular, if we are reading serialized input data, we may + // prefer to keep these buffers around to re-fill, if possible. + let mut buffer = Default::default(); + self.sorter.push(&mut buffer); + // We recycle buffers with allocations (capacity, and not zero-sized). + while buffer.capacity() > 0 && std::mem::size_of::<((B::Key,B::Val),B::Time,B::R)>() > 0 { + buffer = Default::default(); + self.sorter.push(&mut buffer); + } + + let seal = builder.done(self.lower.clone(), upper.clone(), Antichain::from_elem(::minimum())); + self.lower = upper; + seal + } + + // the frontier of elements remaining after the most recent call to `self.seal`. + fn frontier(&mut self) -> timely::progress::frontier::AntichainRef { + self.frontier.borrow() + } +} + + +use consolidation::Consolidation; +use Data; +use trace::layers::{ContainerMergeBatcher, MergeContainer}; + +pub struct TimelyStackQueue { + list: TimelyStack, + head: usize, +} + +impl TimelyStackQueue { + #[inline] + pub fn new() -> Self { TimelyStackQueue::from(Default::default()) } + #[inline] + pub fn pop(&mut self) -> &T { + self.head += 1; + &self.list[self.head - 1] + } + #[inline] + pub fn peek(&self) -> &T { + &self.list[self.head] + } + #[inline] + pub fn from(list: TimelyStack) -> Self { + TimelyStackQueue { + list, + head: 0, + } + } + #[inline] + pub fn done(mut self) -> TimelyStack { + self.list.clear(); + self.list + } + #[inline] + pub fn len(&self) -> usize { self.list.len() - self.head } + #[inline] + pub fn is_empty(&self) -> bool { self.head == self.list.len() } +} + +pub struct MergeSorterColumnation { + queue: Vec>>, // each power-of-two length list of allocations. + stash: Vec>, +} + +impl MergeSorterColumnation { + + const BUFFER_SIZE_BYTES: usize = 1 << 13; + + fn buffer_size() -> usize { + let size = ::std::mem::size_of::<(D, T, R)>(); + if size == 0 { + Self::BUFFER_SIZE_BYTES + } else if size <= Self::BUFFER_SIZE_BYTES { + Self::BUFFER_SIZE_BYTES / size + } else { + 1 + } + } + + #[inline] + pub fn new() -> Self { MergeSorterColumnation { queue: Vec::new(), stash: Vec::new() } } + + #[inline] + pub fn empty(&mut self) -> TimelyStack<(D, T, R)> { + self.stash.pop().unwrap_or_else(|| TimelyStack::with_capacity(Self::buffer_size())) + } + + #[inline(never)] + pub fn _sort(&mut self, list: &mut Vec>) { + for mut batch in list.drain(..) { + self.push(&mut batch); + } + self.finish_into(list); + } + + #[inline] + pub fn push(&mut self, batch: &mut TimelyStack<(D, T, R)>) { + // TODO: Reason about possible unbounded stash growth. How to / should we return them? + // TODO: Reason about mis-sized vectors, from deserialized data; should probably drop. + let mut batch = if self.stash.len() > 2 { + ::std::mem::replace(batch, self.stash.pop().unwrap()) + } + else { + ::std::mem::replace(batch, Default::default()) + }; + + if batch.len() > 0 { + batch.consolidate(); + self.queue.push(vec![batch]); + while self.queue.len() > 1 && (self.queue[self.queue.len()-1].len() >= self.queue[self.queue.len()-2].len() / 2) { + let list1 = self.queue.pop().unwrap(); + let list2 = self.queue.pop().unwrap(); + let merged = self.merge_by(list1, list2); + self.queue.push(merged); + } + } + } + + // This is awkward, because it isn't a power-of-two length any more, and we don't want + // to break it down to be so. + pub fn push_list(&mut self, list: Vec>) { + while self.queue.len() > 1 && self.queue[self.queue.len()-1].len() < list.len() { + let list1 = self.queue.pop().unwrap(); + let list2 = self.queue.pop().unwrap(); + let merged = self.merge_by(list1, list2); + self.queue.push(merged); + } + self.queue.push(list); + } + + #[inline(never)] + pub fn finish_into(&mut self, target: &mut Vec>) { + while self.queue.len() > 1 { + let list1 = self.queue.pop().unwrap(); + let list2 = self.queue.pop().unwrap(); + let merged = self.merge_by(list1, list2); + self.queue.push(merged); + } + + if let Some(mut last) = self.queue.pop() { + ::std::mem::swap(&mut last, target); + } + } + + // merges two sorted input lists into one sorted output list. + #[inline(never)] + fn merge_by(&mut self, list1: Vec>, list2: Vec>) -> Vec> { + + use std::cmp::Ordering; + + // TODO: `list1` and `list2` get dropped; would be better to reuse? + let mut output = Vec::with_capacity(list1.len() + list2.len()); + let mut result = self.empty(); + + let mut list1 = list1.into_iter().peekable(); + let mut list2 = list2.into_iter().peekable(); + + let mut head1 = if list1.peek().is_some() { TimelyStackQueue::from(list1.next().unwrap()) } else { TimelyStackQueue::new() }; + let mut head2 = if list2.peek().is_some() { TimelyStackQueue::from(list2.next().unwrap()) } else { TimelyStackQueue::new() }; + + // while we have valid data in each input, merge. + while !head1.is_empty() && !head2.is_empty() { + + while (result.capacity() - result.len()) > 0 && head1.len() > 0 && head2.len() > 0 { + + let cmp = { + let x = head1.peek(); + let y = head2.peek(); + (&x.0, &x.1).cmp(&(&y.0, &y.1)) + }; + match cmp { + Ordering::Less => { result.copy(head1.pop()); } + Ordering::Greater => { result.copy(head2.pop()); } + Ordering::Equal => { + let (data1, time1, diff1) = head1.pop(); + let (_data2, _time2, diff2) = head2.pop(); + let mut diff1 = diff1.clone(); + diff1.plus_equals(&diff2); + if !diff1.is_zero() { + result.copy_destructured(data1, time1, &diff1); + } + } + } + } + + if result.capacity() == result.len() { + output.push(result); + result = self.empty(); + } + + if head1.is_empty() { + let done1 = head1.done(); + if done1.capacity() == Self::buffer_size() { self.stash.push(done1); } + head1 = if list1.peek().is_some() { TimelyStackQueue::from(list1.next().unwrap()) } else { TimelyStackQueue::new() }; + } + if head2.is_empty() { + let done2 = head2.done(); + if done2.capacity() == Self::buffer_size() { self.stash.push(done2); } + head2 = if list2.peek().is_some() { TimelyStackQueue::from(list2.next().unwrap()) } else { TimelyStackQueue::new() }; + } + } + + if result.len() > 0 { output.push(result); } + else if result.capacity() > 0 { self.stash.push(result); } + + if !head1.is_empty() { + let mut result = self.empty(); + for _ in 0 .. head1.len() { result.copy(head1.pop()); } + output.push(result); + } + output.extend(list1); + + if !head2.is_empty() { + let mut result = self.empty(); + for _ in 0 .. head2.len() { result.copy(head2.pop()); } + output.push(result); + } + output.extend(list2); + + output + } +} + +impl ContainerMergeBatcher> for MergeSorterColumnation { + fn empty(&mut self) -> TimelyStack<(D, T, R)> { + MergeSorterColumnation::empty(self) + } + + fn push(&mut self, batch: &mut TimelyStack<(D, T, R)>) { + MergeSorterColumnation::push(self, batch) + } + + fn finish_into(&mut self, target: &mut Vec>) { + MergeSorterColumnation::finish_into(self, target) + } +} + +impl Default for MergeSorterColumnation { + fn default() -> Self { + Self::new() + } +} diff --git a/src/trace/implementations/mod.rs b/src/trace/implementations/mod.rs index 4eee120de..baa4dc974 100644 --- a/src/trace/implementations/mod.rs +++ b/src/trace/implementations/mod.rs @@ -42,6 +42,8 @@ pub mod spine_fueled; mod merge_batcher; +mod merge_batcher_columnation; + pub use self::merge_batcher::MergeBatcher as Batcher; pub mod ord; diff --git a/src/trace/implementations/ord.rs b/src/trace/implementations/ord.rs index 5c995e048..ac53eeb3d 100644 --- a/src/trace/implementations/ord.rs +++ b/src/trace/implementations/ord.rs @@ -14,9 +14,8 @@ use std::marker::PhantomData; use std::fmt::Debug; use std::ops::Deref; -use timely::container::columnation::TimelyStack; -use timely::container::columnation::Columnation; -use timely::progress::{Antichain, frontier::AntichainRef}; +use timely::container::columnation::{Columnation, TimelyStack}; +use timely::progress::{Antichain, frontier::AntichainRef, Timestamp}; use ::difference::Semigroup; use lattice::Lattice; @@ -31,28 +30,29 @@ use trace::description::Description; use trace::layers::MergeBuilder; -// use super::spine::Spine; use super::spine_fueled::Spine; use super::merge_batcher::MergeBatcher; use abomonation::abomonated::Abomonated; +use Data; +use trace::implementations::merge_batcher_columnation::TimelyStackMergeBatcher; /// A trace implementation using a spine of ordered lists. -pub type OrdValSpine = Spine>>; +pub type OrdValSpine = Spine, Vec, Vec>>>; /// A trace implementation using a spine of abomonated ordered lists. -pub type OrdValSpineAbom = Spine, Vec>>>; +pub type OrdValSpineAbom = Spine, Vec, Vec>, Vec>>>; /// A trace implementation for empty values using a spine of ordered lists. -pub type OrdKeySpine = Spine>>; +pub type OrdKeySpine = Spine, Vec>>>; /// A trace implementation for empty values using a spine of abomonated ordered lists. -pub type OrdKeySpineAbom = Spine, Vec>>>; +pub type OrdKeySpineAbom = Spine, Vec>, Vec>>>; /// A trace implementation backed by columnar storage. -pub type ColValSpine = Spine, TimelyStack>>>; +pub type ColValSpine = Spine, TimelyStack, TimelyStack>>>; /// A trace implementation backed by columnar storage. -pub type ColKeySpine = Spine>>>; +pub type ColKeySpine = Spine, TimelyStack>>>; /// A container that can retain/discard from some offset onward. @@ -87,12 +87,12 @@ impl RetainFrom for TimelyStack { /// An immutable collection of update tuples, from a contiguous interval of logical times. #[derive(Debug, Abomonation)] -pub struct OrdValBatch, CV=Vec> +pub struct OrdValBatch where K: Ord+Clone, V: Ord+Clone, - T: Clone+Lattice, - R: Clone, + T: Data+Lattice, + R: Data, O: OrdOffset, >::Error: Debug, >::Error: Debug, CK: BatchContainer+Deref+RetainFrom, CV: BatchContainer+Deref+RetainFrom, @@ -101,13 +101,15 @@ where pub layer: OrderedLayer, O, CV>, O, CK>, /// Description of the update times this layer represents. pub desc: Description, + /// Phantom data + pub phantom: std::marker::PhantomData, } -impl BatchReader for OrdValBatch +impl BatchReader for OrdValBatch where - K: Ord+Clone+'static, - V: Ord+Clone+'static, - T: Lattice+Ord+Clone+'static, + K: Data, + V: Data, + T: Data+Lattice, R: Semigroup, O: OrdOffset, >::Error: Debug, >::Error: Debug, CK: BatchContainer+Deref+RetainFrom, @@ -118,17 +120,17 @@ where type Time = T; type R = R; - type Cursor = OrdValCursor; + type Cursor = OrdValCursor; fn cursor(&self) -> Self::Cursor { OrdValCursor { cursor: self.layer.cursor(), phantom: std::marker::PhantomData } } fn len(&self) -> usize { , O, CV>, O, CK> as Trie>::tuples(&self.layer) } fn description(&self) -> &Description { &self.desc } } -impl Batch for OrdValBatch +impl Batch for OrdValBatch, CK, CV> where - K: Ord+Clone+'static, - V: Ord+Clone+'static, - T: Lattice+timely::progress::Timestamp+Ord+Clone+::std::fmt::Debug+'static, + K: Data, + V: Data, + T: Lattice+Timestamp+Ord+Clone+Debug+'static, R: Semigroup, O: OrdOffset, >::Error: Debug, >::Error: Debug, CK: BatchContainer+Deref+RetainFrom, @@ -143,11 +145,30 @@ where } } -impl OrdValBatch +impl Batch for OrdValBatch, CK, CV> where - K: Ord+Clone+'static, - V: Ord+Clone+'static, - T: Lattice+Ord+Clone+::std::fmt::Debug+'static, + K: Data+Columnation, + V: Data+Columnation, + T: Lattice+Timestamp+Columnation, + R: Semigroup+Columnation, + O: OrdOffset, >::Error: Debug, >::Error: Debug, + CK: BatchContainer+Deref+RetainFrom, + CV: BatchContainer+Deref+RetainFrom, +{ + type Batcher = TimelyStackMergeBatcher; + type Builder = OrdValBuilder; + type Merger = OrdValMerger; + + fn begin_merge(&self, other: &Self, compaction_frontier: Option>) -> Self::Merger { + OrdValMerger::new(self, other, compaction_frontier) + } +} + +impl OrdValBatch +where + K: Data, + V: Data, + T: Data+Lattice, R: Semigroup, O: OrdOffset, >::Error: Debug, >::Error: Debug, CK: BatchContainer+Deref+RetainFrom, @@ -264,17 +285,18 @@ where should_compact: bool, } -impl Merger> for OrdValMerger +impl Merger> for OrdValMerger where - K: Ord+Clone+'static, - V: Ord+Clone+'static, + K: Data, + V: Data, T: Lattice+timely::progress::Timestamp+Ord+Clone+::std::fmt::Debug+'static, R: Semigroup, O: OrdOffset, >::Error: Debug, >::Error: Debug, CK: BatchContainer+Deref+RetainFrom, CV: BatchContainer+Deref+RetainFrom, + OrdValBatch: Batch, { - fn new(batch1: &OrdValBatch, batch2: &OrdValBatch, compaction_frontier: Option>) -> Self { + fn new(batch1: &OrdValBatch, batch2: &OrdValBatch, compaction_frontier: Option>) -> Self { assert!(batch1.upper() == batch2.lower()); @@ -291,11 +313,11 @@ where lower2: 0, upper2: batch2.layer.keys(), result: <, O, CV>, O, CK> as Trie>::MergeBuilder as MergeBuilder>::with_capacity(&batch1.layer, &batch2.layer), - description: description, + description, should_compact: compaction_frontier.is_some(), } } - fn done(self) -> OrdValBatch { + fn done(self) -> OrdValBatch { assert!(self.lower1 == self.upper1); assert!(self.lower2 == self.upper2); @@ -303,9 +325,10 @@ where OrdValBatch { layer: self.result.done(), desc: self.description, + phantom: std::marker::PhantomData, } } - fn work(&mut self, source1: &OrdValBatch, source2: &OrdValBatch, fuel: &mut isize) { + fn work(&mut self, source1: &OrdValBatch, source2: &OrdValBatch, fuel: &mut isize) { let starting_updates = self.result.vals.vals.vals.len(); let mut effort = 0isize; @@ -344,7 +367,7 @@ where // if we are supplied a frontier, we should compact. if self.should_compact { - OrdValBatch::::advance_builder_from(&mut self.result, self.description.since().borrow(), initial_key_pos); + OrdValBatch::::advance_builder_from(&mut self.result, self.description.since().borrow(), initial_key_pos); } *fuel -= effort; @@ -357,24 +380,24 @@ where /// A cursor for navigating a single layer. #[derive(Debug)] -pub struct OrdValCursor, CV=Vec> +pub struct OrdValCursor, CK=Vec, CV=Vec> where V: Ord+Clone, - T: Lattice+Ord+Clone, + T: Data+Lattice, R: Semigroup, O: OrdOffset, >::Error: Debug, >::Error: Debug, CK: BatchContainer+Deref+RetainFrom, CV: BatchContainer+Deref+RetainFrom, { - phantom: std::marker::PhantomData<(K, CK, CV)>, + phantom: std::marker::PhantomData<(K, C, CK, CV)>, cursor: OrderedCursor, O, CV>>, } -impl Cursor for OrdValCursor +impl Cursor for OrdValCursor where - K: Ord+Clone, - V: Ord+Clone, - T: Lattice+Ord+Clone, + K: Data, + V: Data, + T: Data+Lattice, R: Semigroup, O: OrdOffset, >::Error: Debug, >::Error: Debug, CK: BatchContainer+Deref+RetainFrom, @@ -385,7 +408,7 @@ where type Time = T; type R = R; - type Storage = OrdValBatch; + type Storage = OrdValBatch; fn key<'a>(&self, storage: &'a Self::Storage) -> &'a K { &self.cursor.key(&storage.layer) } fn val<'a>(&self, storage: &'a Self::Storage) -> &'a V { &self.cursor.child.key(&storage.layer.vals) } @@ -410,10 +433,10 @@ where /// A builder for creating layers from unsorted update tuples. pub struct OrdValBuilder, CV=Vec> where - K: Ord+Clone, - V: Ord+Clone, - T: Ord+Clone+Lattice, - R: Clone+Semigroup, + K: Data, + V: Data, + T: Data+Lattice, + R: Data+Semigroup, O: OrdOffset, >::Error: Debug, >::Error: Debug, CK: BatchContainer+Deref+RetainFrom, CV: BatchContainer+Deref+RetainFrom, @@ -421,15 +444,16 @@ where builder: OrderedBuilder, O, CV>, O, CK>, } -impl Builder> for OrdValBuilder +impl Builder> for OrdValBuilder where - K: Ord+Clone+'static, - V: Ord+Clone+'static, - T: Lattice+timely::progress::Timestamp+Ord+Clone+::std::fmt::Debug+'static, + K: Data, + V: Data, + T: Data+Lattice+timely::progress::Timestamp, R: Semigroup, O: OrdOffset, >::Error: Debug, >::Error: Debug, CK: BatchContainer+Deref+RetainFrom, CV: BatchContainer+Deref+RetainFrom, + OrdValBatch: Batch, { fn new() -> Self { @@ -449,10 +473,11 @@ where } #[inline(never)] - fn done(self, lower: Antichain, upper: Antichain, since: Antichain) -> OrdValBatch { + fn done(self, lower: Antichain, upper: Antichain, since: Antichain) -> OrdValBatch { OrdValBatch { layer: self.builder.done(), - desc: Description::new(lower, upper, since) + desc: Description::new(lower, upper, since), + phantom: std::marker::PhantomData, } } } @@ -462,11 +487,11 @@ where /// An immutable collection of update tuples, from a contiguous interval of logical times. #[derive(Debug, Abomonation)] -pub struct OrdKeyBatch> +pub struct OrdKeyBatch where - K: Ord+Clone, - T: Clone+Lattice, - R: Clone, + K: Data, + T: Data+Lattice, + R: Data, O: OrdOffset, >::Error: Debug, >::Error: Debug, CK: BatchContainer+Deref+RetainFrom, { @@ -474,22 +499,25 @@ where pub layer: OrderedLayer, O, CK>, /// Description of the update times this layer represents. pub desc: Description, + /// Phantom data + pub phantom: std::marker::PhantomData, } -impl BatchReader for OrdKeyBatch +impl BatchReader for OrdKeyBatch where - K: Ord+Clone+'static, - T: Lattice+Ord+Clone+'static, + K: Data, + T: Data+Lattice, R: Clone+Semigroup, O: OrdOffset, >::Error: Debug, >::Error: Debug, CK: BatchContainer+Deref+RetainFrom, + OrdKeyCursor: Cursor, { type Key = K; type Val = (); type Time = T; type R = R; - type Cursor = OrdKeyCursor; + type Cursor = OrdKeyCursor; fn cursor(&self) -> Self::Cursor { OrdKeyCursor { valid: true, @@ -501,9 +529,9 @@ where fn description(&self) -> &Description { &self.desc } } -impl Batch for OrdKeyBatch +impl Batch for OrdKeyBatch, CK> where - K: Ord+Clone+'static, + K: Data, T: Lattice+timely::progress::Timestamp+Ord+Clone+'static, R: Semigroup, O: OrdOffset, >::Error: Debug, >::Error: Debug, @@ -511,17 +539,34 @@ where { type Batcher = MergeBatcher; type Builder = OrdKeyBuilder; - type Merger = OrdKeyMerger; + type Merger = OrdKeyMerger, CK>; fn begin_merge(&self, other: &Self, compaction_frontier: Option>) -> Self::Merger { OrdKeyMerger::new(self, other, compaction_frontier) } } -impl OrdKeyBatch +impl Batch for OrdKeyBatch, CK> where - K: Ord+Clone+'static, - T: Lattice+Ord+Clone+'static, + K: Data+Columnation, + T: Lattice+timely::progress::Timestamp+Data+Columnation, + R: Semigroup+Columnation, + O: OrdOffset, >::Error: Debug, >::Error: Debug, + CK: BatchContainer+Deref+RetainFrom, +{ + type Batcher = TimelyStackMergeBatcher; + type Builder = OrdKeyBuilder; + type Merger = OrdKeyMerger, CK>; + + fn begin_merge(&self, other: &Self, compaction_frontier: Option>) -> Self::Merger { + OrdKeyMerger::new(self, other, compaction_frontier) + } +} + +impl OrdKeyBatch +where + K: Data, + T: Data+Lattice, R: Semigroup, O: OrdOffset, >::Error: Debug, >::Error: Debug, CK: BatchContainer+Deref+RetainFrom, @@ -589,7 +634,7 @@ where } /// State for an in-progress merge. -pub struct OrdKeyMerger> +pub struct OrdKeyMerger where K: Ord+Clone+'static, T: Lattice+Ord+Clone+'static, @@ -607,17 +652,19 @@ where result: , O, CK> as Trie>::MergeBuilder, description: Description, should_compact: bool, + phantom: std::marker::PhantomData, } -impl Merger> for OrdKeyMerger +impl Merger> for OrdKeyMerger where - K: Ord+Clone+'static, + K: Data, T: Lattice+timely::progress::Timestamp+Ord+Clone+'static, R: Semigroup, O: OrdOffset, >::Error: Debug, >::Error: Debug, CK: BatchContainer+Deref+RetainFrom, + OrdKeyBatch: Batch, { - fn new(batch1: &OrdKeyBatch, batch2: &OrdKeyBatch, compaction_frontier: Option>) -> Self { + fn new(batch1: &OrdKeyBatch, batch2: &OrdKeyBatch, compaction_frontier: Option>) -> Self { assert!(batch1.upper() == batch2.lower()); @@ -636,9 +683,10 @@ where result: <, O, CK> as Trie>::MergeBuilder as MergeBuilder>::with_capacity(&batch1.layer, &batch2.layer), description: description, should_compact: compaction_frontier.is_some(), + phantom: std::marker::PhantomData, } } - fn done(self) -> OrdKeyBatch { + fn done(self) -> OrdKeyBatch { assert!(self.lower1 == self.upper1); assert!(self.lower2 == self.upper2); @@ -646,9 +694,10 @@ where OrdKeyBatch { layer: self.result.done(), desc: self.description, + phantom: std::marker::PhantomData, } } - fn work(&mut self, source1: &OrdKeyBatch, source2: &OrdKeyBatch, fuel: &mut isize) { + fn work(&mut self, source1: &OrdKeyBatch, source2: &OrdKeyBatch, fuel: &mut isize) { let starting_updates = self.result.vals.vals.len(); let mut effort = 0isize; @@ -693,7 +742,7 @@ where // if we are supplied a frontier, we should compact. if self.should_compact { - OrdKeyBatch::::advance_builder_from(&mut self.result, self.description.since().borrow(), initial_key_pos); + OrdKeyBatch::::advance_builder_from(&mut self.result, self.description.since().borrow(), initial_key_pos); } *fuel -= effort; @@ -707,16 +756,16 @@ where /// A cursor for navigating a single layer. #[derive(Debug)] -pub struct OrdKeyCursor> { +pub struct OrdKeyCursor { valid: bool, cursor: OrderedCursor>, - phantom: PhantomData<(K, O, CK)>, + phantom: PhantomData<(K, O, C, CK)>, } -impl Cursor for OrdKeyCursor +impl Cursor for OrdKeyCursor where - K: Ord+Clone, - T: Lattice+Ord+Clone, + K: Data, + T: Data+Lattice, R: Semigroup, O: OrdOffset, >::Error: Debug, >::Error: Debug, CK: BatchContainer+Deref+RetainFrom, @@ -726,7 +775,7 @@ where type Time = T; type R = R; - type Storage = OrdKeyBatch; + type Storage = OrdKeyBatch; fn key<'a>(&self, storage: &'a Self::Storage) -> &'a K { &self.cursor.key(&storage.layer) } fn val<'a>(&self, _storage: &'a Self::Storage) -> &'a () { &() } @@ -749,10 +798,10 @@ where /// A builder for creating layers from unsorted update tuples. -pub struct OrdKeyBuilder> +pub struct OrdKeyBuilder where - K: Ord+Clone, - T: Ord+Clone+Lattice, + K: Data, + T: Data+Lattice, R: Clone+Semigroup, O: OrdOffset, >::Error: Debug, >::Error: Debug, CK: BatchContainer+Deref+RetainFrom, @@ -760,13 +809,14 @@ where builder: OrderedBuilder, O, CK>, } -impl Builder> for OrdKeyBuilder +impl Builder> for OrdKeyBuilder where - K: Ord+Clone+'static, - T: Lattice+timely::progress::Timestamp+Ord+Clone+'static, + K: Data, + T: Data+Lattice+timely::progress::Timestamp, R: Semigroup, O: OrdOffset, >::Error: Debug, >::Error: Debug, CK: BatchContainer+Deref+RetainFrom, + OrdKeyBatch: Batch, { fn new() -> Self { @@ -787,10 +837,11 @@ where } #[inline(never)] - fn done(self, lower: Antichain, upper: Antichain, since: Antichain) -> OrdKeyBatch { + fn done(self, lower: Antichain, upper: Antichain, since: Antichain) -> OrdKeyBatch { OrdKeyBatch { layer: self.builder.done(), - desc: Description::new(lower, upper, since) + desc: Description::new(lower, upper, since), + phantom: std::marker::PhantomData, } } } diff --git a/src/trace/layers/mod.rs b/src/trace/layers/mod.rs index e0d26088a..311468f18 100644 --- a/src/trace/layers/mod.rs +++ b/src/trace/layers/mod.rs @@ -4,8 +4,7 @@ //! in the next layer. Similarly, ranges of elements in the layer itself may correspond //! to single elements in the layer above. -use timely::container::columnation::TimelyStack; -use timely::container::columnation::Columnation; +use timely::container::columnation::{Columnation, TimelyStack}; pub mod ordered; pub mod ordered_leaf; @@ -175,8 +174,23 @@ impl BatchContainer for TimelyStack { } } +/// A generic interface to merge containers +pub trait MergeContainer: Sized { + /// The implementation to merge containers of type `Self`. + type MergeBatcher: ContainerMergeBatcher + Default; +} + +/// A batcher for merging containers +pub trait ContainerMergeBatcher { + /// Obtain an empty batch + fn empty(&mut self) -> C; + /// Push a batch of data into this batcher + fn push(&mut self, batch: &mut C); + /// Drain the merged batches into `target`. + fn finish_into(&mut self, target: &mut Vec); +} -/// Reports the number of elements satisfing the predicate. +/// Reports the number of elements satisfying the predicate. /// /// This methods *relies strongly* on the assumption that the predicate /// stays false once it becomes false, a joint property of the predicate diff --git a/src/trace/mod.rs b/src/trace/mod.rs index 6e411475e..63977e2b5 100644 --- a/src/trace/mod.rs +++ b/src/trace/mod.rs @@ -14,6 +14,7 @@ pub mod layers; pub mod wrappers; use timely::communication::message::RefOrMut; +use timely::Container; use timely::progress::{Antichain, frontier::AntichainRef}; use timely::progress::Timestamp; @@ -289,10 +290,12 @@ pub trait Batch : BatchReader where Self: ::std::marker::Sized { /// Functionality for collecting and batching updates. pub trait Batcher { + /// Input to the batcher + type Input: Container; /// Allocates a new empty batcher. fn new() -> Self; /// Adds an unordered batch of elements to the batcher. - fn push_batch(&mut self, batch: RefOrMut>); + fn push_batch(&mut self, batch: RefOrMut); /// Returns all updates not greater or equal to an element of `upper`. fn seal(&mut self, upper: Antichain) -> Output; /// Returns the lower envelope of contained update times. @@ -417,8 +420,9 @@ pub mod rc_blanket_impls { /// Functionality for collecting and batching updates. impl Batcher> for RcBatcher { + type Input = <::Batcher as Batcher>::Input; fn new() -> Self { RcBatcher { batcher: >::new() } } - fn push_batch(&mut self, batch: RefOrMut>) { self.batcher.push_batch(batch) } + fn push_batch(&mut self, batch: RefOrMut) { self.batcher.push_batch(batch) } fn seal(&mut self, upper: Antichain) -> Rc { Rc::new(self.batcher.seal(upper)) } fn frontier(&mut self) -> timely::progress::frontier::AntichainRef { self.batcher.frontier() } } @@ -533,8 +537,9 @@ pub mod abomonated_blanket_impls { /// Functionality for collecting and batching updates. impl Batcher>> for AbomonatedBatcher { + type Input = <::Batcher as Batcher>::Input; fn new() -> Self { AbomonatedBatcher { batcher: >::new() } } - fn push_batch(&mut self, batch: RefOrMut>) { self.batcher.push_batch(batch) } + fn push_batch(&mut self, batch: RefOrMut) { self.batcher.push_batch(batch) } fn seal(&mut self, upper: Antichain) -> Abomonated> { let batch = self.batcher.seal(upper); let mut bytes = Vec::with_capacity(measure(&batch)); diff --git a/tests/trace.rs b/tests/trace.rs index d00c4497e..d0d028cd9 100644 --- a/tests/trace.rs +++ b/tests/trace.rs @@ -1,21 +1,16 @@ extern crate timely; extern crate differential_dataflow; -use std::rc::Rc; - use timely::dataflow::operators::generic::OperatorInfo; use timely::progress::{Antichain, frontier::AntichainRef}; -use differential_dataflow::trace::implementations::ord::OrdValBatch; +use differential_dataflow::trace::implementations::ord::OrdValSpine; use differential_dataflow::trace::{Trace, TraceReader, Batch, Batcher}; use differential_dataflow::trace::cursor::Cursor; -use differential_dataflow::trace::implementations::spine_fueled::Spine; - -pub type OrdValSpine = Spine>>; type IntegerTrace = OrdValSpine; -fn get_trace() -> Spine>> { +fn get_trace() -> IntegerTrace { let op_info = OperatorInfo::new(0, 0, &[]); let mut trace = IntegerTrace::new(op_info, None, None); { From 892b0a89776b65e8d9c75a769a87116fe70d8729 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Fri, 27 Jan 2023 20:48:13 -0500 Subject: [PATCH 2/4] fixes and flat_map_ref and the likes Signed-off-by: Moritz Hoffmann --- src/operators/arrange/arrangement.rs | 51 ++++++++++++++++++++++++---- 1 file changed, 45 insertions(+), 6 deletions(-) diff --git a/src/operators/arrange/arrangement.rs b/src/operators/arrange/arrangement.rs index 0dd983bd0..06f3ebf3c 100644 --- a/src/operators/arrange/arrangement.rs +++ b/src/operators/arrange/arrangement.rs @@ -33,7 +33,7 @@ use lattice::Lattice; use trace::{Trace, TraceReader, Batch, BatchReader, Batcher, Cursor}; use trace::implementations::ord::OrdValSpine as DefaultValTrace; use trace::implementations::ord::OrdKeySpine as DefaultKeyTrace; -use trace::layers::MergeContainer; +use trace::layers::{BatchContainer, MergeContainer}; use trace::wrappers::enter::{TraceEnter, BatchEnter}; use trace::wrappers::enter_at::TraceEnter as TraceEnterAt; @@ -78,6 +78,7 @@ where use ::timely::dataflow::scopes::Child; use ::timely::progress::timestamp::Refines; +use timely::Container; impl Arranged where @@ -198,12 +199,26 @@ where /// The underlying `Stream>` is a much more efficient way to access the data, /// and this method should only be used when the data need to be transformed or exchanged, rather than /// supplied as arguments to an operator using the same key-value structure. - pub fn as_collection(&self, mut logic: L) -> Collection + pub fn as_collection(&self, logic: L) -> Collection where Tr::R: Semigroup, L: FnMut(&Tr::Key, &Tr::Val) -> D+'static, { - self.flat_map_ref(move |key, val| Some(logic(key,val))) + self.as_collection_core(logic) + } + + /// Flattens the stream into a `Collection`. + /// + /// The underlying `Stream>` is a much more efficient way to access the data, + /// and this method should only be used when the data need to be transformed or exchanged, rather than + /// supplied as arguments to an operator using the same key-value structure. + pub fn as_collection_core(&self, mut logic: L) -> Collection + where + Tr::R: Semigroup, + L: FnMut(&Tr::Key, &Tr::Val) -> D+'static, + C: Container + BatchContainer, + { + self.flat_map_ref_core(move |key, val| Some(logic(key,val))) } /// Extracts elements from an arrangement as a collection. @@ -216,6 +231,21 @@ where I: IntoIterator, I::Item: Data, L: FnMut(&Tr::Key, &Tr::Val) -> I+'static, + { + self.flat_map_ref_core(logic) + } + + /// Extracts elements from an arrangement as a collection. + /// + /// The supplied logic may produce an iterator over output values, allowing either + /// filtering or flat mapping as part of the extraction. + pub fn flat_map_ref_core(&self, logic: L) -> Collection + where + Tr::R: Semigroup, + I: IntoIterator, + I::Item: Data, + L: FnMut(&Tr::Key, &Tr::Val) -> I+'static, + C: Container + BatchContainer, { Self::flat_map_batches(&self.stream, logic) } @@ -227,14 +257,16 @@ where /// /// This method exists for streams of batches without the corresponding arrangement. /// If you have the arrangement, its `flat_map_ref` method is equivalent to this. - pub fn flat_map_batches(stream: &Stream, mut logic: L) -> Collection + pub fn flat_map_batches(stream: &Stream, mut logic: L) -> Collection where Tr::R: Semigroup, I: IntoIterator, I::Item: Data, L: FnMut(&Tr::Key, &Tr::Val) -> I+'static, + C: Container + BatchContainer, { stream.unary(Pipeline, "AsCollection", move |_,_| move |input, output| { + let mut buffer = C::with_capacity(::timely::container::buffer::default_capacity::<(I::Item, G::Timestamp, Tr::R)>()); input.for_each(|time, data| { let mut session = output.session(&time); for wrapper in data.iter() { @@ -244,7 +276,11 @@ where while let Some(val) = cursor.get_val(batch) { for datum in logic(key, val) { cursor.map_times(batch, |time, diff| { - session.give((datum.clone(), time.clone(), diff.clone())); + buffer.copy(&(datum.clone(), time.clone(), diff.clone())); + if buffer.len() == buffer.capacity() { + session.give_container(&mut buffer); + buffer = C::with_capacity(::timely::container::buffer::default_capacity::<(I::Item, G::Timestamp, Tr::R)>()); + } }); } cursor.step_val(batch); @@ -252,6 +288,9 @@ where cursor.step_key(batch); } } + if !buffer.is_empty() { + session.give_container(&mut buffer); + } }); }) .as_collection() @@ -506,7 +545,7 @@ where K: Data, V: Data, R: Semigroup, - C: Data + TimelyContainer + MergeContainer, + C: TimelyContainer + MergeContainer, { fn arrange_core(&self, pact: P, name: &str) -> Arranged> where From 619578c127a77d92377cdf17d3673d57e8ec77e1 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Fri, 15 Sep 2023 16:39:00 -0400 Subject: [PATCH 3/4] Preserve diff allocation on compaction Signed-off-by: Moritz Hoffmann --- src/consolidation.rs | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/src/consolidation.rs b/src/consolidation.rs index 78f386ea0..e10f45550 100644 --- a/src/consolidation.rs +++ b/src/consolidation.rs @@ -183,14 +183,20 @@ impl C // Replace `self` with a new allocation. // TODO: recycle the old `self` - let input = &std::mem::replace(self, TimelyStack::with_capacity(self.len()))[..]; + let input = std::mem::replace(self, TimelyStack::with_capacity(self.len())); let mut diff: Option = None; + let mut must_clear = false; for i in 0..(input.len()) { // accumulate diff if let Some(diff) = diff.as_mut() { - // we already have a diff, simply plus_equal it - diff.plus_equals(&input[i].2) + if must_clear { + diff.clone_from(&input[i].2); + must_clear = false; + } else { + // we already have a diff, simply plus_equal it + diff.plus_equals(&input[i].2) + } } else { // last element was undefined or different, initialize new diff diff = Some(input[i].2.clone()) @@ -201,7 +207,8 @@ impl C // element[i] != element[i+1] // emit element[i] if accumulated diff != 0 if !diff.as_ref().map(Semigroup::is_zero).unwrap_or(true) { - self.copy_destructured(&input[i].0, &input[i].1, &diff.take().unwrap()); + self.copy_destructured(&input[i].0, &input[i].1, diff.as_ref().unwrap()); + must_clear = true; } } } From e7777a36af590c340ce6cb177d1fa80e2657b217 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Wed, 1 Nov 2023 15:23:00 -0400 Subject: [PATCH 4/4] Collection does not depend directly on TimelyContainer Signed-off-by: Moritz Hoffmann --- src/collection.rs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/collection.rs b/src/collection.rs index 27631c0b9..e55532db5 100644 --- a/src/collection.rs +++ b/src/collection.rs @@ -9,6 +9,7 @@ //! implementations, and to support efficient incremental updates to the collections. use std::hash::Hash; +use std::marker::PhantomData; use timely::Data; use timely::progress::Timestamp; @@ -45,13 +46,14 @@ pub struct Collection::Timestamp where G: Scope, R: Semigroup, - C: TimelyContainer, { /// The underlying timely dataflow stream. /// /// This field is exposed to support direct timely dataflow manipulation when required, but it is /// not intended to be the idiomatic way to work with the collection. - pub inner: StreamCore + pub inner: StreamCore, + /// Phantom data to consume type parameters. + pub _phantom: PhantomData<(*const D, *const R)>, } impl Collection @@ -65,7 +67,7 @@ impl Collection /// provides a `new_collection` method which will create a new collection for you without exposing /// the underlying timely stream at all. pub fn new(stream: StreamCore) -> Collection { - Self { inner: stream } + Self { inner: stream, _phantom: PhantomData } } /// Creates a new collection accumulating the contents of the two collections.