Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/operators/arrange/arrangement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,7 @@ where
{
use crate::operators::join::join_traces;
join_traces(self, other, result)
.as_collection()
}
}

Expand Down
24 changes: 17 additions & 7 deletions src/operators/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,17 @@
//! + (b * c), and if this is not equal to the former term, little is known about the actual output.
use std::cmp::Ordering;

use timely::container::{PushContainer, PushInto};
use timely::order::PartialOrder;
use timely::progress::Timestamp;
use timely::dataflow::Scope;
use timely::dataflow::operators::generic::{Operator, OutputHandle};
use timely::dataflow::{Scope, StreamCore};
use timely::dataflow::operators::generic::{Operator, OutputHandleCore};
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::operators::Capability;
use timely::dataflow::channels::pushers::tee::Tee;

use crate::hashable::Hashable;
use crate::{Data, ExchangeData, Collection, AsCollection};
use crate::{Data, ExchangeData, Collection};
use crate::difference::{Semigroup, Abelian, Multiply};
use crate::lattice::Lattice;
use crate::operators::arrange::{Arranged, ArrangeByKey, ArrangeBySelf};
Expand Down Expand Up @@ -322,8 +323,14 @@ where
/// even potentially unrelated to the input collection data. Importantly, the key and value types could be generic
/// associated types (GATs) of the traces, and we would seemingly struggle to frame these types as trait arguments.
///
/// The implementation produces a caller-specified container, with the requirement that the container
/// can absorb `(D, G::Timestamp, R)` triples. Implementations can use [`AsCollection`] to wrap the
/// output stream in a collection.
///
/// The "correctness" of this method depends heavily on the behavior of the supplied `result` function.
pub fn join_traces<G, T1, T2, I,L,D,R>(arranged1: &Arranged<G,T1>, arranged2: &Arranged<G,T2>, mut result: L) -> Collection<G, D, R>
///
/// [`AsCollection`]: crate::collection::AsCollection
pub fn join_traces<G, T1, T2, I,L,D,R,C>(arranged1: &Arranged<G,T1>, arranged2: &Arranged<G,T2>, mut result: L) -> StreamCore<G, C>
where
G: Scope<Timestamp=T1::Time>,
T1: TraceReader+Clone+'static,
Expand All @@ -332,6 +339,8 @@ where
R: Semigroup,
I: IntoIterator<Item=(D, G::Timestamp, R)>,
L: FnMut(T1::Key<'_>,T1::Val<'_>,T2::Val<'_>,&G::Timestamp,&T1::Diff,&T2::Diff)->I+'static,
C: PushContainer,
(D, G::Timestamp, R): PushInto<C>,
{
// Rename traces for symmetry from here on out.
let mut trace1 = arranged1.trace.clone();
Expand Down Expand Up @@ -565,7 +574,6 @@ where
}
}
})
.as_collection()
}


Expand Down Expand Up @@ -617,10 +625,12 @@ where

/// Process keys until at least `fuel` output tuples produced, or the work is exhausted.
#[inline(never)]
fn work<L, I>(&mut self, output: &mut OutputHandle<T, (D, T, R), Tee<T, Vec<(D, T, R)>>>, mut logic: L, fuel: &mut usize)
where
fn work<L, I, C>(&mut self, output: &mut OutputHandleCore<T, C, Tee<T, C>>, mut logic: L, fuel: &mut usize)
where
I: IntoIterator<Item=(D, T, R)>,
L: for<'a> FnMut(C1::Key<'a>, C1::Val<'a>, C2::Val<'a>, &T, &C1::Diff, &C2::Diff)->I,
C: PushContainer,
(D, T, R): PushInto<C>,
{

let meet = self.capability.time();
Expand Down