diff --git a/src/collection.rs b/src/collection.rs index 392f9001c..6fa1f880e 100644 --- a/src/collection.rs +++ b/src/collection.rs @@ -172,6 +172,35 @@ impl Collection { pub fn scope(&self) -> G { self.inner.scope() } + + /// Creates a new collection whose counts are the negation of those in the input. + /// + /// This method is most commonly used with `concat` to get those element in one collection but not another. + /// However, differential dataflow computations are still defined for all values of the difference type `R`, + /// including negative counts. + /// + /// # Examples + /// + /// ``` + /// use differential_dataflow::input::Input; + /// + /// ::timely::example(|scope| { + /// + /// let data = scope.new_collection_from(1 .. 10).1; + /// + /// let odds = data.filter(|x| x % 2 == 1); + /// let evens = data.filter(|x| x % 2 == 0); + /// + /// odds.negate() + /// .concat(&data) + /// .assert_eq(&evens); + /// }); + /// ``` + // TODO: Removing this function is possible, but breaks existing callers of `negate` who expect + // an inherent method on `Collection`. + pub fn negate(&self) -> Collection where StreamCore: crate::operators::Negate { + crate::operators::Negate::negate(&self.inner).as_collection() + } } impl Collection { @@ -552,36 +581,6 @@ impl<'a, G: Scope, D: Clone+'static, R: Clone+'static> Collection Collection where G::Timestamp: Data { - /// Creates a new collection whose counts are the negation of those in the input. - /// - /// This method is most commonly used with `concat` to get those element in one collection but not another. - /// However, differential dataflow computations are still defined for all values of the difference type `R`, - /// including negative counts. - /// - /// # Examples - /// - /// ``` - /// use differential_dataflow::input::Input; - /// - /// ::timely::example(|scope| { - /// - /// let data = scope.new_collection_from(1 .. 10).1; - /// - /// let odds = data.filter(|x| x % 2 == 1); - /// let evens = data.filter(|x| x % 2 == 0); - /// - /// odds.negate() - /// .concat(&data) - /// .assert_eq(&evens); - /// }); - /// ``` - pub fn negate(&self) -> Collection { - self.inner - .map_in_place(|x| x.2.negate()) - .as_collection() - } - - /// Assert if the collections are ever different. /// /// Because this is a dataflow fragment, the test is only applied as the computation is run. If the computation diff --git a/src/operators/iterate.rs b/src/operators/iterate.rs index 6e7f70e98..6056ad8cd 100644 --- a/src/operators/iterate.rs +++ b/src/operators/iterate.rs @@ -33,15 +33,16 @@ use std::fmt::Debug; use std::ops::Deref; +use timely::Container; use timely::progress::{Timestamp, PathSummary}; use timely::order::Product; use timely::dataflow::*; use timely::dataflow::scopes::child::Iterative; -use timely::dataflow::operators::{Feedback, ConnectLoop, Map}; +use timely::dataflow::operators::{Feedback, ConnectLoop}; use timely::dataflow::operators::feedback::Handle; -use crate::{Data, Collection}; +use crate::{Data, Collection, AsCollection}; use crate::difference::{Semigroup, Abelian}; use crate::lattice::Lattice; @@ -151,29 +152,39 @@ impl Iterate for G { /// }); /// }) /// ``` -pub struct Variable -where G::Timestamp: Lattice { - collection: Collection, - feedback: Handle>, - source: Option>, +pub struct Variable::Timestamp, R)>> +where + G: Scope, + G::Timestamp: Lattice, + D: Data, + R: Abelian + 'static, + C: Container + Clone + 'static, +{ + collection: Collection, + feedback: Handle, + source: Option>, step: ::Summary, } -impl Variable where G::Timestamp: Lattice { +impl Variable +where + G::Timestamp: Lattice, + StreamCore: crate::operators::Negate + ResultsIn, +{ /// Creates a new initially empty `Variable`. /// /// This method produces a simpler dataflow graph than `new_from`, and should /// be used whenever the variable has an empty input. pub fn new(scope: &mut G, step: ::Summary) -> Self { let (feedback, updates) = scope.feedback(step.clone()); - let collection = Collection::::new(updates); - Variable { collection, feedback, source: None, step } + let collection = Collection::::new(updates); + Self { collection, feedback, source: None, step } } /// Creates a new `Variable` from a supplied `source` stream. - pub fn new_from(source: Collection, step: ::Summary) -> Self { + pub fn new_from(source: Collection, step: ::Summary) -> Self { let (feedback, updates) = source.inner.scope().feedback(step.clone()); - let collection = Collection::::new(updates).concat(&source); + let collection = Collection::::new(updates).concat(&source); Variable { collection, feedback, source: Some(source), step } } @@ -181,7 +192,7 @@ impl Variable where G::Timestamp: Lattic /// /// This method binds the `Variable` to be equal to the supplied collection, /// which may be recursively defined in terms of the variable itself. - pub fn set(self, result: &Collection) -> Collection { + pub fn set(self, result: &Collection) -> Collection { let mut in_result = result.clone(); if let Some(source) = &self.source { in_result = in_result.concat(&source.negate()); @@ -198,19 +209,19 @@ impl Variable where G::Timestamp: Lattic /// /// This behavior can also be achieved by using `new` to create an empty initial /// collection, and then using `self.set(self.concat(result))`. - pub fn set_concat(self, result: &Collection) -> Collection { + pub fn set_concat(self, result: &Collection) -> Collection { let step = self.step; result .inner - .flat_map(move |(x,t,d)| step.results_in(&t).map(|t| (x,t,d))) + .results_in(step) .connect_loop(self.feedback); self.collection } } -impl Deref for Variable where G::Timestamp: Lattice { - type Target = Collection; +impl Deref for Variable where G::Timestamp: Lattice { + type Target = Collection; fn deref(&self) -> &Self::Target { &self.collection } @@ -222,36 +233,90 @@ impl Deref for Variable where G::Timesta /// that do not implement `Abelian` and only implement `Semigroup`. This means /// that it can be used in settings where the difference type does not support /// negation. -pub struct SemigroupVariable -where G::Timestamp: Lattice { - collection: Collection, - feedback: Handle>, +pub struct SemigroupVariable::Timestamp, R)>> +where + G::Timestamp: Lattice, + G: Scope, + D: Data, + R: Semigroup + 'static, + C: Container + Clone + 'static, +{ + collection: Collection, + feedback: Handle, step: ::Summary, } -impl SemigroupVariable where G::Timestamp: Lattice { +impl SemigroupVariable +where + G::Timestamp: Lattice, + StreamCore: ResultsIn, +{ /// Creates a new initially empty `SemigroupVariable`. pub fn new(scope: &mut G, step: ::Summary) -> Self { let (feedback, updates) = scope.feedback(step.clone()); - let collection = Collection::::new(updates); + let collection = Collection::::new(updates); SemigroupVariable { collection, feedback, step } } /// Adds a new source of data to `self`. - pub fn set(self, result: &Collection) -> Collection { + pub fn set(self, result: &Collection) -> Collection { let step = self.step; result .inner - .flat_map(move |(x,t,d)| step.results_in(&t).map(|t| (x,t,d))) + .results_in(step) .connect_loop(self.feedback); self.collection } } -impl Deref for SemigroupVariable where G::Timestamp: Lattice { - type Target = Collection; +impl Deref for SemigroupVariable where G::Timestamp: Lattice { + type Target = Collection; fn deref(&self) -> &Self::Target { &self.collection } } + +/// Extension trait for streams. +pub trait ResultsIn { + /// Advances a timestamp in the stream according to the timestamp actions on the path. + /// + /// The path may advance the timestamp sufficiently that it is no longer valid, for example if + /// incrementing fields would result in integer overflow. In this case, the record is dropped. + /// + /// # Examples + /// ``` + /// use timely::dataflow::Scope; + /// use timely::dataflow::operators::{ToStream, Concat, Inspect, BranchWhen}; + /// + /// use differential_dataflow::input::Input; + /// use differential_dataflow::operators::ResultsIn; + /// + /// timely::example(|scope| { + /// let summary1 = 5; + /// + /// let data = scope.new_collection_from(1 .. 10).1; + /// /// Applies `results_in` on every timestamp in the collection. + /// data.results_in(summary1); + /// }); + /// ``` + fn results_in(&self, step: ::Summary) -> Self; +} + +impl ResultsIn for Collection +where + G: Scope, + C: Clone, + StreamCore: ResultsIn, +{ + fn results_in(&self, step: ::Summary) -> Self { + self.inner.results_in(step).as_collection() + } +} + +impl ResultsIn> for Stream { + fn results_in(&self, step: ::Summary) -> Self { + use timely::dataflow::operators::Map; + self.flat_map(move |(x,t,d)| step.results_in(&t).map(|t| (x,t,d))) + } +} diff --git a/src/operators/mod.rs b/src/operators/mod.rs index fdde1f242..f069df475 100644 --- a/src/operators/mod.rs +++ b/src/operators/mod.rs @@ -4,13 +4,15 @@ //! operators have specialized implementations to make them work efficiently, and are in addition //! to several operations defined directly on the `Collection` type (e.g. `map` and `filter`). +pub use self::negate::Negate; pub use self::reduce::{Reduce, Threshold, Count}; -pub use self::iterate::Iterate; +pub use self::iterate::{Iterate, ResultsIn}; pub use self::join::{Join, JoinCore}; pub use self::count::CountTotal; pub use self::threshold::ThresholdTotal; pub mod arrange; +pub mod negate; pub mod reduce; pub mod consolidate; pub mod iterate; diff --git a/src/operators/negate.rs b/src/operators/negate.rs new file mode 100644 index 000000000..b354c95ca --- /dev/null +++ b/src/operators/negate.rs @@ -0,0 +1,54 @@ +//! Negate the diffs of collections and streams. + +use timely::Data; +use timely::dataflow::{Scope, Stream, StreamCore}; +use timely::dataflow::operators::Map; + +use crate::{AsCollection, Collection}; +use crate::difference::Abelian; + +/// Negate the contents of a stream. +pub trait Negate { + /// Creates a new collection whose counts are the negation of those in the input. + /// + /// This method is most commonly used with `concat` to get those element in one collection but not another. + /// However, differential dataflow computations are still defined for all values of the difference type `R`, + /// including negative counts. + /// + /// # Examples + /// + /// ``` + /// use differential_dataflow::input::Input; + /// + /// ::timely::example(|scope| { + /// + /// let data = scope.new_collection_from(1 .. 10).1; + /// + /// let odds = data.filter(|x| x % 2 == 1); + /// let evens = data.filter(|x| x % 2 == 0); + /// + /// odds.negate() + /// .concat(&data) + /// .assert_eq(&evens); + /// }); + /// ``` + fn negate(&self) -> Self; +} + +impl Negate for Collection +where + G: Scope, + C: Clone, + StreamCore: Negate, +{ + fn negate(&self) -> Self { + self.inner.negate().as_collection() + } +} + +impl Negate> for Stream { + fn negate(&self) -> Self { + self.map_in_place(|x| x.2.negate()) + } +} +