From 5b5aff8e3db09117b7378f37e159279c1568d154 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Tue, 5 Oct 2021 10:02:12 -0400 Subject: [PATCH 1/2] allow halfjoin to yield tastefully --- dogsdogsdogs/src/operators/half_join.rs | 145 ++++++++++++++---------- 1 file changed, 88 insertions(+), 57 deletions(-) diff --git a/dogsdogsdogs/src/operators/half_join.rs b/dogsdogsdogs/src/operators/half_join.rs index cf6982d0e..22d433886 100644 --- a/dogsdogsdogs/src/operators/half_join.rs +++ b/dogsdogsdogs/src/operators/half_join.rs @@ -95,7 +95,7 @@ where let dout = (output_func(k, v1, v2), time.clone()); Some((dout, initial.clone(), diff)) }; - half_join_internal_unsafe(stream, arrangement, frontier_func, comparison, output_func) + half_join_internal_unsafe(stream, arrangement, frontier_func, comparison, |timer, count| false, output_func) } /// An unsafe variant of `half_join` where the `output_func` closure takes @@ -117,11 +117,16 @@ where /// /// for each `((key, val2), time2, diff2)` present in `arrangement`, where /// `time2` is less than `initial_time` *UNDER THE TOTAL ORDER ON TIMES*. -pub fn half_join_internal_unsafe( +/// +/// The `yield_function` allows the caller to indicate when the operator should +/// yield control, as a function of the elapsed time and the number of matched +/// records. +pub fn half_join_internal_unsafe( stream: &Collection, mut arrangement: Arranged, frontier_func: FF, comparison: CF, + yield_function: Y, mut output_func: S, ) -> Collection where @@ -138,6 +143,7 @@ where CF: Fn(&G::Timestamp, &G::Timestamp) -> bool + 'static, DOut: Clone+'static, ROut: Monoid, + Y: Fn(std::time::Instant, usize) -> bool + 'static, I: IntoIterator, S: FnMut(&Tr::Key, &V, &Tr::Val, &G::Timestamp, &G::Timestamp, &Tr::R, &Tr::R)-> I + 'static, { @@ -154,81 +160,106 @@ where // Stash for (time, diff) accumulation. let mut output_buffer = Vec::new(); - stream.inner.binary_frontier(&arrangement_stream, exchange, Pipeline, "HalfJoin", move |_,_| move |input1, input2, output| { + stream.inner.binary_frontier(&arrangement_stream, exchange, Pipeline, "HalfJoin", move |_,info| { - // drain the first input, stashing requests. - input1.for_each(|capability, data| { - data.swap(&mut buffer); - stash.entry(capability.retain()) - .or_insert(Vec::new()) - .extend(buffer.drain(..)) - }); + // Acquire an activator to reschedule the operator when it has unfinished work. + use timely::scheduling::Activator; + let activations = stream.scope().activations(); + let activator = Activator::new(&info.address[..], activations); - // Drain input batches; although we do not observe them, we want access to the input - // to observe the frontier and to drive scheduling. - input2.for_each(|_, _| { }); - if let Some(ref mut trace) = arrangement_trace { + move |input1, input2, output| { - for (capability, proposals) in stash.iter_mut() { + // drain the first input, stashing requests. + input1.for_each(|capability, data| { + data.swap(&mut buffer); + stash.entry(capability.retain()) + .or_insert(Vec::new()) + .extend(buffer.drain(..)) + }); - // defer requests at incomplete times. - // TODO: Verify this is correct for TOTAL ORDER. - if !input2.frontier.less_equal(capability.time()) { + // Drain input batches; although we do not observe them, we want access to the input + // to observe the frontier and to drive scheduling. + input2.for_each(|_, _| { }); - let mut session = output.session(capability); + // Local variables to track if and when we should exit early. + // The rough logic is that we fully process inputs and set their differences to zero, + // stopping at any point. We clean up all of the zeros in buffers that did any work, + // and reactivate at the end if the yield function still says so. + let mut yielded = false; + let timer = std::time::Instant::now(); + let mut work = 0; - // Sort requests by key for in-order cursor traversal. - consolidate_updates(proposals); + if let Some(ref mut trace) = arrangement_trace { - let (mut cursor, storage) = trace.cursor(); + for (capability, proposals) in stash.iter_mut() { - for &mut ((ref key, ref val1, ref time), ref initial, ref mut diff1) in proposals.iter_mut() { - // Use TOTAL ORDER to allow the release of `time`. - if !input2.frontier.frontier().iter().any(|t| comparison(t, initial)) { - cursor.seek_key(&storage, &key); - if cursor.get_key(&storage) == Some(&key) { - while let Some(val2) = cursor.get_val(&storage) { - cursor.map_times(&storage, |t, d| { - if comparison(t, initial) { - output_buffer.push((t.join(time), d.clone())) - } - }); - consolidate(&mut output_buffer); - for (time, diff2) in output_buffer.drain(..) { - for dout in output_func(key, val1, val2, initial, &time, &diff1, &diff2) { - session.give(dout); + // Avoid computation if we should already yield. + // TODO: Verify this is correct for TOTAL ORDER. + yielded = yielded || yield_function(timer, work); + if !yielded && !input2.frontier.less_equal(capability.time()) { + + let mut session = output.session(capability); + + // Sort requests by key for in-order cursor traversal. + consolidate_updates(proposals); + + let (mut cursor, storage) = trace.cursor(); + + // Process proposals one at a time, stopping if we should yield. + for &mut ((ref key, ref val1, ref time), ref initial, ref mut diff1) in proposals.iter_mut() { + // Use TOTAL ORDER to allow the release of `time`. + yielded = yielded || yield_function(timer, work); + if !yielded && !input2.frontier.frontier().iter().any(|t| comparison(t, initial)) { + cursor.seek_key(&storage, &key); + if cursor.get_key(&storage) == Some(&key) { + while let Some(val2) = cursor.get_val(&storage) { + cursor.map_times(&storage, |t, d| { + if comparison(t, initial) { + output_buffer.push((t.join(time), d.clone())) + } + }); + consolidate(&mut output_buffer); + work += output_buffer.len(); + for (time, diff2) in output_buffer.drain(..) { + for dout in output_func(key, val1, val2, initial, &time, &diff1, &diff2) { + session.give(dout); + } } + cursor.step_val(&storage); } - cursor.step_val(&storage); + cursor.rewind_vals(&storage); } - cursor.rewind_vals(&storage); + *diff1 = Tr::R::zero(); } - *diff1 = Tr::R::zero(); } - } - proposals.retain(|ptd| !ptd.2.is_zero()); + proposals.retain(|ptd| !ptd.2.is_zero()); + } } } - } - // drop fully processed capabilities. - stash.retain(|_,proposals| !proposals.is_empty()); + // If we yielded, re-activate the operator. + if yielded { + activator.activate(); + } - // The logical merging frontier depends on both input1 and stash. - let mut frontier = timely::progress::frontier::Antichain::new(); - for time in input1.frontier().frontier().iter() { - frontier.insert(frontier_func(time)); - } - for key in stash.keys() { - frontier.insert(frontier_func(key.time())); - } - arrangement_trace.as_mut().map(|trace| trace.set_logical_compaction(frontier.borrow())); + // drop fully processed capabilities. + stash.retain(|_,proposals| !proposals.is_empty()); - if input1.frontier().is_empty() && stash.is_empty() { - arrangement_trace = None; - } + // The logical merging frontier depends on both input1 and stash. + let mut frontier = timely::progress::frontier::Antichain::new(); + for time in input1.frontier().frontier().iter() { + frontier.insert(frontier_func(time)); + } + for key in stash.keys() { + frontier.insert(frontier_func(key.time())); + } + arrangement_trace.as_mut().map(|trace| trace.set_logical_compaction(frontier.borrow())); + if input1.frontier().is_empty() && stash.is_empty() { + arrangement_trace = None; + } + } }).as_collection() } From 4955c0870ae28b5a0b4fabd5034ff4315a828791 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Tue, 5 Oct 2021 12:39:13 -0400 Subject: [PATCH 2/2] improve explanation of choices --- dogsdogsdogs/src/operators/half_join.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dogsdogsdogs/src/operators/half_join.rs b/dogsdogsdogs/src/operators/half_join.rs index 22d433886..94c5bda33 100644 --- a/dogsdogsdogs/src/operators/half_join.rs +++ b/dogsdogsdogs/src/operators/half_join.rs @@ -120,7 +120,8 @@ where /// /// The `yield_function` allows the caller to indicate when the operator should /// yield control, as a function of the elapsed time and the number of matched -/// records. +/// records. Note this is not the number of *output* records, owing mainly to +/// the number of matched records being easiest to record with low overhead. pub fn half_join_internal_unsafe( stream: &Collection, mut arrangement: Arranged, @@ -167,7 +168,6 @@ where let activations = stream.scope().activations(); let activator = Activator::new(&info.address[..], activations); - move |input1, input2, output| { // drain the first input, stashing requests.