diff --git a/dogsdogsdogs/examples/delta_query2.rs b/dogsdogsdogs/examples/delta_query2.rs new file mode 100644 index 000000000..7f747122b --- /dev/null +++ b/dogsdogsdogs/examples/delta_query2.rs @@ -0,0 +1,85 @@ +extern crate timely; +extern crate graph_map; +extern crate differential_dataflow; + +extern crate dogsdogsdogs; + +use timely::dataflow::Scope; +use timely::order::Product; +use timely::dataflow::operators::probe::Handle; +use timely::dataflow::operators::UnorderedInput; +use timely::dataflow::operators::Map; +use differential_dataflow::AsCollection; + +fn main() { + + timely::execute_from_args(std::env::args().skip(2), move |worker| { + + let mut probe = Handle::new(); + + let (mut i1, mut i2, c1, c2) = worker.dataflow::(|scope| { + + // Nested scope as `Product` doesn't refine `()`, because .. coherence. + scope.scoped("InnerScope", |inner| { + + use timely::dataflow::operators::unordered_input::UnorderedHandle; + + let ((input1, capability1), data1): ((UnorderedHandle, ((usize, usize), Product, isize)>, _), _) = inner.new_unordered_input(); + let ((input2, capability2), data2): ((UnorderedHandle, ((usize, usize), Product, isize)>, _), _) = inner.new_unordered_input(); + + let edges1 = data1.as_collection(); + let edges2 = data2.as_collection(); + + // Graph oriented both ways, indexed by key. + use differential_dataflow::operators::arrange::ArrangeByKey; + let forward1 = edges1.arrange_by_key(); + let forward2 = edges2.arrange_by_key(); + + // Grab the stream of changes. Stash the initial time as payload. + let changes1 = edges1.inner.map(|((k,v),t,r)| ((k,v,t.clone()),t,r)).as_collection(); + let changes2 = edges2.inner.map(|((k,v),t,r)| ((k,v,t.clone()),t,r)).as_collection(); + + use dogsdogsdogs::operators::half_join; + + // pick a frontier that will not mislead TOTAL ORDER comparisons. + let closure = |time: &Product| Product::new(time.outer.saturating_sub(1), time.inner.saturating_sub(1)); + + let path1 = + half_join( + &changes1, + forward2, + closure.clone(), + |t1,t2| t1.lt(t2), // This one ignores concurrent updates. + |key, val1, val2| (key.clone(), (val1.clone(), val2.clone())), + ); + + let path2 = + half_join( + &changes2, + forward1, + closure.clone(), + |t1,t2| t1.le(t2), // This one can "see" concurrent updates. + |key, val1, val2| (key.clone(), (val2.clone(), val1.clone())), + ); + + // Delay updates until the worked payload time. + // This should be at least the ignored update time. + path1.concat(&path2) + .inner.map(|(((k,v),t),_,r)| ((k,v),t,r)).as_collection() + .inspect(|x| println!("{:?}", x)) + .probe_with(&mut probe); + + (input1, input2, capability1, capability2) + }) + }); + + i1 + .session(c1.clone()) + .give(((5, 6), Product::new(0, 13), 1)); + + i2 + .session(c2.clone()) + .give(((5, 7), Product::new(11, 0), 1)); + + }).unwrap(); +} diff --git a/dogsdogsdogs/src/operators/half_join.rs b/dogsdogsdogs/src/operators/half_join.rs new file mode 100644 index 000000000..4335e5a4f --- /dev/null +++ b/dogsdogsdogs/src/operators/half_join.rs @@ -0,0 +1,182 @@ +//! Dataflow operator for delta joins over partially ordered timestamps. +//! +//! Given multiple streams of updates `(data, time, diff)` that are each +//! defined over the same partially ordered `time`, we want to form the +//! full cross-join of all relations (we will *later* apply some filters +//! and instead equijoin on keys). +//! +//! The "correct" output is the outer join of these triples, where +//! 1. The `data` entries are just tuple'd up together, +//! 2. The `time` entries are subjected to the lattice `join` operator, +//! 3. The `diff` entries are multiplied. +//! +//! One way to produce the correct output is to form independent dataflow +//! fragments for each input stream, such that each intended output is then +//! produced by exactly one of these input streams. +//! +//! There are several incorrect ways one might do this, but here is one way +//! that I hope is not incorrect: +//! +//! Each input stream of updates is joined with each other input collection, +//! where each input update is matched against each other input update that +//! has a `time` that is less-than the input update's `time`, *UNDER A TOTAL +//! ORDER ON `time`*. The output are the `(data, time, diff)` entries that +//! follow the rules above, except that we additionally preserve the input's +//! initial `time` as well, for use in subsequent joins with the other input +//! collections. +//! +//! There are some caveats about ties, and we should treat each `time` for +//! each input as occuring at distinct times, one after the other (so that +//! ties are resolved by the index of the input). There is also the matter +//! of logical compaction, which should not be done in a way that prevents +//! the correct determination of the total order comparison. + +use std::collections::HashMap; + +use timely::dataflow::Scope; +use timely::dataflow::channels::pact::{Pipeline, Exchange}; +use timely::dataflow::operators::Operator; +use timely::progress::Antichain; + +use differential_dataflow::{ExchangeData, Collection, AsCollection, Hashable}; +use differential_dataflow::difference::{Monoid, Semigroup}; +use differential_dataflow::lattice::Lattice; +use differential_dataflow::operators::arrange::Arranged; +use differential_dataflow::trace::{Cursor, TraceReader, BatchReader}; +use differential_dataflow::consolidation::{consolidate, consolidate_updates}; + +/// A binary equijoin that responds to updates on only its first input. +/// +/// This operator responds to inputs of the form +/// +/// ```ignore +/// ((key, val1, time1), initial_time, diff1) +/// ``` +/// +/// where `initial_time` is less or equal to `time`, and produces as output +/// +/// ```ignore +/// ((key, (val1, val2), lub(time1, time2)), initial_time, diff1 * diff2) +/// ``` +/// +/// for each `((key, val2), time2, diff2)` present in `arrangement, where +/// `time2` is less than `initial_time` *UNDER THE TOTAL ORDER ON TIMES*. +/// This last constraint is important to ensure that we correctly produce +/// all pairs of output updates across multiple `half_join` operators. +/// +/// Notice that the time is hoisted up into data. The expectation is that +/// once out of the dataflow, the updates will be `delay`d to the times +/// specified in the payloads. +pub fn half_join( + stream: &Collection, + mut arrangement: Arranged, + frontier_func: FF, + comparison: CF, + mut output_func: S, +) -> Collection +where + G: Scope, + G::Timestamp: Lattice, + V: ExchangeData, + Tr: TraceReader+Clone+'static, + Tr::Key: Ord+Hashable+ExchangeData, + Tr::Val: Clone, + Tr::Batch: BatchReader, + Tr::Cursor: Cursor, + Tr::R: Monoid+ExchangeData, + FF: Fn(&G::Timestamp) -> G::Timestamp + 'static, + CF: Fn(&G::Timestamp, &G::Timestamp) -> bool + 'static, + DOut: Clone+'static, + Tr::R: std::ops::Mul, + S: FnMut(&Tr::Key, &V, &Tr::Val)->DOut+'static, +{ + // No need to block physical merging for this operator. + arrangement.trace.set_physical_compaction(Antichain::new().borrow()); + let mut arrangement_trace = Some(arrangement.trace); + let arrangement_stream = arrangement.stream; + + let mut stash = HashMap::new(); + let mut buffer = Vec::new(); + + let exchange = Exchange::new(move |update: &((Tr::Key, V, G::Timestamp),G::Timestamp,Tr::R)| (update.0).0.hashed().into()); + + // Stash for (time, diff) accumulation. + let mut output_buffer = Vec::new(); + + stream.inner.binary_frontier(&arrangement_stream, exchange, Pipeline, "HalfJoin", move |_,_| move |input1, input2, output| { + + // drain the first input, stashing requests. + input1.for_each(|capability, data| { + data.swap(&mut buffer); + stash.entry(capability.retain()) + .or_insert(Vec::new()) + .extend(buffer.drain(..)) + }); + + // Drain input batches; although we do not observe them, we want access to the input + // to observe the frontier and to drive scheduling. + input2.for_each(|_, _| { }); + + if let Some(ref mut trace) = arrangement_trace { + + for (capability, proposals) in stash.iter_mut() { + + // defer requests at incomplete times. + // TODO: Verify this is correct for TOTAL ORDER. + if !input2.frontier.less_equal(capability.time()) { + + let mut session = output.session(capability); + + // Sort requests by key for in-order cursor traversal. + consolidate_updates(proposals); + + let (mut cursor, storage) = trace.cursor(); + + for &mut ((ref key, ref val1, ref time), ref initial, ref mut diff) in proposals.iter_mut() { + // Use TOTAL ORDER to allow the release of `time`. + if !input2.frontier.frontier().iter().any(|t| comparison(t, initial)) { + cursor.seek_key(&storage, &key); + if cursor.get_key(&storage) == Some(&key) { + while let Some(val2) = cursor.get_val(&storage) { + cursor.map_times(&storage, |t, d| { + if comparison(t, initial) { + output_buffer.push((t.join(time), d.clone())) + } + }); + consolidate(&mut output_buffer); + for (time, count) in output_buffer.drain(..) { + let dout = output_func(key, val1, val2); + session.give(((dout, time), initial.clone(), count * diff.clone())); + } + cursor.step_val(&storage); + } + cursor.rewind_vals(&storage); + } + *diff = Tr::R::zero(); + } + } + + proposals.retain(|ptd| !ptd.2.is_zero()); + } + } + } + + // drop fully processed capabilities. + stash.retain(|_,proposals| !proposals.is_empty()); + + // The logical merging frontier depends on both input1 and stash. + let mut frontier = timely::progress::frontier::Antichain::new(); + for time in input1.frontier().frontier().iter() { + frontier.insert(frontier_func(time)); + } + for key in stash.keys() { + frontier.insert(frontier_func(key.time())); + } + arrangement_trace.as_mut().map(|trace| trace.set_logical_compaction(frontier.borrow())); + + if input1.frontier().is_empty() && stash.is_empty() { + arrangement_trace = None; + } + + }).as_collection() +} diff --git a/dogsdogsdogs/src/operators/mod.rs b/dogsdogsdogs/src/operators/mod.rs index 3830625b7..1a9f30415 100644 --- a/dogsdogsdogs/src/operators/mod.rs +++ b/dogsdogsdogs/src/operators/mod.rs @@ -1,9 +1,11 @@ +pub mod half_join; pub mod lookup_map; pub mod count; pub mod propose; pub mod validate; +pub use self::half_join::half_join; pub use self::lookup_map::lookup_map; pub use self::count::count; pub use self::propose::{propose, propose_distinct};