From e153706717cade21d823e84a324e59aa2ee9be58 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Thu, 25 Jan 2024 17:17:24 -0500 Subject: [PATCH] dogs^3 compaction improvement (#457) * Allow frontier function to populate antichain * Clean up dependencies --- dogsdogsdogs/examples/delta_query2.rs | 8 +++++--- dogsdogsdogs/src/operators/count.rs | 1 - dogsdogsdogs/src/operators/half_join.rs | 10 +++++----- dogsdogsdogs/src/operators/lookup_map.rs | 1 - dogsdogsdogs/src/operators/propose.rs | 1 - dogsdogsdogs/src/operators/validate.rs | 1 - 6 files changed, 10 insertions(+), 12 deletions(-) diff --git a/dogsdogsdogs/examples/delta_query2.rs b/dogsdogsdogs/examples/delta_query2.rs index 112cc6fdd..b1d861177 100644 --- a/dogsdogsdogs/examples/delta_query2.rs +++ b/dogsdogsdogs/examples/delta_query2.rs @@ -36,13 +36,15 @@ fn main() { use dogsdogsdogs::operators::half_join; // pick a frontier that will not mislead TOTAL ORDER comparisons. - let closure = |time: &Product| Product::new(time.outer.saturating_sub(1), time.inner.saturating_sub(1)); + let closure = |time: &Product, antichain: &mut timely::progress::Antichain>| { + antichain.insert(Product::new(time.outer.saturating_sub(1), time.inner.saturating_sub(1))); + }; let path1 = half_join( &changes1, forward2, - closure.clone(), + closure, |t1,t2| t1.lt(t2), // This one ignores concurrent updates. |key, val1, val2| (key.clone(), (val1.clone(), val2.clone())), ); @@ -51,7 +53,7 @@ fn main() { half_join( &changes2, forward1, - closure.clone(), + closure, |t1,t2| t1.le(t2), // This one can "see" concurrent updates. |key, val1, val2| (key.clone(), (val2.clone(), val1.clone())), ); diff --git a/dogsdogsdogs/src/operators/count.rs b/dogsdogsdogs/src/operators/count.rs index 8382d631c..68c39f1fa 100644 --- a/dogsdogsdogs/src/operators/count.rs +++ b/dogsdogsdogs/src/operators/count.rs @@ -2,7 +2,6 @@ use timely::dataflow::Scope; use differential_dataflow::{ExchangeData, Collection, Hashable}; use differential_dataflow::difference::{Monoid, Multiply}; -use differential_dataflow::lattice::Lattice; use differential_dataflow::operators::arrange::Arranged; use differential_dataflow::trace::TraceReader; diff --git a/dogsdogsdogs/src/operators/half_join.rs b/dogsdogsdogs/src/operators/half_join.rs index 9d1f6114f..283284525 100644 --- a/dogsdogsdogs/src/operators/half_join.rs +++ b/dogsdogsdogs/src/operators/half_join.rs @@ -83,7 +83,7 @@ where Tr: TraceReader+Clone+'static, R: Mul, >::Output: Semigroup, - FF: Fn(&G::Timestamp) -> G::Timestamp + 'static, + FF: Fn(&G::Timestamp, &mut Antichain) + 'static, CF: Fn(&G::Timestamp, &G::Timestamp) -> bool + 'static, DOut: Clone+'static, S: FnMut(&Tr::KeyOwned, &V, Tr::Val<'_>)->DOut+'static, @@ -134,7 +134,7 @@ where V: ExchangeData, R: ExchangeData + Monoid, Tr: TraceReader+Clone+'static, - FF: Fn(&G::Timestamp) -> G::Timestamp + 'static, + FF: Fn(&G::Timestamp, &mut Antichain) + 'static, CF: Fn(&G::Timestamp, &G::Timestamp) -> bool + 'static, DOut: Clone+'static, ROut: Semigroup, @@ -281,10 +281,10 @@ where // The logical merging frontier depends on both input1 and stash. let mut frontier = timely::progress::frontier::Antichain::new(); for time in input1.frontier().frontier().iter() { - frontier.insert(frontier_func(time)); + frontier_func(time, &mut frontier); } - for key in stash.keys() { - frontier.insert(frontier_func(key.time())); + for time in stash.keys() { + frontier_func(time, &mut frontier); } arrangement_trace.as_mut().map(|trace| trace.set_logical_compaction(frontier.borrow())); diff --git a/dogsdogsdogs/src/operators/lookup_map.rs b/dogsdogsdogs/src/operators/lookup_map.rs index a069c9d23..e8ab279ff 100644 --- a/dogsdogsdogs/src/operators/lookup_map.rs +++ b/dogsdogsdogs/src/operators/lookup_map.rs @@ -8,7 +8,6 @@ use timely::progress::Antichain; use differential_dataflow::{ExchangeData, Collection, AsCollection, Hashable}; use differential_dataflow::difference::{Semigroup, Monoid}; -use differential_dataflow::lattice::Lattice; use differential_dataflow::operators::arrange::Arranged; use differential_dataflow::trace::{Cursor, TraceReader}; diff --git a/dogsdogsdogs/src/operators/propose.rs b/dogsdogsdogs/src/operators/propose.rs index cca4ee975..164e42987 100644 --- a/dogsdogsdogs/src/operators/propose.rs +++ b/dogsdogsdogs/src/operators/propose.rs @@ -2,7 +2,6 @@ use timely::dataflow::Scope; use differential_dataflow::{ExchangeData, Collection, Hashable}; use differential_dataflow::difference::{Monoid, Multiply}; -use differential_dataflow::lattice::Lattice; use differential_dataflow::operators::arrange::Arranged; use differential_dataflow::trace::TraceReader; use differential_dataflow::trace::cursor::MyTrait; diff --git a/dogsdogsdogs/src/operators/validate.rs b/dogsdogsdogs/src/operators/validate.rs index ec347b386..f1f0c273f 100644 --- a/dogsdogsdogs/src/operators/validate.rs +++ b/dogsdogsdogs/src/operators/validate.rs @@ -4,7 +4,6 @@ use timely::dataflow::Scope; use differential_dataflow::{ExchangeData, Collection}; use differential_dataflow::difference::{Monoid, Multiply}; -use differential_dataflow::lattice::Lattice; use differential_dataflow::operators::arrange::Arranged; use differential_dataflow::trace::TraceReader;