From d32fc037ebd19b71f862a4e1b3e558b26f2b8cc1 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Sun, 22 Nov 2020 14:28:54 -0500 Subject: [PATCH] clean up iterate.rs --- src/operators/iterate.rs | 79 ++++++++++++++++------------------------ 1 file changed, 32 insertions(+), 47 deletions(-) diff --git a/src/operators/iterate.rs b/src/operators/iterate.rs index b0d03fc74..062d02255 100644 --- a/src/operators/iterate.rs +++ b/src/operators/iterate.rs @@ -167,33 +167,49 @@ pub struct Variable where G::Timestamp: Lattice { collection: Collection, feedback: Handle, - source: Collection, + source: Option>, step: ::Summary, } impl Variable where G::Timestamp: Lattice { /// Creates a new initially empty `Variable`. + /// + /// This method produces a simpler dataflow graph than `new_from`, and should + /// be used whenever the variable has an empty input. pub fn new(scope: &mut G, step: ::Summary) -> Self { - use collection::AsCollection; - let empty = ::timely::dataflow::operators::generic::operator::empty(scope).as_collection(); - Self::new_from(empty, step) + let (feedback, updates) = scope.feedback(step.clone()); + let collection = Collection::new(updates); + Variable { collection, feedback, source: None, step } } /// Creates a new `Variable` from a supplied `source` stream. pub fn new_from(source: Collection, step: ::Summary) -> Self { let (feedback, updates) = source.inner.scope().feedback(step.clone()); let collection = Collection::new(updates).concat(&source); - Variable { collection, feedback, source, step } + Variable { collection, feedback, source: Some(source), step } } - /// Adds a new source of data to the `Variable`. + /// Set the definition of the `Variable` to a collection. + /// + /// This method binds the `Variable` to be equal to the supplied collection, + /// which may be recursively defined in terms of the variable itself. pub fn set(self, result: &Collection) -> Collection { - let in_result = self.source.negate().concat(result); + let mut in_result = result.clone(); + if let Some(source) = &self.source { + in_result = in_result.concat(&source.negate()); + } self.set_concat(&in_result) } - - /// Concat a new source of data to the data already in the `Variable`. + /// Set the definition of the `Variable` to a collection concatenated to `self`. + /// + /// This method is a specialization of `set` which has the effect of concatenating + /// `result` and `self` before calling `set`. This method avoids some dataflow + /// complexity related to retracting the initial input, and will do less work in + /// that case. + /// + /// This behavior can also be achieved by using `new` to create an empty initial + /// collection, and then using `self.set(self.concat(result))`. pub fn set_concat(self, result: &Collection) -> Collection { let step = self.step; result @@ -212,43 +228,12 @@ impl Deref for Variable where G::Timesta } } -/// A recursively defined collection. -/// -/// The `Variable` struct allows differential dataflow programs requiring more sophisticated -/// iterative patterns than singly recursive iteration. For example: in mutual recursion two -/// collections evolve simultaneously. -/// -/// # Examples -/// -/// The following example is equivalent to the example for the `Iterate` trait. -/// -/// ``` -/// extern crate timely; -/// extern crate differential_dataflow; -/// -/// use timely::order::Product; -/// use timely::dataflow::Scope; -/// -/// use differential_dataflow::input::Input; -/// use differential_dataflow::operators::iterate::SemigroupVariable; -/// use differential_dataflow::operators::Consolidate; +/// A recursively defined collection that only "grows". /// -/// fn main() { -/// ::timely::example(|scope| { -/// -/// let numbers = scope.new_collection_from(1 .. 10u32).1; -/// -/// scope.iterative::(|nested| { -/// let summary = Product::new(Default::default(), 1); -/// let variable = SemigroupVariable::<_,usize,isize>::new(nested, summary); -/// let result = variable.map(|x| if x % 2 == 0 { x/2 } else { x }) -/// .consolidate(); -/// variable.set(&result) -/// .leave() -/// }); -/// }) -/// } -/// ``` +/// `SemigroupVariable` is a weakening of `Variable` to allow difference types +/// that do not implement `Abelian` and only implement `Semigroup`. This means +/// that it can be used in settings where the difference type does not support +/// negation. pub struct SemigroupVariable where G::Timestamp: Lattice { collection: Collection, @@ -257,14 +242,14 @@ where G::Timestamp: Lattice { } impl SemigroupVariable where G::Timestamp: Lattice { - /// Creates a new initially empty `Variable`. + /// Creates a new initially empty `SemigroupVariable`. pub fn new(scope: &mut G, step: ::Summary) -> Self { let (feedback, updates) = scope.feedback(step.clone()); let collection = Collection::new(updates); SemigroupVariable { collection, feedback, step } } - /// Adds a new source of data to the `Variable`. + /// Adds a new source of data to `self`. pub fn set(self, result: &Collection) -> Collection { let step = self.step; result