Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 32 additions & 47 deletions src/operators/iterate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,33 +167,49 @@ pub struct Variable<G: Scope, D: Data, R: Abelian>
where G::Timestamp: Lattice {
collection: Collection<G, D, R>,
feedback: Handle<G, (D, G::Timestamp, R)>,
source: Collection<G, D, R>,
source: Option<Collection<G, D, R>>,
step: <G::Timestamp as Timestamp>::Summary,
}

impl<G: Scope, D: Data, R: Abelian> Variable<G, D, R> where G::Timestamp: Lattice {
/// Creates a new initially empty `Variable`.
///
/// This method produces a simpler dataflow graph than `new_from`, and should
/// be used whenever the variable has an empty input.
pub fn new(scope: &mut G, step: <G::Timestamp as Timestamp>::Summary) -> Self {
use collection::AsCollection;
let empty = ::timely::dataflow::operators::generic::operator::empty(scope).as_collection();
Self::new_from(empty, step)
let (feedback, updates) = scope.feedback(step.clone());
let collection = Collection::new(updates);
Variable { collection, feedback, source: None, step }
}

/// Creates a new `Variable` from a supplied `source` stream.
pub fn new_from(source: Collection<G, D, R>, step: <G::Timestamp as Timestamp>::Summary) -> Self {
let (feedback, updates) = source.inner.scope().feedback(step.clone());
let collection = Collection::new(updates).concat(&source);
Variable { collection, feedback, source, step }
Variable { collection, feedback, source: Some(source), step }
}

/// Adds a new source of data to the `Variable`.
/// Set the definition of the `Variable` to a collection.
///
/// This method binds the `Variable` to be equal to the supplied collection,
/// which may be recursively defined in terms of the variable itself.
pub fn set(self, result: &Collection<G, D, R>) -> Collection<G, D, R> {
let in_result = self.source.negate().concat(result);
let mut in_result = result.clone();
if let Some(source) = &self.source {
in_result = in_result.concat(&source.negate());
}
self.set_concat(&in_result)
}


/// Concat a new source of data to the data already in the `Variable`.
/// Set the definition of the `Variable` to a collection concatenated to `self`.
///
/// This method is a specialization of `set` which has the effect of concatenating
/// `result` and `self` before calling `set`. This method avoids some dataflow
/// complexity related to retracting the initial input, and will do less work in
/// that case.
///
/// This behavior can also be achieved by using `new` to create an empty initial
/// collection, and then using `self.set(self.concat(result))`.
pub fn set_concat(self, result: &Collection<G, D, R>) -> Collection<G, D, R> {
let step = self.step;
result
Expand All @@ -212,43 +228,12 @@ impl<G: Scope, D: Data, R: Abelian> Deref for Variable<G, D, R> where G::Timesta
}
}

/// A recursively defined collection.
///
/// The `Variable` struct allows differential dataflow programs requiring more sophisticated
/// iterative patterns than singly recursive iteration. For example: in mutual recursion two
/// collections evolve simultaneously.
///
/// # Examples
///
/// The following example is equivalent to the example for the `Iterate` trait.
///
/// ```
/// extern crate timely;
/// extern crate differential_dataflow;
///
/// use timely::order::Product;
/// use timely::dataflow::Scope;
///
/// use differential_dataflow::input::Input;
/// use differential_dataflow::operators::iterate::SemigroupVariable;
/// use differential_dataflow::operators::Consolidate;
/// A recursively defined collection that only "grows".
///
/// fn main() {
/// ::timely::example(|scope| {
///
/// let numbers = scope.new_collection_from(1 .. 10u32).1;
///
/// scope.iterative::<u64,_,_>(|nested| {
/// let summary = Product::new(Default::default(), 1);
/// let variable = SemigroupVariable::<_,usize,isize>::new(nested, summary);
/// let result = variable.map(|x| if x % 2 == 0 { x/2 } else { x })
/// .consolidate();
/// variable.set(&result)
/// .leave()
/// });
/// })
/// }
/// ```
/// `SemigroupVariable` is a weakening of `Variable` to allow difference types
/// that do not implement `Abelian` and only implement `Semigroup`. This means
/// that it can be used in settings where the difference type does not support
/// negation.
pub struct SemigroupVariable<G: Scope, D: Data, R: Semigroup>
where G::Timestamp: Lattice {
collection: Collection<G, D, R>,
Expand All @@ -257,14 +242,14 @@ where G::Timestamp: Lattice {
}

impl<G: Scope, D: Data, R: Semigroup> SemigroupVariable<G, D, R> where G::Timestamp: Lattice {
/// Creates a new initially empty `Variable`.
/// Creates a new initially empty `SemigroupVariable`.
pub fn new(scope: &mut G, step: <G::Timestamp as Timestamp>::Summary) -> Self {
let (feedback, updates) = scope.feedback(step.clone());
let collection = Collection::new(updates);
SemigroupVariable { collection, feedback, step }
}

/// Adds a new source of data to the `Variable`.
/// Adds a new source of data to `self`.
pub fn set(self, result: &Collection<G, D, R>) -> Collection<G, D, R> {
let step = self.step;
result
Expand Down