From f09a60f8c15dcadaf00a08f804954f0f8396b0fc Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Mon, 27 Nov 2023 23:13:20 -0500 Subject: [PATCH] Update halfjoin to new idioms --- dogsdogsdogs/src/operators/half_join.rs | 48 ++++++++++++++----------- 1 file changed, 27 insertions(+), 21 deletions(-) diff --git a/dogsdogsdogs/src/operators/half_join.rs b/dogsdogsdogs/src/operators/half_join.rs index b713d07d1..63cc74ccf 100644 --- a/dogsdogsdogs/src/operators/half_join.rs +++ b/dogsdogsdogs/src/operators/half_join.rs @@ -31,7 +31,10 @@ //! of logical compaction, which should not be done in a way that prevents //! the correct determination of the total order comparison. +use std::borrow::Borrow; use std::collections::HashMap; +use std::ops::Mul; + use timely::dataflow::Scope; use timely::dataflow::channels::pact::{Pipeline, Exchange}; @@ -67,28 +70,30 @@ use differential_dataflow::consolidation::{consolidate, consolidate_updates}; /// Notice that the time is hoisted up into data. The expectation is that /// once out of the "delta flow region", the updates will be `delay`d to the /// times specified in the payloads. -pub fn half_join( - stream: &Collection, +pub fn half_join( + stream: &Collection, arrangement: Arranged, frontier_func: FF, comparison: CF, mut output_func: S, -) -> Collection +) -> Collection>::Output> where G: Scope, G::Timestamp: Lattice, + K: Ord + Hashable + ExchangeData + Borrow, V: ExchangeData, + R: ExchangeData + Monoid, Tr: TraceReader+Clone+'static, - Tr::Key: Ord+Hashable+ExchangeData, - Tr::Val: Clone, - Tr::Diff: Monoid+ExchangeData, + Tr::Key: Eq, + Tr::Diff: Semigroup, + R: Mul, + >::Output: Semigroup, FF: Fn(&G::Timestamp) -> G::Timestamp + 'static, CF: Fn(&G::Timestamp, &G::Timestamp) -> bool + 'static, DOut: Clone+'static, - Tr::Diff: std::ops::Mul, - S: FnMut(&Tr::Key, &V, &Tr::Val)->DOut+'static, + S: FnMut(&K, &V, &Tr::Val)->DOut+'static, { - let output_func = move |k: &Tr::Key, v1: &V, v2: &Tr::Val, initial: &G::Timestamp, time: &G::Timestamp, diff1: &Tr::Diff, diff2: &Tr::Diff| { + let output_func = move |k: &K, v1: &V, v2: &Tr::Val, initial: &G::Timestamp, time: &G::Timestamp, diff1: &R, diff2: &Tr::Diff| { let diff = diff1.clone() * diff2.clone(); let dout = (output_func(k, v1, v2), time.clone()); Some((dout, initial.clone(), diff)) @@ -120,8 +125,8 @@ where /// yield control, as a function of the elapsed time and the number of matched /// records. Note this is not the number of *output* records, owing mainly to /// the number of matched records being easiest to record with low overhead. -pub fn half_join_internal_unsafe( - stream: &Collection, +pub fn half_join_internal_unsafe( + stream: &Collection, mut arrangement: Arranged, frontier_func: FF, comparison: CF, @@ -131,18 +136,19 @@ pub fn half_join_internal_unsafe( where G: Scope, G::Timestamp: Lattice, + K: Ord + Hashable + ExchangeData + std::borrow::Borrow, V: ExchangeData, + R: ExchangeData + Monoid, Tr: TraceReader+Clone+'static, - Tr::Key: Ord+Hashable+ExchangeData, - Tr::Val: Clone, - Tr::Diff: Monoid+ExchangeData, + Tr::Key: Eq, + Tr::Diff: Semigroup, FF: Fn(&G::Timestamp) -> G::Timestamp + 'static, CF: Fn(&G::Timestamp, &G::Timestamp) -> bool + 'static, DOut: Clone+'static, - ROut: Monoid, + ROut: Semigroup, Y: Fn(std::time::Instant, usize) -> bool + 'static, I: IntoIterator, - S: FnMut(&Tr::Key, &V, &Tr::Val, &G::Timestamp, &G::Timestamp, &Tr::Diff, &Tr::Diff)-> I + 'static, + S: FnMut(&K, &V, &Tr::Val, &G::Timestamp, &G::Timestamp, &R, &Tr::Diff)-> I + 'static, { // No need to block physical merging for this operator. arrangement.trace.set_physical_compaction(Antichain::new().borrow()); @@ -152,7 +158,7 @@ where let mut stash = HashMap::new(); let mut buffer = Vec::new(); - let exchange = Exchange::new(move |update: &((Tr::Key, V, G::Timestamp),G::Timestamp,Tr::Diff)| (update.0).0.hashed().into()); + let exchange = Exchange::new(move |update: &((K, V, G::Timestamp),G::Timestamp,R)| (update.0).0.hashed().into()); // Stash for (time, diff) accumulation. let mut output_buffer = Vec::new(); @@ -210,8 +216,8 @@ where // Use TOTAL ORDER to allow the release of `time`. yielded = yielded || yield_function(timer, work); if !yielded && !input2.frontier.frontier().iter().any(|t| comparison(t, initial)) { - cursor.seek_key(&storage, &key); - if cursor.get_key(&storage) == Some(&key) { + cursor.seek_key(&storage, key.borrow()); + if cursor.get_key(&storage) == Some(key.borrow()) { while let Some(val2) = cursor.get_val(&storage) { cursor.map_times(&storage, |t, d| { if comparison(t, initial) { @@ -221,7 +227,7 @@ where consolidate(&mut output_buffer); work += output_buffer.len(); for (time, diff2) in output_buffer.drain(..) { - for dout in output_func(key, val1, val2, initial, &time, &diff1, &diff2) { + for dout in output_func(&key, val1, val2, initial, &time, &diff1, &diff2) { session.give(dout); } } @@ -229,7 +235,7 @@ where } cursor.rewind_vals(&storage); } - *diff1 = Tr::Diff::zero(); + *diff1 = R::zero(); } }