From 35a351664a9f07e2ef26fee81fa81c5d1f4a6790 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Tue, 23 Mar 2021 12:04:08 -0400 Subject: [PATCH 1/2] implement half-map operator --- dogsdogsdogs/examples/delta_query2.rs | 171 ++++++++++++++++++++++ dogsdogsdogs/src/operators/half_join.rs | 182 ++++++++++++++++++++++++ dogsdogsdogs/src/operators/mod.rs | 2 + 3 files changed, 355 insertions(+) create mode 100644 dogsdogsdogs/examples/delta_query2.rs create mode 100644 dogsdogsdogs/src/operators/half_join.rs diff --git a/dogsdogsdogs/examples/delta_query2.rs b/dogsdogsdogs/examples/delta_query2.rs new file mode 100644 index 000000000..85ce80d92 --- /dev/null +++ b/dogsdogsdogs/examples/delta_query2.rs @@ -0,0 +1,171 @@ +extern crate timely; +extern crate graph_map; +extern crate differential_dataflow; + +#[macro_use] +extern crate abomonation_derive; +extern crate abomonation; + +extern crate dogsdogsdogs; + +use timely::dataflow::operators::probe::Handle; +use timely::dataflow::operators::UnorderedInput; +use timely::dataflow::operators::Map; +use differential_dataflow::AsCollection; + +fn main() { + + use pair::Pair; + + timely::execute_from_args(std::env::args().skip(2), move |worker| { + + let mut probe = Handle::new(); + + let (mut i1, mut i2, c1, c2) = worker.dataflow::,_,_>(|scope| { + + use timely::dataflow::operators::unordered_input::UnorderedHandle; + + let ((input1, capability1), data1): ((UnorderedHandle, ((usize, usize), Pair, isize)>, _), _) = scope.new_unordered_input(); + let ((input2, capability2), data2): ((UnorderedHandle, ((usize, usize), Pair, isize)>, _), _) = scope.new_unordered_input(); + + let edges1 = data1.as_collection(); + let edges2 = data2.as_collection(); + + // Graph oriented both ways, indexed by key. + use differential_dataflow::operators::arrange::ArrangeByKey; + let forward1 = edges1.arrange_by_key(); + let forward2 = edges2.arrange_by_key(); + + // Grab the stream of changes. Stash the initial time as payload. 
+ let changes1 = edges1.inner.map(|((k,v),t,r)| ((k,v,t.clone()),t,r)).as_collection(); + let changes2 = edges2.inner.map(|((k,v),t,r)| ((k,v,t.clone()),t,r)).as_collection(); + + use dogsdogsdogs::operators::half_join; + + // pick a frontier that will not mislead TOTAL ORDER comparisons. + let closure = |time: &Pair| Pair::new(time.first.saturating_sub(1), time.second.saturating_sub(1)); + + let path1 = + half_join( + &changes1, + forward2, + closure.clone(), + |t1,t2| t1.lt(t2), // This one ignores concurrent updates. + |key, val1, val2| (key.clone(), (val1.clone(), val2.clone())), + ); + + let path2 = + half_join( + &changes2, + forward1, + closure.clone(), + |t1,t2| t1.le(t2), // This one can "see" concurrent updates. + |key, val1, val2| (key.clone(), (val2.clone(), val1.clone())), + ); + + // Delay updates until the worked payload time. + // This should be at least the ignored update time. + path1.concat(&path2) + .inner.map(|(((k,v),t),_,r)| ((k,v),t,r)).as_collection() + .inspect(|x| println!("{:?}", x)) + .probe_with(&mut probe); + + (input1, input2, capability1, capability2) + }); + + i1 + .session(c1.clone()) + .give(((5, 6), Pair::new(0, 13), 1)); + + i2 + .session(c2.clone()) + .give(((5, 7), Pair::new(11, 0), 1)); + + }).unwrap(); +} + + +/// This module contains a definition of a new timestamp time, a "pair" or product. +/// +/// This is a minimal self-contained implementation, in that it doesn't borrow anything +/// from the rest of the library other than the traits it needs to implement. With this +/// type and its implementations, you can use it as a timestamp type. +mod pair { + + /// A pair of timestamps, partially ordered by the product order. + #[derive(Hash, Default, Clone, Eq, PartialEq, Ord, PartialOrd, Abomonation)] + pub struct Pair { + pub first: S, + pub second: T, + } + + impl Pair { + /// Create a new pair. 
+ pub fn new(first: S, second: T) -> Self { + Pair { first, second } + } + } + + // Implement timely dataflow's `PartialOrder` trait. + use timely::order::PartialOrder; + impl PartialOrder for Pair { + fn less_equal(&self, other: &Self) -> bool { + self.first.less_equal(&other.first) && self.second.less_equal(&other.second) + } + } + + use timely::progress::timestamp::Refines; + impl Refines<()> for Pair { + fn to_inner(_outer: ()) -> Self { Self::minimum() } + fn to_outer(self) -> () { () } + fn summarize(_summary: ::Summary) -> () { () } + } + + // Implement timely dataflow's `PathSummary` trait. + // This is preparation for the `Timestamp` implementation below. + use timely::progress::PathSummary; + + impl PathSummary> for () { + fn results_in(&self, timestamp: &Pair) -> Option> { + Some(timestamp.clone()) + } + fn followed_by(&self, other: &Self) -> Option { + Some(other.clone()) + } + } + + // Implement timely dataflow's `Timestamp` trait. + use timely::progress::Timestamp; + impl Timestamp for Pair { + fn minimum() -> Self { Pair { first: S::minimum(), second: T::minimum() }} + type Summary = (); + } + + // Implement differential dataflow's `Lattice` trait. + // This extends the `PartialOrder` implementation with additional structure. + use differential_dataflow::lattice::Lattice; + impl Lattice for Pair { + fn join(&self, other: &Self) -> Self { + Pair { + first: self.first.join(&other.first), + second: self.second.join(&other.second), + } + } + fn meet(&self, other: &Self) -> Self { + Pair { + first: self.first.meet(&other.first), + second: self.second.meet(&other.second), + } + } + } + + use std::fmt::{Formatter, Error, Debug}; + + /// Debug implementation to avoid seeing fully qualified path names. 
+ impl Debug for Pair { + fn fmt(&self, f: &mut Formatter) -> Result<(), Error> { + f.write_str(&format!("({:?}, {:?})", self.first, self.second)) + } + } + +} diff --git a/dogsdogsdogs/src/operators/half_join.rs b/dogsdogsdogs/src/operators/half_join.rs new file mode 100644 index 000000000..4335e5a4f --- /dev/null +++ b/dogsdogsdogs/src/operators/half_join.rs @@ -0,0 +1,182 @@ +//! Dataflow operator for delta joins over partially ordered timestamps. +//! +//! Given multiple streams of updates `(data, time, diff)` that are each +//! defined over the same partially ordered `time`, we want to form the +//! full cross-join of all relations (we will *later* apply some filters +//! and instead equijoin on keys). +//! +//! The "correct" output is the outer join of these triples, where +//! 1. The `data` entries are just tuple'd up together, +//! 2. The `time` entries are subjected to the lattice `join` operator, +//! 3. The `diff` entries are multiplied. +//! +//! One way to produce the correct output is to form independent dataflow +//! fragments for each input stream, such that each intended output is then +//! produced by exactly one of these input streams. +//! +//! There are several incorrect ways one might do this, but here is one way +//! that I hope is not incorrect: +//! +//! Each input stream of updates is joined with each other input collection, +//! where each input update is matched against each other input update that +//! has a `time` that is less-than the input update's `time`, *UNDER A TOTAL +//! ORDER ON `time`*. The output are the `(data, time, diff)` entries that +//! follow the rules above, except that we additionally preserve the input's +//! initial `time` as well, for use in subsequent joins with the other input +//! collections. +//! +//! There are some caveats about ties, and we should treat each `time` for +//! each input as occurring at distinct times, one after the other (so that +//! ties are resolved by the index of the input). 
There is also the matter +//! of logical compaction, which should not be done in a way that prevents +//! the correct determination of the total order comparison. + +use std::collections::HashMap; + +use timely::dataflow::Scope; +use timely::dataflow::channels::pact::{Pipeline, Exchange}; +use timely::dataflow::operators::Operator; +use timely::progress::Antichain; + +use differential_dataflow::{ExchangeData, Collection, AsCollection, Hashable}; +use differential_dataflow::difference::{Monoid, Semigroup}; +use differential_dataflow::lattice::Lattice; +use differential_dataflow::operators::arrange::Arranged; +use differential_dataflow::trace::{Cursor, TraceReader, BatchReader}; +use differential_dataflow::consolidation::{consolidate, consolidate_updates}; + +/// A binary equijoin that responds to updates on only its first input. +/// +/// This operator responds to inputs of the form +/// +/// ```ignore +/// ((key, val1, time1), initial_time, diff1) +/// ``` +/// +/// where `initial_time` is less than or equal to `time1`, and produces as output +/// +/// ```ignore +/// ((output_func(key, val1, val2), lub(time1, time2)), initial_time, diff1 * diff2) +/// ``` +/// +/// for each `((key, val2), time2, diff2)` present in `arrangement`, where +/// `time2` is less than `initial_time` *UNDER THE TOTAL ORDER ON TIMES*. +/// This last constraint is important to ensure that we correctly produce +/// all pairs of output updates across multiple `half_join` operators. +/// +/// Notice that the time is hoisted up into data. The expectation is that +/// once out of the dataflow, the updates will be `delay`d to the times +/// specified in the payloads. 
+pub fn half_join( + stream: &Collection, + mut arrangement: Arranged, + frontier_func: FF, + comparison: CF, + mut output_func: S, +) -> Collection +where + G: Scope, + G::Timestamp: Lattice, + V: ExchangeData, + Tr: TraceReader+Clone+'static, + Tr::Key: Ord+Hashable+ExchangeData, + Tr::Val: Clone, + Tr::Batch: BatchReader, + Tr::Cursor: Cursor, + Tr::R: Monoid+ExchangeData, + FF: Fn(&G::Timestamp) -> G::Timestamp + 'static, + CF: Fn(&G::Timestamp, &G::Timestamp) -> bool + 'static, + DOut: Clone+'static, + Tr::R: std::ops::Mul, + S: FnMut(&Tr::Key, &V, &Tr::Val)->DOut+'static, +{ + // No need to block physical merging for this operator. + arrangement.trace.set_physical_compaction(Antichain::new().borrow()); + let mut arrangement_trace = Some(arrangement.trace); + let arrangement_stream = arrangement.stream; + + let mut stash = HashMap::new(); + let mut buffer = Vec::new(); + + let exchange = Exchange::new(move |update: &((Tr::Key, V, G::Timestamp),G::Timestamp,Tr::R)| (update.0).0.hashed().into()); + + // Stash for (time, diff) accumulation. + let mut output_buffer = Vec::new(); + + stream.inner.binary_frontier(&arrangement_stream, exchange, Pipeline, "HalfJoin", move |_,_| move |input1, input2, output| { + + // drain the first input, stashing requests. + input1.for_each(|capability, data| { + data.swap(&mut buffer); + stash.entry(capability.retain()) + .or_insert(Vec::new()) + .extend(buffer.drain(..)) + }); + + // Drain input batches; although we do not observe them, we want access to the input + // to observe the frontier and to drive scheduling. + input2.for_each(|_, _| { }); + + if let Some(ref mut trace) = arrangement_trace { + + for (capability, proposals) in stash.iter_mut() { + + // defer requests at incomplete times. + // TODO: Verify this is correct for TOTAL ORDER. + if !input2.frontier.less_equal(capability.time()) { + + let mut session = output.session(capability); + + // Sort requests by key for in-order cursor traversal. 
+ consolidate_updates(proposals); + + let (mut cursor, storage) = trace.cursor(); + + for &mut ((ref key, ref val1, ref time), ref initial, ref mut diff) in proposals.iter_mut() { + // Use TOTAL ORDER to allow the release of `time`. + if !input2.frontier.frontier().iter().any(|t| comparison(t, initial)) { + cursor.seek_key(&storage, &key); + if cursor.get_key(&storage) == Some(&key) { + while let Some(val2) = cursor.get_val(&storage) { + cursor.map_times(&storage, |t, d| { + if comparison(t, initial) { + output_buffer.push((t.join(time), d.clone())) + } + }); + consolidate(&mut output_buffer); + for (time, count) in output_buffer.drain(..) { + let dout = output_func(key, val1, val2); + session.give(((dout, time), initial.clone(), count * diff.clone())); + } + cursor.step_val(&storage); + } + cursor.rewind_vals(&storage); + } + *diff = Tr::R::zero(); + } + } + + proposals.retain(|ptd| !ptd.2.is_zero()); + } + } + } + + // drop fully processed capabilities. + stash.retain(|_,proposals| !proposals.is_empty()); + + // The logical merging frontier depends on both input1 and stash. 
+ let mut frontier = timely::progress::frontier::Antichain::new(); + for time in input1.frontier().frontier().iter() { + frontier.insert(frontier_func(time)); + } + for key in stash.keys() { + frontier.insert(frontier_func(key.time())); + } + arrangement_trace.as_mut().map(|trace| trace.set_logical_compaction(frontier.borrow())); + + if input1.frontier().is_empty() && stash.is_empty() { + arrangement_trace = None; + } + + }).as_collection() +} diff --git a/dogsdogsdogs/src/operators/mod.rs b/dogsdogsdogs/src/operators/mod.rs index 3830625b7..1a9f30415 100644 --- a/dogsdogsdogs/src/operators/mod.rs +++ b/dogsdogsdogs/src/operators/mod.rs @@ -1,9 +1,11 @@ +pub mod half_join; pub mod lookup_map; pub mod count; pub mod propose; pub mod validate; +pub use self::half_join::half_join; pub use self::lookup_map::lookup_map; pub use self::count::count; pub use self::propose::{propose, propose_distinct}; From 64c73c4c12b9f7a1de3f77cc093a535fc9c20a52 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Thu, 25 Mar 2021 15:04:45 -0400 Subject: [PATCH 2/2] simplify example to use Product --- dogsdogsdogs/examples/delta_query2.rs | 180 +++++++------------------- 1 file changed, 47 insertions(+), 133 deletions(-) diff --git a/dogsdogsdogs/examples/delta_query2.rs b/dogsdogsdogs/examples/delta_query2.rs index 85ce80d92..7f747122b 100644 --- a/dogsdogsdogs/examples/delta_query2.rs +++ b/dogsdogsdogs/examples/delta_query2.rs @@ -2,12 +2,10 @@ extern crate timely; extern crate graph_map; extern crate differential_dataflow; -#[macro_use] -extern crate abomonation_derive; -extern crate abomonation; - extern crate dogsdogsdogs; +use timely::dataflow::Scope; +use timely::order::Product; use timely::dataflow::operators::probe::Handle; use timely::dataflow::operators::UnorderedInput; use timely::dataflow::operators::Map; @@ -15,157 +13,73 @@ use differential_dataflow::AsCollection; fn main() { - use pair::Pair; - timely::execute_from_args(std::env::args().skip(2), move |worker| { let mut 
probe = Handle::new(); - let (mut i1, mut i2, c1, c2) = worker.dataflow::,_,_>(|scope| { + let (mut i1, mut i2, c1, c2) = worker.dataflow::(|scope| { - use timely::dataflow::operators::unordered_input::UnorderedHandle; + // Nested scope as `Product` doesn't refine `()`, because .. coherence. + scope.scoped("InnerScope", |inner| { - let ((input1, capability1), data1): ((UnorderedHandle, ((usize, usize), Pair, isize)>, _), _) = scope.new_unordered_input(); - let ((input2, capability2), data2): ((UnorderedHandle, ((usize, usize), Pair, isize)>, _), _) = scope.new_unordered_input(); + use timely::dataflow::operators::unordered_input::UnorderedHandle; - let edges1 = data1.as_collection(); - let edges2 = data2.as_collection(); + let ((input1, capability1), data1): ((UnorderedHandle, ((usize, usize), Product, isize)>, _), _) = inner.new_unordered_input(); + let ((input2, capability2), data2): ((UnorderedHandle, ((usize, usize), Product, isize)>, _), _) = inner.new_unordered_input(); - // Graph oriented both ways, indexed by key. - use differential_dataflow::operators::arrange::ArrangeByKey; - let forward1 = edges1.arrange_by_key(); - let forward2 = edges2.arrange_by_key(); + let edges1 = data1.as_collection(); + let edges2 = data2.as_collection(); - // Grab the stream of changes. Stash the initial time as payload. - let changes1 = edges1.inner.map(|((k,v),t,r)| ((k,v,t.clone()),t,r)).as_collection(); - let changes2 = edges2.inner.map(|((k,v),t,r)| ((k,v,t.clone()),t,r)).as_collection(); + // Graph oriented both ways, indexed by key. + use differential_dataflow::operators::arrange::ArrangeByKey; + let forward1 = edges1.arrange_by_key(); + let forward2 = edges2.arrange_by_key(); - use dogsdogsdogs::operators::half_join; + // Grab the stream of changes. Stash the initial time as payload. 
+ let changes1 = edges1.inner.map(|((k,v),t,r)| ((k,v,t.clone()),t,r)).as_collection(); + let changes2 = edges2.inner.map(|((k,v),t,r)| ((k,v,t.clone()),t,r)).as_collection(); - // pick a frontier that will not mislead TOTAL ORDER comparisons. - let closure = |time: &Pair| Pair::new(time.first.saturating_sub(1), time.second.saturating_sub(1)); + use dogsdogsdogs::operators::half_join; - let path1 = - half_join( - &changes1, - forward2, - closure.clone(), - |t1,t2| t1.lt(t2), // This one ignores concurrent updates. - |key, val1, val2| (key.clone(), (val1.clone(), val2.clone())), - ); + // pick a frontier that will not mislead TOTAL ORDER comparisons. + let closure = |time: &Product| Product::new(time.outer.saturating_sub(1), time.inner.saturating_sub(1)); - let path2 = - half_join( - &changes2, - forward1, - closure.clone(), - |t1,t2| t1.le(t2), // This one can "see" concurrent updates. - |key, val1, val2| (key.clone(), (val2.clone(), val1.clone())), - ); + let path1 = + half_join( + &changes1, + forward2, + closure.clone(), + |t1,t2| t1.lt(t2), // This one ignores concurrent updates. + |key, val1, val2| (key.clone(), (val1.clone(), val2.clone())), + ); - // Delay updates until the worked payload time. - // This should be at least the ignored update time. - path1.concat(&path2) - .inner.map(|(((k,v),t),_,r)| ((k,v),t,r)).as_collection() - .inspect(|x| println!("{:?}", x)) - .probe_with(&mut probe); + let path2 = + half_join( + &changes2, + forward1, + closure.clone(), + |t1,t2| t1.le(t2), // This one can "see" concurrent updates. + |key, val1, val2| (key.clone(), (val2.clone(), val1.clone())), + ); - (input1, input2, capability1, capability2) + // Delay updates until the worked payload time. + // This should be at least the ignored update time. 
+ path1.concat(&path2) + .inner.map(|(((k,v),t),_,r)| ((k,v),t,r)).as_collection() + .inspect(|x| println!("{:?}", x)) + .probe_with(&mut probe); + + (input1, input2, capability1, capability2) + }) }); i1 .session(c1.clone()) - .give(((5, 6), Pair::new(0, 13), 1)); + .give(((5, 6), Product::new(0, 13), 1)); i2 .session(c2.clone()) - .give(((5, 7), Pair::new(11, 0), 1)); + .give(((5, 7), Product::new(11, 0), 1)); }).unwrap(); } - - -/// This module contains a definition of a new timestamp time, a "pair" or product. -/// -/// This is a minimal self-contained implementation, in that it doesn't borrow anything -/// from the rest of the library other than the traits it needs to implement. With this -/// type and its implementations, you can use it as a timestamp type. -mod pair { - - /// A pair of timestamps, partially ordered by the product order. - #[derive(Hash, Default, Clone, Eq, PartialEq, Ord, PartialOrd, Abomonation)] - pub struct Pair { - pub first: S, - pub second: T, - } - - impl Pair { - /// Create a new pair. - pub fn new(first: S, second: T) -> Self { - Pair { first, second } - } - } - - // Implement timely dataflow's `PartialOrder` trait. - use timely::order::PartialOrder; - impl PartialOrder for Pair { - fn less_equal(&self, other: &Self) -> bool { - self.first.less_equal(&other.first) && self.second.less_equal(&other.second) - } - } - - use timely::progress::timestamp::Refines; - impl Refines<()> for Pair { - fn to_inner(_outer: ()) -> Self { Self::minimum() } - fn to_outer(self) -> () { () } - fn summarize(_summary: ::Summary) -> () { () } - } - - // Implement timely dataflow's `PathSummary` trait. - // This is preparation for the `Timestamp` implementation below. - use timely::progress::PathSummary; - - impl PathSummary> for () { - fn results_in(&self, timestamp: &Pair) -> Option> { - Some(timestamp.clone()) - } - fn followed_by(&self, other: &Self) -> Option { - Some(other.clone()) - } - } - - // Implement timely dataflow's `Timestamp` trait. 
- use timely::progress::Timestamp; - impl Timestamp for Pair { - fn minimum() -> Self { Pair { first: S::minimum(), second: T::minimum() }} - type Summary = (); - } - - // Implement differential dataflow's `Lattice` trait. - // This extends the `PartialOrder` implementation with additional structure. - use differential_dataflow::lattice::Lattice; - impl Lattice for Pair { - fn join(&self, other: &Self) -> Self { - Pair { - first: self.first.join(&other.first), - second: self.second.join(&other.second), - } - } - fn meet(&self, other: &Self) -> Self { - Pair { - first: self.first.meet(&other.first), - second: self.second.meet(&other.second), - } - } - } - - use std::fmt::{Formatter, Error, Debug}; - - /// Debug implementation to avoid seeing fully qualified path names. - impl Debug for Pair { - fn fmt(&self, f: &mut Formatter) -> Result<(), Error> { - f.write_str(&format!("({:?}, {:?})", self.first, self.second)) - } - } - -}