diff --git a/src/operators/arrange/arrangement.rs b/src/operators/arrange/arrangement.rs index 653ed1e87..7995b4397 100644 --- a/src/operators/arrange/arrangement.rs +++ b/src/operators/arrange/arrangement.rs @@ -489,90 +489,70 @@ where } } -/// A type that can be arranged as if a collection of updates shaped as `((K,V),G::Timestamp,R)`. -/// -/// This trait is primarily implemented by `Collection`. -/// -/// The resulting arrangements may not present as `((K,V),T,R)`, as their output types are unconstrained. -/// This allows e.g. for `Vec` inputs to present as `&[u8]` when read, but that relationship is not -/// constrained by this trait. -pub trait Arrange +/// A type that can be arranged as if a collection of updates. +pub trait Arrange where G: Scope, G::Timestamp: Lattice, { - /// Arranges a stream of `(Key, Val)` updates by `Key`. - /// - /// This operator arranges a stream of values into a shared trace, whose contents it maintains. + /// Arranges updates into a shared trace. fn arrange(&self) -> Arranged> where Tr: Trace + 'static, - K: ExchangeData + Hashable, - V: ExchangeData, - R: ExchangeData, Tr::Batch: Batch, - Tr::Batcher: Batcher, Item = ((K,V),G::Timestamp,R), Time = G::Timestamp>, - Tr::Builder: Builder, + Tr::Batcher: Batcher, { self.arrange_named("Arrange") } - /// Arranges a stream of `(Key, Val)` updates by `Key`, and presents with a `name` argument. - /// - /// This operator arranges a stream of values into a shared trace, whose contents it maintains. + /// Arranges updates into a shared trace, with a supplied name. fn arrange_named(&self, name: &str) -> Arranged> where Tr: Trace + 'static, - K: ExchangeData + Hashable, - V: ExchangeData, - R: ExchangeData, Tr::Batch: Batch, - Tr::Batcher: Batcher, Item = ((K,V),G::Timestamp,R), Time = G::Timestamp>, - Tr::Builder: Builder, - { - let exchange = Exchange::new(move |update: &((K,V),G::Timestamp,R)| (update.0).0.hashed().into()); - self.arrange_core(exchange, name) - } + Tr::Batcher: Batcher, + ; - /// Arranges a stream of `(Key, Val)` updates by `Key`, configured with a name and a parallelization contract. - /// - /// This operator arranges a stream of values into a shared trace, whose contents it maintains. - /// It uses the supplied parallelization contract to distribute the data, which does not need to - /// be consistently by key (though this is the most common). + /// Arranges updates into a shared trace, using a supplied parallelization contract, with a supplied name. fn arrange_core(&self, pact: P, name: &str) -> Arranged> where - P: ParallelizationContract>, - K: Clone, - V: Clone, - R: Clone, + P: ParallelizationContract, Tr: Trace+'static, Tr::Batch: Batch, - Tr::Batcher: Batcher, Item = ((K,V),G::Timestamp,R), Time = G::Timestamp>, - Tr::Builder: Builder, + Tr::Batcher: Batcher, ; } -impl Arrange for Collection +impl Arrange> for Collection where G: Scope, G::Timestamp: Lattice, - K: Clone + 'static, - V: Clone + 'static, - R: Semigroup, + K: ExchangeData + Hashable, + V: ExchangeData, + R: ExchangeData + Semigroup, { + fn arrange_named(&self, name: &str) -> Arranged> + where + Tr: Trace + 'static, + Tr::Batch: Batch, + Tr::Batcher: Batcher>, + { + let exchange = Exchange::new(move |update: &((K,V),G::Timestamp,R)| (update.0).0.hashed().into()); + self.arrange_core(exchange, name) + } + fn arrange_core(&self, pact: P, name: &str) -> Arranged> where P: ParallelizationContract>, Tr: Trace+'static, Tr::Batch: Batch, - Tr::Batcher: Batcher, Item = ((K,V),G::Timestamp,R), Time = G::Timestamp>, - Tr::Builder: Builder, + Tr::Batcher: Batcher>, { arrange_core(&self.inner, pact, name) } } -/// Arranges a stream of updates by a key, configured with a name and a parallelization contract. +/// Arranges a stream of updates by a key, configured with a name and a parallelization contract. /// /// This operator arranges a stream of values into a shared trace, whose contents it maintains. /// It uses the supplied parallelization contract to distribute the data, which does not need to @@ -584,9 +564,7 @@ where P: ParallelizationContract::Input>, Tr: Trace+'static, Tr::Batch: Batch, - Tr::Batcher: Batcher