From 12b2a5585ffd91a70e3ca647761a988cb7e1c1e1 Mon Sep 17 00:00:00 2001 From: Ufuk Celebi <1756620+uce@users.noreply.github.com> Date: Thu, 27 May 2021 12:25:24 +0200 Subject: [PATCH 1/6] half_join: maybe clarify comment --- dogsdogsdogs/src/operators/half_join.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/dogsdogsdogs/src/operators/half_join.rs b/dogsdogsdogs/src/operators/half_join.rs index 4335e5a4f..8330b3d5b 100644 --- a/dogsdogsdogs/src/operators/half_join.rs +++ b/dogsdogsdogs/src/operators/half_join.rs @@ -53,20 +53,20 @@ use differential_dataflow::consolidation::{consolidate, consolidate_updates}; /// ((key, val1, time1), initial_time, diff1) /// ``` /// -/// where `initial_time` is less or equal to `time`, and produces as output +/// where `initial_time` is less or equal to `time1`, and produces as output /// /// ```ignore -/// ((key, (val1, val2), lub(time1, time2)), initial_time, diff1 * diff2) +/// ((output_func(key, val1, val2), lub(time1, time2)), initial_time, diff1 * diff2) /// ``` /// -/// for each `((key, val2), time2, diff2)` present in `arrangement, where +/// for each `((key, val2), time2, diff2)` present in `arrangement`, where /// `time2` is less than `initial_time` *UNDER THE TOTAL ORDER ON TIMES*. /// This last constraint is important to ensure that we correctly produce /// all pairs of output updates across multiple `half_join` operators. /// /// Notice that the time is hoisted up into data. The expectation is that -/// once out of the dataflow, the updates will be `delay`d to the times -/// specified in the payloads. +/// once out of the "delta flow region", the updates will be `delay`d to the +/// times specified in the payloads. 
pub fn half_join( stream: &Collection, mut arrangement: Arranged, From d5f0a475c7104508dff74a4f2a05b0dd26617d5a Mon Sep 17 00:00:00 2001 From: Ufuk Celebi <1756620+uce@users.noreply.github.com> Date: Thu, 27 May 2021 14:57:24 +0200 Subject: [PATCH 2/6] half_join: add unsafe variant --- dogsdogsdogs/src/operators/half_join.rs | 58 +++++++++++++++++++++++-- 1 file changed, 55 insertions(+), 3 deletions(-) diff --git a/dogsdogsdogs/src/operators/half_join.rs b/dogsdogsdogs/src/operators/half_join.rs index 8330b3d5b..8b502082a 100644 --- a/dogsdogsdogs/src/operators/half_join.rs +++ b/dogsdogsdogs/src/operators/half_join.rs @@ -69,7 +69,7 @@ use differential_dataflow::consolidation::{consolidate, consolidate_updates}; /// times specified in the payloads. pub fn half_join( stream: &Collection, - mut arrangement: Arranged, + arrangement: Arranged, frontier_func: FF, comparison: CF, mut output_func: S, @@ -89,6 +89,56 @@ where DOut: Clone+'static, Tr::R: std::ops::Mul, S: FnMut(&Tr::Key, &V, &Tr::Val)->DOut+'static, +{ + let output_func = move |k: &Tr::Key, v1: &V, v2: &Tr::Val, initial: &G::Timestamp, time: &G::Timestamp, diff: &Tr::R| { + let dout = (output_func(k, v1, v2), time.clone()); + Some((dout, initial.clone(), diff.clone())).into_iter() + }; + half_join_internal_unsafe(stream, arrangement, frontier_func, comparison, output_func) +} + +/// An unsafe variant of `half_join` where the `output_func` closure takes +/// additional arguments for `time` and `diff` as input and returns an iterator +/// over `(data, time, diff)` triplets. This allows for more flexibility, but +/// is more error-prone. 
+/// +/// This operator responds to inputs of the form +/// +/// ```ignore +/// ((key, val1, time1), initial_time, diff1) +/// ``` +/// +/// where `initial_time` is less or equal to `time1`, and produces as output +/// +/// ```ignore +/// output_func(key, val1, val2, initial_time, lub(time1, time2), diff1 * diff2) +/// ``` +/// +/// for each `((key, val2), time2, diff2)` present in `arrangement`, where +/// `time2` is less than `initial_time` *UNDER THE TOTAL ORDER ON TIMES*. +pub fn half_join_internal_unsafe( + stream: &Collection, + mut arrangement: Arranged, + frontier_func: FF, + comparison: CF, + mut output_func: S, +) -> Collection +where + G: Scope, + G::Timestamp: Lattice, + V: ExchangeData, + Tr: TraceReader+Clone+'static, + Tr::Key: Ord+Hashable+ExchangeData, + Tr::Val: Clone, + Tr::Batch: BatchReader, + Tr::Cursor: Cursor, + Tr::R: Monoid+ExchangeData, + FF: Fn(&G::Timestamp) -> G::Timestamp + 'static, + CF: Fn(&G::Timestamp, &G::Timestamp) -> bool + 'static, + DOut: Clone+'static, + Tr::R: std::ops::Mul, + I: IntoIterator, + S: FnMut(&Tr::Key, &V, &Tr::Val, &G::Timestamp, &G::Timestamp, &Tr::R)-> I + 'static, { // No need to block physical merging for this operator. arrangement.trace.set_physical_compaction(Antichain::new().borrow()); @@ -145,8 +195,10 @@ where }); consolidate(&mut output_buffer); for (time, count) in output_buffer.drain(..) { - let dout = output_func(key, val1, val2); - session.give(((dout, time), initial.clone(), count * diff.clone())); + let diff = count * diff.clone(); + for dout in output_func(key, val1, val2, initial, &time, &diff) { + session.give(dout); + } } cursor.step_val(&storage); } From 16847ac180a6595ec2a0af6811ff079cc448bd27 Mon Sep 17 00:00:00 2001 From: Ufuk Celebi <1756620+uce@users.noreply.github.com> Date: Tue, 8 Jun 2021 11:28:08 +0200 Subject: [PATCH 3/6] fixup! 
half_join: add unsafe variant --- dogsdogsdogs/src/operators/half_join.rs | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/dogsdogsdogs/src/operators/half_join.rs b/dogsdogsdogs/src/operators/half_join.rs index 8b502082a..01c1f1588 100644 --- a/dogsdogsdogs/src/operators/half_join.rs +++ b/dogsdogsdogs/src/operators/half_join.rs @@ -90,9 +90,10 @@ where Tr::R: std::ops::Mul, S: FnMut(&Tr::Key, &V, &Tr::Val)->DOut+'static, { - let output_func = move |k: &Tr::Key, v1: &V, v2: &Tr::Val, initial: &G::Timestamp, time: &G::Timestamp, diff: &Tr::R| { + let output_func = move |k: &Tr::Key, v1: &V, v2: &Tr::Val, initial: &G::Timestamp, time: &G::Timestamp, diff1: &Tr::R, diff2: &Tr::R| { + let diff = diff1.clone() * diff2.clone(); let dout = (output_func(k, v1, v2), time.clone()); - Some((dout, initial.clone(), diff.clone())).into_iter() + Some((dout, initial.clone(), diff)).into_iter() }; half_join_internal_unsafe(stream, arrangement, frontier_func, comparison, output_func) } @@ -111,7 +112,7 @@ where /// where `initial_time` is less or equal to `time1`, and produces as output /// /// ```ignore -/// output_func(key, val1, val2, initial_time, lub(time1, time2), diff1 * diff2) +/// output_func(key, val1, val2, initial_time, lub(time1, time2), diff1, diff2) /// ``` /// /// for each `((key, val2), time2, diff2)` present in `arrangement`, where @@ -136,9 +137,8 @@ where FF: Fn(&G::Timestamp) -> G::Timestamp + 'static, CF: Fn(&G::Timestamp, &G::Timestamp) -> bool + 'static, DOut: Clone+'static, - Tr::R: std::ops::Mul, I: IntoIterator, - S: FnMut(&Tr::Key, &V, &Tr::Val, &G::Timestamp, &G::Timestamp, &Tr::R)-> I + 'static, + S: FnMut(&Tr::Key, &V, &Tr::Val, &G::Timestamp, &G::Timestamp, &Tr::R, &Tr::R)-> I + 'static, { // No need to block physical merging for this operator. 
arrangement.trace.set_physical_compaction(Antichain::new().borrow()); @@ -182,7 +182,7 @@ where let (mut cursor, storage) = trace.cursor(); - for &mut ((ref key, ref val1, ref time), ref initial, ref mut diff) in proposals.iter_mut() { + for &mut ((ref key, ref val1, ref time), ref initial, ref mut diff1) in proposals.iter_mut() { // Use TOTAL ORDER to allow the release of `time`. if !input2.frontier.frontier().iter().any(|t| comparison(t, initial)) { cursor.seek_key(&storage, &key); @@ -194,9 +194,8 @@ where } }); consolidate(&mut output_buffer); - for (time, count) in output_buffer.drain(..) { - let diff = count * diff.clone(); - for dout in output_func(key, val1, val2, initial, &time, &diff) { + for (time, diff2) in output_buffer.drain(..) { + for dout in output_func(key, val1, val2, initial, &time, &diff1, &diff2) { session.give(dout); } } @@ -204,7 +203,7 @@ where } cursor.rewind_vals(&storage); } - *diff = Tr::R::zero(); + *diff1 = Tr::R::zero(); } } From 0a26469cd9bdac6a3483a907e2600965dcae438e Mon Sep 17 00:00:00 2001 From: Ufuk Celebi <1756620+uce@users.noreply.github.com> Date: Tue, 8 Jun 2021 13:45:25 +0200 Subject: [PATCH 4/6] fixup! half_join: add unsafe variant --- dogsdogsdogs/src/operators/half_join.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/dogsdogsdogs/src/operators/half_join.rs b/dogsdogsdogs/src/operators/half_join.rs index 01c1f1588..dc1a87fa4 100644 --- a/dogsdogsdogs/src/operators/half_join.rs +++ b/dogsdogsdogs/src/operators/half_join.rs @@ -117,13 +117,13 @@ where /// /// for each `((key, val2), time2, diff2)` present in `arrangement`, where /// `time2` is less than `initial_time` *UNDER THE TOTAL ORDER ON TIMES*. 
-pub fn half_join_internal_unsafe( +pub fn half_join_internal_unsafe( stream: &Collection, mut arrangement: Arranged, frontier_func: FF, comparison: CF, mut output_func: S, -) -> Collection +) -> Collection where G: Scope, G::Timestamp: Lattice, @@ -137,7 +137,8 @@ where FF: Fn(&G::Timestamp) -> G::Timestamp + 'static, CF: Fn(&G::Timestamp, &G::Timestamp) -> bool + 'static, DOut: Clone+'static, - I: IntoIterator, + ROut: Monoid, + I: IntoIterator, S: FnMut(&Tr::Key, &V, &Tr::Val, &G::Timestamp, &G::Timestamp, &Tr::R, &Tr::R)-> I + 'static, { // No need to block physical merging for this operator. From a61fe3e602f35bdf9bab5d8ae2bd3702ee06fe4b Mon Sep 17 00:00:00 2001 From: Ufuk Celebi <1756620+uce@users.noreply.github.com> Date: Tue, 8 Jun 2021 21:41:29 +0200 Subject: [PATCH 5/6] join: add unsafe variant --- src/operators/join.rs | 119 +++++++++++++++++++++++++++++++++++++----- 1 file changed, 105 insertions(+), 14 deletions(-) diff --git a/src/operators/join.rs b/src/operators/join.rs index a325fc902..ea72b32f6 100644 --- a/src/operators/join.rs +++ b/src/operators/join.rs @@ -118,6 +118,7 @@ pub trait Join { /// ``` fn semijoin(&self, other: &Collection) -> Collection>::Output> where K: ExchangeData, R2: ExchangeData+Semigroup, R: Multiply, >::Output: Semigroup; + /// Subtracts the semijoin with `other` from `self`. /// /// In the case that `other` has multiplicities zero or one this results @@ -217,10 +218,12 @@ where /// directly in the event that one has a handle to an `Arranged`, perhaps because /// the arrangement is available for re-use, or from the output of a `group` operator. pub trait JoinCore where G::Timestamp: Lattice+Ord { + /// Joins two arranged collections with the same key type. 
/// /// Each matching pair of records `(key, val1)` and `(key, val2)` are subjected to the `result` function, - /// which produces something implementing `IntoIterator`, where the output collection will have + /// which produces something implementing `IntoIterator`, where the output collection will have an entry for + /// every value returned by the iterator. /// /// This trait is implemented for arrangements (`Arranged`) rather than collections. The `Join` trait /// contains the implementations for collections. @@ -265,6 +268,58 @@ pub trait JoinCore where G::Time I::Item: Data, L: FnMut(&K,&V,&Tr2::Val)->I+'static, ; + + /// An unsafe variant of `join_core` where the `result` closure takes additional arguments for `time` and + /// the diffs of both inputs (`diff1`, `diff2`), and returns an iterator over `(data, time, diff)` triplets. + /// This allows for more flexibility, but is more error-prone. + /// + /// Each matching pair of records `(key, val1)` and `(key, val2)` are subjected to the `result` function, + /// which produces something implementing `IntoIterator`, where the output collection will have an entry + /// for every value returned by the iterator. + /// + /// This trait is implemented for arrangements (`Arranged`) rather than collections. The `Join` trait + /// contains the implementations for collections. 
+ /// + /// # Examples + /// + /// ``` + /// extern crate timely; + /// extern crate differential_dataflow; + /// + /// use differential_dataflow::input::Input; + /// use differential_dataflow::operators::arrange::ArrangeByKey; + /// use differential_dataflow::operators::join::JoinCore; + /// use differential_dataflow::trace::Trace; + /// use differential_dataflow::trace::implementations::ord::OrdValSpine; + /// + /// fn main() { + /// ::timely::example(|scope| { + /// + /// let x = scope.new_collection_from(vec![(0u32, 1), (1, 3)]).1 + /// .arrange_by_key(); + /// let y = scope.new_collection_from(vec![(0, 'a'), (1, 'b')]).1 + /// .arrange_by_key(); + /// + /// let z = scope.new_collection_from(vec![(1, 'a'), (3, 'b'), (3, 'b'), (3, 'b')]).1; + /// + /// // Returned values have weight `a` + /// x.join_core_internal_unsafe(&y, |_key, &a, &b, &t, &r1, &r2| Some(((a, b), t.clone(), a))) + /// .assert_eq(&z); + /// }); + /// } + /// ``` + fn join_core_internal_unsafe (&self, stream2: &Arranged, result: L) -> Collection + where + Tr2: TraceReader+Clone+'static, + Tr2::Batch: BatchReader+'static, + Tr2::Cursor: Cursor+'static, + Tr2::Val: Ord+Clone+Debug+'static, + Tr2::R: Semigroup, + D: Data, + ROut: Semigroup, + I: IntoIterator, + L: FnMut(&K,&V,&Tr2::Val,&G::Timestamp,&R,&Tr2::R)->I+'static, + ; } @@ -292,6 +347,23 @@ where self.arrange_by_key() .join_core(stream2, result) } + + fn join_core_internal_unsafe (&self, stream2: &Arranged, result: L) -> Collection + where + Tr2: TraceReader+Clone+'static, + Tr2::Batch: BatchReader+'static, + Tr2::Cursor: Cursor+'static, + Tr2::Val: Ord+Clone+Debug+'static, + Tr2::R: Semigroup, + R: Semigroup, + D: Data, + ROut: Semigroup, + I: IntoIterator, + L: FnMut(&K,&V,&Tr2::Val,&G::Timestamp,&R,&Tr2::R)->I+'static, + { + self.arrange_by_key().join_core_internal_unsafe(stream2, result) + } + } impl JoinCore for Arranged @@ -316,8 +388,28 @@ impl JoinCore for Arranged >::Output: Semigroup, I: IntoIterator, I::Item: Data, - L: 
FnMut(&T1::Key,&T1::Val,&Tr2::Val)->I+'static { + L: FnMut(&T1::Key,&T1::Val,&Tr2::Val)->I+'static + { + let result = move |k: &T1::Key, v1: &T1::Val, v2: &Tr2::Val, t: &G::Timestamp, r1: &T1::R, r2: &Tr2::R| { + let t = t.clone(); + let r = (r1.clone()).multiply(r2); + result(k, v1, v2).into_iter().map(move |d| (d, t.clone(), r.clone())) + }; + self.join_core_internal_unsafe(other, result) + } + fn join_core_internal_unsafe (&self, other: &Arranged, mut result: L) -> Collection + where + Tr2: TraceReader+Clone+'static, + Tr2::Batch: BatchReader+'static, + Tr2::Cursor: Cursor+'static, + Tr2::Val: Ord+Clone+Debug+'static, + Tr2::R: Semigroup, + D: Data, + ROut: Semigroup, + I: IntoIterator, + L: FnMut(&T1::Key,&T1::Val,&Tr2::Val,&G::Timestamp,&T1::R,&Tr2::R)->I+'static, + { // Rename traces for symmetry from here on out. let mut trace1 = self.trace.clone(); let mut trace2 = other.trace.clone(); @@ -488,8 +580,7 @@ impl JoinCore for Arranged while !todo1.is_empty() && fuel > 0 { todo1.front_mut().unwrap().work( output, - |k,v2,v1| result(k,v1,v2), - |r2,r1| (r1.clone()).multiply(r2), + |k,v2,v1,t,r2,r1| result(k,v1,v2,t,r1,r2), &mut fuel ); if !todo1.front().unwrap().work_remains() { todo1.pop_front(); } @@ -500,8 +591,7 @@ impl JoinCore for Arranged while !todo2.is_empty() && fuel > 0 { todo2.front_mut().unwrap().work( output, - |k,v1,v2| result(k,v1,v2), - |r1,r2| (r1.clone()).multiply(r2), + |k,v1,v2,t,r1,r2| result(k,v1,v2,t,r1,r2), &mut fuel ); if !todo2.front().unwrap().work_remains() { todo2.pop_front(); } @@ -594,7 +684,7 @@ where R3: Semigroup, C1: Cursor, C2: Cursor, - D: Ord+Clone+Data, + D: Clone+Data, { fn new(trace: C1, trace_storage: C1::Storage, batch: C2, batch_storage: C2::Storage, capability: Capability) -> Self { Deferred { @@ -613,10 +703,10 @@ where !self.done } - /// Process keys until at least `limit` output tuples produced, or the work is exhausted. + /// Process keys until at least `fuel` output tuples produced, or the work is exhausted. 
#[inline(never)] - fn work(&mut self, output: &mut OutputHandle>, mut logic: L, mut mult: M, fuel: &mut usize) - where I: IntoIterator, L: FnMut(&K, &V1, &V2)->I, M: FnMut(&R1,&R2)->R3 { + fn work(&mut self, output: &mut OutputHandle>, mut logic: L, fuel: &mut usize) + where I: IntoIterator, L: FnMut(&K, &V1, &V2, &T, &R1, &R2)->I { let meet = self.capability.time(); @@ -645,11 +735,12 @@ where assert_eq!(temp.len(), 0); // populate `temp` with the results in the best way we know how. - thinker.think(|v1,v2,t,r1,r2| - for result in logic(batch.key(batch_storage), v1, v2) { - temp.push(((result, t.clone()), mult(r1, r2))); + thinker.think(|v1,v2,t,r1,r2| { + let key = batch.key(batch_storage); + for (d, t, r) in logic(key, v1, v2, &t, r1, r2) { + temp.push(((d, t), r)); } - ); + }); // TODO: This consolidation is optional, and it may not be very // helpful. We might try harder to understand whether we From 8e0d0e6ccbd53705e5d6ef0e3f98ec79c89ab0e3 Mon Sep 17 00:00:00 2001 From: Ufuk Celebi <1756620+uce@users.noreply.github.com> Date: Tue, 8 Jun 2021 21:48:12 +0200 Subject: [PATCH 6/6] fixup! half_join: add unsafe variant --- dogsdogsdogs/src/operators/half_join.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dogsdogsdogs/src/operators/half_join.rs b/dogsdogsdogs/src/operators/half_join.rs index dc1a87fa4..cf6982d0e 100644 --- a/dogsdogsdogs/src/operators/half_join.rs +++ b/dogsdogsdogs/src/operators/half_join.rs @@ -93,7 +93,7 @@ where let output_func = move |k: &Tr::Key, v1: &V, v2: &Tr::Val, initial: &G::Timestamp, time: &G::Timestamp, diff1: &Tr::R, diff2: &Tr::R| { let diff = diff1.clone() * diff2.clone(); let dout = (output_func(k, v1, v2), time.clone()); - Some((dout, initial.clone(), diff)).into_iter() + Some((dout, initial.clone(), diff)) }; half_join_internal_unsafe(stream, arrangement, frontier_func, comparison, output_func) }