diff --git a/src/operators/arrange/arrangement.rs b/src/operators/arrange/arrangement.rs index bffc2bdaa..9a5d926c0 100644 --- a/src/operators/arrange/arrangement.rs +++ b/src/operators/arrange/arrangement.rs @@ -427,6 +427,7 @@ where { use crate::operators::join::join_traces; join_traces(self, other, result) + .as_collection() } } diff --git a/src/operators/join.rs b/src/operators/join.rs index a31e682a5..311f06802 100644 --- a/src/operators/join.rs +++ b/src/operators/join.rs @@ -5,16 +5,17 @@ //! + (b * c), and if this is not equal to the former term, little is known about the actual output. use std::cmp::Ordering; +use timely::container::{PushContainer, PushInto}; use timely::order::PartialOrder; use timely::progress::Timestamp; -use timely::dataflow::Scope; -use timely::dataflow::operators::generic::{Operator, OutputHandle}; +use timely::dataflow::{Scope, StreamCore}; +use timely::dataflow::operators::generic::{Operator, OutputHandleCore}; use timely::dataflow::channels::pact::Pipeline; use timely::dataflow::operators::Capability; use timely::dataflow::channels::pushers::tee::Tee; use crate::hashable::Hashable; -use crate::{Data, ExchangeData, Collection, AsCollection}; +use crate::{Data, ExchangeData, Collection}; use crate::difference::{Semigroup, Abelian, Multiply}; use crate::lattice::Lattice; use crate::operators::arrange::{Arranged, ArrangeByKey, ArrangeBySelf}; @@ -322,8 +323,14 @@ where /// even potentially unrelated to the input collection data. Importantly, the key and value types could be generic /// associated types (GATs) of the traces, and we would seemingly struggle to frame these types as trait arguments. /// +/// The implementation produces a caller-specified container, with the requirement that the container +/// can absorb `(D, G::Timestamp, R)` triples. Implementations can use [`AsCollection`] to wrap the +/// output stream in a collection. +/// /// The "correctness" of this method depends heavily on the behavior of the supplied `result` function. -pub fn join_traces(arranged1: &Arranged, arranged2: &Arranged, mut result: L) -> Collection +/// +/// [`AsCollection`]: crate::collection::AsCollection +pub fn join_traces(arranged1: &Arranged, arranged2: &Arranged, mut result: L) -> StreamCore where G: Scope, T1: TraceReader+Clone+'static, @@ -332,6 +339,8 @@ where R: Semigroup, I: IntoIterator, L: FnMut(T1::Key<'_>,T1::Val<'_>,T2::Val<'_>,&G::Timestamp,&T1::Diff,&T2::Diff)->I+'static, + C: PushContainer, + (D, G::Timestamp, R): PushInto, { // Rename traces for symmetry from here on out. let mut trace1 = arranged1.trace.clone(); @@ -565,7 +574,6 @@ where } } }) - .as_collection() } @@ -617,10 +625,12 @@ where /// Process keys until at least `fuel` output tuples produced, or the work is exhausted. #[inline(never)] - fn work(&mut self, output: &mut OutputHandle>>, mut logic: L, fuel: &mut usize) - where + fn work(&mut self, output: &mut OutputHandleCore>, mut logic: L, fuel: &mut usize) + where I: IntoIterator, L: for<'a> FnMut(C1::Key<'a>, C1::Val<'a>, C2::Val<'a>, &T, &C1::Diff, &C2::Diff)->I, + C: PushContainer, + (D, T, R): PushInto, { let meet = self.capability.time();