From e0706171551086ca4d597d8a0441455a123e8139 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Mon, 22 Apr 2024 21:54:32 -0400 Subject: [PATCH 1/2] Allow joins to produce arbitrary containers Add a `join_traces_core` function that allows the caller to specify the output stream container type. The existing `join_traces` function forces it to be vectors, by virtue of wrapping the output in a collection. Signed-off-by: Moritz Hoffmann --- src/operators/join.rs | 42 ++++++++++++++++++++++++++++++++++++------ 1 file changed, 36 insertions(+), 6 deletions(-) diff --git a/src/operators/join.rs b/src/operators/join.rs index a31e682a5..c516d8983 100644 --- a/src/operators/join.rs +++ b/src/operators/join.rs @@ -5,10 +5,11 @@ //! + (b * c), and if this is not equal to the former term, little is known about the actual output. use std::cmp::Ordering; +use timely::container::{PushContainer, PushInto}; use timely::order::PartialOrder; use timely::progress::Timestamp; -use timely::dataflow::Scope; -use timely::dataflow::operators::generic::{Operator, OutputHandle}; +use timely::dataflow::{Scope, StreamCore}; +use timely::dataflow::operators::generic::{Operator, OutputHandleCore}; use timely::dataflow::channels::pact::Pipeline; use timely::dataflow::operators::Capability; use timely::dataflow::channels::pushers::tee::Tee; @@ -322,8 +323,34 @@ where /// even potentially unrelated to the input collection data. Importantly, the key and value types could be generic /// associated types (GATs) of the traces, and we would seemingly struggle to frame these types as trait arguments. /// +/// This implementation forces its output to use a [`Collection`]-compatible container. +/// /// The "correctness" of this method depends heavily on the behavior of the supplied `result` function. -pub fn join_traces(arranged1: &Arranged, arranged2: &Arranged, mut result: L) -> Collection +pub fn join_traces(arranged1: &Arranged, arranged2: &Arranged, result: L) -> Collection + where + G: Scope, + T1: TraceReader+Clone+'static, + T2: for<'a> TraceReader=T1::Key<'a>, Time=T1::Time>+Clone+'static, + D: Data, + R: Semigroup, + I: IntoIterator, + L: FnMut(T1::Key<'_>,T1::Val<'_>,T2::Val<'_>,&G::Timestamp,&T1::Diff,&T2::Diff)->I+'static, +{ + join_traces_core(arranged1, arranged2, result).as_collection() +} + +/// An equijoin of two traces, sharing a common key type. +/// +/// This method exists to provide join functionality without opinions on the specific input types, keys and values, +/// that should be presented. The two traces here can have arbitrary key and value types, which can be unsized and +/// even potentially unrelated to the input collection data. Importantly, the key and value types could be generic +/// associated types (GATs) of the traces, and we would seemingly struggle to frame these types as trait arguments. +/// +/// The implementation produces a caller-specified container, with the requirement that the container +/// can absorb `(D, G::Timestamp, R)` triples. +/// +/// The "correctness" of this method depends heavily on the behavior of the supplied `result` function. +pub fn join_traces_core(arranged1: &Arranged, arranged2: &Arranged, mut result: L) -> StreamCore where G: Scope, T1: TraceReader+Clone+'static, @@ -332,6 +359,8 @@ where R: Semigroup, I: IntoIterator, L: FnMut(T1::Key<'_>,T1::Val<'_>,T2::Val<'_>,&G::Timestamp,&T1::Diff,&T2::Diff)->I+'static, + C: PushContainer, + (D, G::Timestamp, R): PushInto, { // Rename traces for symmetry from here on out. let mut trace1 = arranged1.trace.clone(); @@ -565,7 +594,6 @@ where } } }) - .as_collection() } @@ -617,10 +645,12 @@ where /// Process keys until at least `fuel` output tuples produced, or the work is exhausted. #[inline(never)] - fn work(&mut self, output: &mut OutputHandle>>, mut logic: L, fuel: &mut usize) - where + fn work(&mut self, output: &mut OutputHandleCore>, mut logic: L, fuel: &mut usize) + where I: IntoIterator, L: for<'a> FnMut(C1::Key<'a>, C1::Val<'a>, C2::Val<'a>, &T, &C1::Diff, &C2::Diff)->I, + C: PushContainer, + (D, T, R): PushInto, { let meet = self.capability.time(); From 2efd037ac548459841e71bbe5053bc5747249bc7 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Tue, 23 Apr 2024 10:37:51 -0400 Subject: [PATCH 2/2] Remove `join_traces_core` Signed-off-by: Moritz Hoffmann --- src/operators/arrange/arrangement.rs | 1 + src/operators/join.rs | 32 ++++++---------------------- 2 files changed, 7 insertions(+), 26 deletions(-) diff --git a/src/operators/arrange/arrangement.rs b/src/operators/arrange/arrangement.rs index bffc2bdaa..9a5d926c0 100644 --- a/src/operators/arrange/arrangement.rs +++ b/src/operators/arrange/arrangement.rs @@ -427,6 +427,7 @@ where { use crate::operators::join::join_traces; join_traces(self, other, result) + .as_collection() } } diff --git a/src/operators/join.rs b/src/operators/join.rs index c516d8983..311f06802 100644 --- a/src/operators/join.rs +++ b/src/operators/join.rs @@ -15,7 +15,7 @@ use timely::dataflow::operators::Capability; use timely::dataflow::channels::pushers::tee::Tee; use crate::hashable::Hashable; -use crate::{Data, ExchangeData, Collection, AsCollection}; +use crate::{Data, ExchangeData, Collection}; use crate::difference::{Semigroup, Abelian, Multiply}; use crate::lattice::Lattice; use crate::operators::arrange::{Arranged, ArrangeByKey, ArrangeBySelf}; @@ -316,29 +316,6 @@ where } } -/// An equijoin of two traces, sharing a common key type. -/// -/// This method exists to provide join functionality without opinions on the specific input types, keys and values, -/// that should be presented. The two traces here can have arbitrary key and value types, which can be unsized and -/// even potentially unrelated to the input collection data. Importantly, the key and value types could be generic -/// associated types (GATs) of the traces, and we would seemingly struggle to frame these types as trait arguments. -/// -/// This implementation forces its output to use a [`Collection`]-compatible container. -/// -/// The "correctness" of this method depends heavily on the behavior of the supplied `result` function. -pub fn join_traces(arranged1: &Arranged, arranged2: &Arranged, result: L) -> Collection - where - G: Scope, - T1: TraceReader+Clone+'static, - T2: for<'a> TraceReader=T1::Key<'a>, Time=T1::Time>+Clone+'static, - D: Data, - R: Semigroup, - I: IntoIterator, - L: FnMut(T1::Key<'_>,T1::Val<'_>,T2::Val<'_>,&G::Timestamp,&T1::Diff,&T2::Diff)->I+'static, -{ - join_traces_core(arranged1, arranged2, result).as_collection() -} - /// An equijoin of two traces, sharing a common key type. /// /// This method exists to provide join functionality without opinions on the specific input types, keys and values, @@ -347,10 +324,13 @@ pub fn join_traces(arranged1: &Arranged, arranged2: &A /// associated types (GATs) of the traces, and we would seemingly struggle to frame these types as trait arguments. /// /// The implementation produces a caller-specified container, with the requirement that the container -/// can absorb `(D, G::Timestamp, R)` triples. +/// can absorb `(D, G::Timestamp, R)` triples. Implementations can use [`AsCollection`] to wrap the +/// output stream in a collection. /// /// The "correctness" of this method depends heavily on the behavior of the supplied `result` function. -pub fn join_traces_core(arranged1: &Arranged, arranged2: &Arranged, mut result: L) -> StreamCore +/// +/// [`AsCollection`]: crate::collection::AsCollection +pub fn join_traces(arranged1: &Arranged, arranged2: &Arranged, mut result: L) -> StreamCore where G: Scope, T1: TraceReader+Clone+'static,