diff --git a/examples/spines.rs b/examples/spines.rs index d4d2e64bc..4ae540e5d 100644 --- a/examples/spines.rs +++ b/examples/spines.rs @@ -55,11 +55,11 @@ fn main() { let data = data.map(|x| (x.clone().into_bytes(), x.into_bytes())) .arrange::>() - .reduce_abelian::<_, _, _, PreferredSpine<[u8],(),_,_>>("distinct", |v| v.clone(), |_,_,output| output.push(((), 1))); + .reduce_abelian::<_, _, _, PreferredSpine<[u8],(),_,_>>("distinct", |_| (), |_,_,output| output.push(((), 1))); let keys = keys.map(|x| (x.clone().into_bytes(), 7)) .arrange::>() - .reduce_abelian::<_, _, _, PreferredSpine<[u8],(),_,_>>("distinct", |v| v.clone(), |_,_,output| output.push(((), 1))); + .reduce_abelian::<_, _, _, PreferredSpine<[u8],(),_,_>>("distinct", |_| (), |_,_,output| output.push(((), 1))); keys.join_core(&data, |k,_v1,_v2| { println!("{:?}", k.text); diff --git a/server/Cargo.toml b/server/Cargo.toml index 364b2ae51..ed79bb187 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -10,7 +10,7 @@ path="../" [dependencies] rand="0.3.13" libloading="*" -timely = { git = "https://github.com/TimelyDataflow/timely-dataflow" } +timely = { workspace = true } #[workspace] #members = [ diff --git a/src/capture.rs b/src/capture.rs index 0a8cd03aa..33f706b85 100644 --- a/src/capture.rs +++ b/src/capture.rs @@ -399,7 +399,7 @@ pub mod source { for message in source.by_ref() { match message { Message::Updates(mut updates) => { - updates_session.give_vec(&mut updates); + updates_session.give_container(&mut updates); } Message::Progress(progress) => { // We must send a copy of each progress message to all workers, diff --git a/src/consolidation.rs b/src/consolidation.rs index b4087bcc9..637df0eff 100644 --- a/src/consolidation.rs +++ b/src/consolidation.rs @@ -10,6 +10,9 @@ //! you need specific behavior, it may be best to defensively copy, paste, and maintain the //! specific behavior you require. +use std::collections::VecDeque; +use timely::container::{ContainerBuilder, PushContainer, PushInto}; +use crate::Data; use crate::difference::Semigroup; /// Sorts and consolidates `vec`. @@ -145,6 +148,84 @@ pub fn consolidate_updates_slice(slice: &mut [(D, offset } +/// A container builder that consolidates data in-places into fixed-sized containers. Does not +/// maintain FIFO ordering. +#[derive(Default)] +pub struct ConsolidatingContainerBuilder{ + current: C, + empty: Vec, + outbound: VecDeque, +} + +impl ConsolidatingContainerBuilder> +where + D: Data, + T: Data, + R: Semigroup, +{ + /// Flush `self.current` up to the biggest `multiple` of elements. Pass 1 to flush all elements. + // TODO: Can we replace `multiple` by a bool? + fn consolidate_and_flush_through(&mut self, multiple: usize) { + let preferred_capacity = >::preferred_capacity(); + consolidate_updates(&mut self.current); + let mut drain = self.current.drain(..(self.current.len()/multiple)*multiple).peekable(); + while drain.peek().is_some() { + let mut container = self.empty.pop().unwrap_or_else(|| Vec::with_capacity(preferred_capacity)); + container.extend((&mut drain).take(preferred_capacity)); + self.outbound.push_back(container); + } + } +} + +impl ContainerBuilder for ConsolidatingContainerBuilder> +where + D: Data, + T: Data, + R: Semigroup, +{ + type Container = Vec<(D,T,R)>; + + /// Push an element. + /// + /// Precondition: `current` is not allocated or has space for at least one element. + #[inline] + fn push>(&mut self, item: P) { + let preferred_capacity = >::preferred_capacity(); + if self.current.capacity() < preferred_capacity * 2 { + self.current.reserve(preferred_capacity * 2 - self.current.capacity()); + } + item.push_into(&mut self.current); + if self.current.len() == self.current.capacity() { + // Flush complete containers. + self.consolidate_and_flush_through(preferred_capacity); + } + } + + fn push_container(&mut self, container: &mut Self::Container) { + for item in container.drain(..) { + self.push(item); + } + } + + fn extract(&mut self) -> Option<&mut Vec<(D,T,R)>> { + if let Some(container) = self.outbound.pop_front() { + self.empty.push(container); + self.empty.last_mut() + } else { + None + } + } + + fn finish(&mut self) -> Option<&mut Vec<(D,T,R)>> { + // Flush all + self.consolidate_and_flush_through(1); + // Remove all but two elements from the stash of empty to avoid memory leaks. We retain + // two to match `current` capacity. + self.empty.truncate(2); + self.extract() + } +} + #[cfg(test)] mod tests { use super::*; @@ -211,4 +292,46 @@ mod tests { assert_eq!(input, output); } } + + #[test] + fn test_consolidating_container_builder() { + let mut ccb = >>::default(); + for _ in 0..1024 { + ccb.push((0, 0, 0)); + } + assert_eq!(ccb.extract(), None); + assert_eq!(ccb.finish(), None); + + for i in 0..1024 { + ccb.push((i, 0, 1)); + } + + let mut collected = Vec::default(); + while let Some(container) = ccb.finish() { + collected.append(container); + } + // The output happens to be sorted, but it's not guaranteed. + collected.sort(); + for i in 0..1024 { + assert_eq!((i, 0, 1), collected[i]); + } + + ccb = Default::default(); + ccb.push_container(&mut Vec::default()); + assert_eq!(ccb.extract(), None); + assert_eq!(ccb.finish(), None); + + ccb.push_container(&mut Vec::from_iter((0..1024).map(|i| (i, 0, 1)))); + ccb.push_container(&mut Vec::from_iter((0..1024).map(|i| (i, 0, 1)))); + collected.clear(); + while let Some(container) = ccb.finish() { + collected.append(container); + } + // The output happens to be sorted, but it's not guaranteed. + consolidate_updates(&mut collected); + for i in 0..1024 { + assert_eq!((i, 0, 2), collected[i]); + } + + } } diff --git a/src/dynamic/mod.rs b/src/dynamic/mod.rs index 4d1eeff43..18208d3ae 100644 --- a/src/dynamic/mod.rs +++ b/src/dynamic/mod.rs @@ -60,7 +60,7 @@ where vec.truncate(level - 1); time.inner = PointStamp::new(vec); } - output.session(&new_cap).give_vec(&mut vector); + output.session(&new_cap).give_container(&mut vector); }); }); diff --git a/src/operators/arrange/arrangement.rs b/src/operators/arrange/arrangement.rs index c22749a27..a3c29c6e3 100644 --- a/src/operators/arrange/arrangement.rs +++ b/src/operators/arrange/arrangement.rs @@ -255,7 +255,7 @@ where self.join_core_internal_unsafe(other, result) } /// A direct implementation of the `JoinCore::join_core_internal_unsafe` method. - pub fn join_core_internal_unsafe (&self, other: &Arranged, result: L) -> Collection + pub fn join_core_internal_unsafe (&self, other: &Arranged, mut result: L) -> Collection where T2: for<'a> TraceReader=T1::Key<'a>, Time=T1::Time>+Clone+'static, D: Data, @@ -264,7 +264,15 @@ where L: FnMut(T1::Key<'_>, T1::Val<'_>,T2::Val<'_>,&G::Timestamp,&T1::Diff,&T2::Diff)->I+'static, { use crate::operators::join::join_traces; - join_traces(self, other, result) + join_traces::<_, _, _, _, crate::consolidation::ConsolidatingContainerBuilder<_>>( + self, + other, + move |k, v1, v2, t, d1, d2, c| { + for datum in result(k, v1, v2, t, d1, d2) { + c.give(datum); + } + } + ) .as_collection() } } diff --git a/src/operators/consolidate.rs b/src/operators/consolidate.rs index 46b96528c..ac0ef7f8f 100644 --- a/src/operators/consolidate.rs +++ b/src/operators/consolidate.rs @@ -99,7 +99,7 @@ where input.for_each(|time, data| { data.swap(&mut vector); crate::consolidation::consolidate_updates(&mut vector); - output.session(&time).give_vec(&mut vector); + output.session(&time).give_container(&mut vector); }) } }) diff --git a/src/operators/join.rs b/src/operators/join.rs index 311f06802..03bdc1e0d 100644 --- a/src/operators/join.rs +++ b/src/operators/join.rs @@ -4,13 +4,16 @@ //! the multiplication distributes over addition. That is, we will repeatedly evaluate (a + b) * c as (a * c) //! + (b * c), and if this is not equal to the former term, little is known about the actual output. use std::cmp::Ordering; +use timely::Container; -use timely::container::{PushContainer, PushInto}; +use timely::container::{ContainerBuilder, PushContainer, PushInto}; use timely::order::PartialOrder; use timely::progress::Timestamp; use timely::dataflow::{Scope, StreamCore}; use timely::dataflow::operators::generic::{Operator, OutputHandleCore}; use timely::dataflow::channels::pact::Pipeline; +use timely::dataflow::channels::pushers::buffer::Session; +use timely::dataflow::channels::pushers::Counter; use timely::dataflow::operators::Capability; use timely::dataflow::channels::pushers::tee::Tee; @@ -316,6 +319,41 @@ where } } +/// The session passed to join closures. +pub type JoinSession<'a, T, CB, C> = Session<'a, T, EffortBuilder, Counter>>; + +/// A container builder that tracks the length of outputs to estimate the effort of join closures. +#[derive(Default, Debug)] +pub struct EffortBuilder(pub std::cell::Cell, pub CB); + +impl ContainerBuilder for EffortBuilder { + type Container = CB::Container; + + #[inline] + fn push>(&mut self, item: T) where Self::Container: PushContainer { + self.1.push(item) + } + + #[inline] + fn push_container(&mut self, container: &mut Self::Container) { + self.1.push_container(container) + } + + #[inline] + fn extract(&mut self) -> Option<&mut Self::Container> { + let extracted = self.1.extract(); + self.0.replace(self.0.take() + extracted.as_ref().map_or(0, |e| e.len())); + extracted + } + + #[inline] + fn finish(&mut self) -> Option<&mut Self::Container> { + let finished = self.1.finish(); + self.0.replace(self.0.take() + finished.as_ref().map_or(0, |e| e.len())); + finished + } +} + /// An equijoin of two traces, sharing a common key type. /// /// This method exists to provide join functionality without opinions on the specific input types, keys and values, @@ -323,24 +361,19 @@ where /// even potentially unrelated to the input collection data. Importantly, the key and value types could be generic /// associated types (GATs) of the traces, and we would seemingly struggle to frame these types as trait arguments. /// -/// The implementation produces a caller-specified container, with the requirement that the container -/// can absorb `(D, G::Timestamp, R)` triples. Implementations can use [`AsCollection`] to wrap the +/// The implementation produces a caller-specified container. Implementations can use [`AsCollection`] to wrap the /// output stream in a collection. /// /// The "correctness" of this method depends heavily on the behavior of the supplied `result` function. /// /// [`AsCollection`]: crate::collection::AsCollection -pub fn join_traces(arranged1: &Arranged, arranged2: &Arranged, mut result: L) -> StreamCore +pub fn join_traces(arranged1: &Arranged, arranged2: &Arranged, mut result: L) -> StreamCore where G: Scope, T1: TraceReader+Clone+'static, T2: for<'a> TraceReader=T1::Key<'a>, Time=T1::Time>+Clone+'static, - D: Data, - R: Semigroup, - I: IntoIterator, - L: FnMut(T1::Key<'_>,T1::Val<'_>,T2::Val<'_>,&G::Timestamp,&T1::Diff,&T2::Diff)->I+'static, - C: PushContainer, - (D, G::Timestamp, R): PushInto, + L: FnMut(T1::Key<'_>,T1::Val<'_>,T2::Val<'_>,&G::Timestamp,&T1::Diff,&T2::Diff,&mut JoinSession)+'static, + CB: ContainerBuilder + 'static, { // Rename traces for symmetry from here on out. let mut trace1 = arranged1.trace.clone(); @@ -512,7 +545,7 @@ where while !todo1.is_empty() && fuel > 0 { todo1.front_mut().unwrap().work( output, - |k,v2,v1,t,r2,r1| result(k,v1,v2,t,r1,r2), + |k,v2,v1,t,r2,r1,c| result(k,v1,v2,t,r1,r2,c), &mut fuel ); if !todo1.front().unwrap().work_remains() { todo1.pop_front(); } @@ -523,7 +556,7 @@ where while !todo2.is_empty() && fuel > 0 { todo2.front_mut().unwrap().work( output, - |k,v1,v2,t,r1,r2| result(k,v1,v2,t,r1,r2), + |k,v1,v2,t,r1,r2,c| result(k,v1,v2,t,r1,r2,c), &mut fuel ); if !todo2.front().unwrap().work_remains() { todo2.pop_front(); } @@ -582,13 +615,11 @@ where /// The structure wraps cursors which allow us to play out join computation at whatever rate we like. /// This allows us to avoid producing and buffering massive amounts of data, without giving the timely /// dataflow system a chance to run operators that can consume and aggregate the data. -struct Deferred +struct Deferred where T: Timestamp+Lattice+Ord, - R: Semigroup, C1: Cursor, C2: for<'a> Cursor=C1::Key<'a>, Time=T>, - D: Ord+Clone+Data, { trace: C1, trace_storage: C1::Storage, @@ -596,16 +627,13 @@ where batch_storage: C2::Storage, capability: Capability, done: bool, - temp: Vec<((D, T), R)>, } -impl Deferred +impl Deferred where C1: Cursor, C2: for<'a> Cursor=C1::Key<'a>, Time=T>, T: Timestamp+Lattice+Ord, - R: Semigroup, - D: Clone+Data, { fn new(trace: C1, trace_storage: C1::Storage, batch: C2, batch_storage: C2::Storage, capability: Capability) -> Self { Deferred { @@ -615,7 +643,6 @@ where batch_storage, capability, done: false, - temp: Vec::new(), } } @@ -625,18 +652,15 @@ where /// Process keys until at least `fuel` output tuples produced, or the work is exhausted. #[inline(never)] - fn work(&mut self, output: &mut OutputHandleCore>, mut logic: L, fuel: &mut usize) + fn work(&mut self, output: &mut OutputHandleCore, Tee>, mut logic: L, fuel: &mut usize) where - I: IntoIterator, - L: for<'a> FnMut(C1::Key<'a>, C1::Val<'a>, C2::Val<'a>, &T, &C1::Diff, &C2::Diff)->I, - C: PushContainer, - (D, T, R): PushInto, + L: for<'a> FnMut(C1::Key<'a>, C1::Val<'a>, C2::Val<'a>, &T, &C1::Diff, &C2::Diff, &mut JoinSession), { let meet = self.capability.time(); let mut effort = 0; - let mut session = output.session(&self.capability); + let mut session = output.session_with_builder(&self.capability); let trace_storage = &self.trace_storage; let batch_storage = &self.batch_storage; @@ -644,7 +668,6 @@ where let trace = &mut self.trace; let batch = &mut self.batch; - let temp = &mut self.temp; let mut thinker = JoinThinker::new(); while batch.key_valid(batch_storage) && trace.key_valid(trace_storage) && effort < *fuel { @@ -657,28 +680,15 @@ where thinker.history1.edits.load(trace, trace_storage, |time| time.join(meet)); thinker.history2.edits.load(batch, batch_storage, |time| time.clone()); - assert_eq!(temp.len(), 0); - // populate `temp` with the results in the best way we know how. thinker.think(|v1,v2,t,r1,r2| { let key = batch.key(batch_storage); - for (d, t, r) in logic(key, v1, v2, &t, r1, r2) { - temp.push(((d, t), r)); - } + logic(key, v1, v2, &t, r1, r2, &mut session); }); - // TODO: This consolidation is optional, and it may not be very - // helpful. We might try harder to understand whether we - // should do this work here, or downstream at consumers. - // TODO: Perhaps `thinker` should have the buffer, do smarter - // consolidation, and then deposit results in `session`. - crate::consolidation::consolidate(temp); - - effort += temp.len(); - for ((d, t), r) in temp.drain(..) { - session.give((d, t, r)); - } - + // TODO: Effort isn't perfectly tracked as we might still have some data in the + // session at the moment it's dropped. + effort += session.builder().0.take(); batch.step_key(batch_storage); trace.step_key(trace_storage); @@ -687,7 +697,6 @@ where } } } - self.done = !batch.key_valid(batch_storage) || !trace.key_valid(trace_storage); if effort > *fuel { *fuel = 0; }