Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 29 additions & 30 deletions src/collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,35 @@ impl<G: Scope, D, R, C: Container + Clone + 'static> Collection<G, D, R, C> {
pub fn scope(&self) -> G {
self.inner.scope()
}

/// Produces a collection in which every count is the negation of the corresponding input count.
///
/// The most common use is together with `concat`, to obtain those elements present in one
/// collection but not in another. Differential dataflow computations nonetheless remain defined
/// for all values of the difference type `R`, negative counts included.
///
/// # Examples
///
/// ```
/// use differential_dataflow::input::Input;
///
/// ::timely::example(|scope| {
///
///     let data = scope.new_collection_from(1 .. 10).1;
///
///     let odds = data.filter(|x| x % 2 == 1);
///     let evens = data.filter(|x| x % 2 == 0);
///
///     odds.negate()
///         .concat(&data)
///         .assert_eq(&evens);
/// });
/// ```
// TODO: Removing this function is possible, but breaks existing callers of `negate` who expect
// an inherent method on `Collection`.
pub fn negate(&self) -> Collection<G, D, R, C>
where
    StreamCore<G, C>: crate::operators::Negate<G, C>,
{
    // Delegate to the `Negate` implementation of the underlying stream, then rewrap the
    // negated stream as a collection.
    let negated = crate::operators::Negate::negate(&self.inner);
    negated.as_collection()
}
}

impl<G: Scope, D: Clone+'static, R: Clone+'static> Collection<G, D, R> {
Expand Down Expand Up @@ -552,36 +581,6 @@ impl<'a, G: Scope, D: Clone+'static, R: Clone+'static> Collection<Child<'a, G, G

/// Methods requiring an Abelian difference, to support negation.
impl<G: Scope, D: Clone+'static, R: Abelian+'static> Collection<G, D, R> where G::Timestamp: Data {
/// Creates a new collection whose counts are the negation of those in the input.
///
/// This method is most commonly used with `concat` to get those elements in one collection but not another.
/// However, differential dataflow computations are still defined for all values of the difference type `R`,
/// including negative counts.
///
/// # Examples
///
/// ```
/// use differential_dataflow::input::Input;
///
/// ::timely::example(|scope| {
///
/// let data = scope.new_collection_from(1 .. 10).1;
///
/// let odds = data.filter(|x| x % 2 == 1);
/// let evens = data.filter(|x| x % 2 == 0);
///
/// odds.negate()
/// .concat(&data)
/// .assert_eq(&evens);
/// });
/// ```
pub fn negate(&self) -> Collection<G, D, R> {
self.inner
.map_in_place(|x| x.2.negate())
.as_collection()
}


/// Assert if the collections are ever different.
///
/// Because this is a dataflow fragment, the test is only applied as the computation is run. If the computation
Expand Down
119 changes: 92 additions & 27 deletions src/operators/iterate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,16 @@
use std::fmt::Debug;
use std::ops::Deref;

use timely::Container;
use timely::progress::{Timestamp, PathSummary};
use timely::order::Product;

use timely::dataflow::*;
use timely::dataflow::scopes::child::Iterative;
use timely::dataflow::operators::{Feedback, ConnectLoop, Map};
use timely::dataflow::operators::{Feedback, ConnectLoop};
use timely::dataflow::operators::feedback::Handle;

use crate::{Data, Collection};
use crate::{Data, Collection, AsCollection};
use crate::difference::{Semigroup, Abelian};
use crate::lattice::Lattice;

Expand Down Expand Up @@ -151,37 +152,47 @@ impl<G: Scope, D: Ord+Data+Debug, R: Semigroup+'static> Iterate<G, D, R> for G {
/// });
/// })
/// ```
pub struct Variable<G: Scope, D: Data, R: Abelian+'static>
where G::Timestamp: Lattice {
collection: Collection<G, D, R>,
feedback: Handle<G, Vec<(D, G::Timestamp, R)>>,
source: Option<Collection<G, D, R>>,
pub struct Variable<G, D, R, C = Vec<(D, <G as ScopeParent>::Timestamp, R)>>
where
G: Scope,
G::Timestamp: Lattice,
D: Data,
R: Abelian + 'static,
C: Container + Clone + 'static,
{
collection: Collection<G, D, R, C>,
feedback: Handle<G, C>,
source: Option<Collection<G, D, R, C>>,
step: <G::Timestamp as Timestamp>::Summary,
}

impl<G: Scope, D: Data, R: Abelian> Variable<G, D, R> where G::Timestamp: Lattice {
impl<G: Scope, D: Data, R: Abelian, C: Container + Clone + 'static> Variable<G, D, R, C>
where
G::Timestamp: Lattice,
StreamCore<G, C>: crate::operators::Negate<G, C> + ResultsIn<G, C>,
{
/// Creates a new initially empty `Variable`.
///
/// This method produces a simpler dataflow graph than `new_from`, and should
/// be used whenever the variable has an empty input.
pub fn new(scope: &mut G, step: <G::Timestamp as Timestamp>::Summary) -> Self {
let (feedback, updates) = scope.feedback(step.clone());
let collection = Collection::<G,D,R>::new(updates);
Variable { collection, feedback, source: None, step }
let collection = Collection::<G, D, R, C>::new(updates);
Self { collection, feedback, source: None, step }
}

/// Creates a new `Variable` from a supplied `source` stream.
pub fn new_from(source: Collection<G, D, R>, step: <G::Timestamp as Timestamp>::Summary) -> Self {
pub fn new_from(source: Collection<G, D, R, C>, step: <G::Timestamp as Timestamp>::Summary) -> Self {
let (feedback, updates) = source.inner.scope().feedback(step.clone());
let collection = Collection::<G,D,R>::new(updates).concat(&source);
let collection = Collection::<G, D, R, C>::new(updates).concat(&source);
Variable { collection, feedback, source: Some(source), step }
}

/// Set the definition of the `Variable` to a collection.
///
/// This method binds the `Variable` to be equal to the supplied collection,
/// which may be recursively defined in terms of the variable itself.
pub fn set(self, result: &Collection<G, D, R>) -> Collection<G, D, R> {
pub fn set(self, result: &Collection<G, D, R, C>) -> Collection<G, D, R, C> {
let mut in_result = result.clone();
if let Some(source) = &self.source {
in_result = in_result.concat(&source.negate());
Expand All @@ -198,19 +209,19 @@ impl<G: Scope, D: Data, R: Abelian> Variable<G, D, R> where G::Timestamp: Lattic
///
/// This behavior can also be achieved by using `new` to create an empty initial
/// collection, and then using `self.set(self.concat(result))`.
pub fn set_concat(self, result: &Collection<G, D, R>) -> Collection<G, D, R> {
pub fn set_concat(self, result: &Collection<G, D, R, C>) -> Collection<G, D, R, C> {
let step = self.step;
result
.inner
.flat_map(move |(x,t,d)| step.results_in(&t).map(|t| (x,t,d)))
.results_in(step)
.connect_loop(self.feedback);

self.collection
}
}

impl<G: Scope, D: Data, R: Abelian> Deref for Variable<G, D, R> where G::Timestamp: Lattice {
type Target = Collection<G, D, R>;
impl<G: Scope, D: Data, R: Abelian, C: Container + Clone + 'static> Deref for Variable<G, D, R, C> where G::Timestamp: Lattice {
type Target = Collection<G, D, R, C>;
fn deref(&self) -> &Self::Target {
&self.collection
}
Expand All @@ -222,36 +233,90 @@ impl<G: Scope, D: Data, R: Abelian> Deref for Variable<G, D, R> where G::Timesta
/// that do not implement `Abelian` and only implement `Semigroup`. This means
/// that it can be used in settings where the difference type does not support
/// negation.
pub struct SemigroupVariable<G: Scope, D: Data, R: Semigroup+'static>
where G::Timestamp: Lattice {
collection: Collection<G, D, R>,
feedback: Handle<G, Vec<(D, G::Timestamp, R)>>,
pub struct SemigroupVariable<G, D, R, C = Vec<(D, <G as ScopeParent>::Timestamp, R)>>
where
G::Timestamp: Lattice,
G: Scope,
D: Data,
R: Semigroup + 'static,
C: Container + Clone + 'static,
{
collection: Collection<G, D, R, C>,
feedback: Handle<G, C>,
step: <G::Timestamp as Timestamp>::Summary,
}

impl<G: Scope, D: Data, R: Semigroup> SemigroupVariable<G, D, R> where G::Timestamp: Lattice {
impl<G: Scope, D: Data, R: Semigroup, C: Container+Clone> SemigroupVariable<G, D, R, C>
where
G::Timestamp: Lattice,
StreamCore<G, C>: ResultsIn<G, C>,
{
/// Creates a new initially empty `SemigroupVariable`.
pub fn new(scope: &mut G, step: <G::Timestamp as Timestamp>::Summary) -> Self {
let (feedback, updates) = scope.feedback(step.clone());
let collection = Collection::<G,D,R>::new(updates);
let collection = Collection::<G,D,R,C>::new(updates);
SemigroupVariable { collection, feedback, step }
}

/// Adds a new source of data to `self`.
pub fn set(self, result: &Collection<G, D, R>) -> Collection<G, D, R> {
pub fn set(self, result: &Collection<G, D, R, C>) -> Collection<G, D, R, C> {
let step = self.step;
result
.inner
.flat_map(move |(x,t,d)| step.results_in(&t).map(|t| (x,t,d)))
.results_in(step)
.connect_loop(self.feedback);

self.collection
}
}

impl<G: Scope, D: Data, R: Semigroup> Deref for SemigroupVariable<G, D, R> where G::Timestamp: Lattice {
type Target = Collection<G, D, R>;
impl<G: Scope, D: Data, R: Semigroup, C: Container+Clone+'static> Deref for SemigroupVariable<G, D, R, C> where G::Timestamp: Lattice {
type Target = Collection<G, D, R, C>;
fn deref(&self) -> &Self::Target {
&self.collection
}
}

/// Extension trait for streams and collections of updates whose timestamps can be advanced
/// by a path summary.
pub trait ResultsIn<G: Scope, C> {
/// Advances a timestamp in the stream according to the timestamp actions on the path.
///
/// The path may advance the timestamp sufficiently that it is no longer valid, for example if
/// incrementing fields would result in integer overflow. In this case, the record is dropped.
///
/// `step` is the path summary applied to the timestamp of every record.
///
/// # Examples
/// ```
/// use differential_dataflow::input::Input;
/// use differential_dataflow::operators::ResultsIn;
///
/// timely::example(|scope| {
/// let summary1 = 5;
///
/// let data = scope.new_collection_from(1 .. 10).1;
/// // Applies `results_in` on every timestamp in the collection.
/// data.results_in(summary1);
/// });
/// ```
fn results_in(&self, step: <G::Timestamp as Timestamp>::Summary) -> Self;
}

impl<G, D, R, C> ResultsIn<G, C> for Collection<G, D, R, C>
where
    G: Scope,
    C: Clone,
    StreamCore<G, C>: ResultsIn<G, C>,
{
    /// Applies `step` to the underlying stream via its own `ResultsIn` implementation and
    /// wraps the resulting stream back up as a collection.
    fn results_in(&self, step: <G::Timestamp as Timestamp>::Summary) -> Self {
        let advanced = self.inner.results_in(step);
        advanced.as_collection()
    }
}

impl<G: Scope, D: timely::Data, R: timely::Data> ResultsIn<G, Vec<(D, G::Timestamp, R)>> for Stream<G, (D, G::Timestamp, R)> {
    /// Advances the timestamp of each `(data, time, diff)` update by `step`.
    ///
    /// Updates for which `step.results_in(&time)` yields no timestamp are dropped.
    fn results_in(&self, step: <G::Timestamp as Timestamp>::Summary) -> Self {
        use timely::dataflow::operators::Map;
        self.flat_map(move |(data, time, diff)| {
            // `None` produces no output for this update; `Some` produces exactly one.
            let advanced = step.results_in(&time);
            advanced.map(|time| (data, time, diff))
        })
    }
}
4 changes: 3 additions & 1 deletion src/operators/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@
//! operators have specialized implementations to make them work efficiently, and are in addition
//! to several operations defined directly on the `Collection` type (e.g. `map` and `filter`).

pub use self::negate::Negate;
pub use self::reduce::{Reduce, Threshold, Count};
pub use self::iterate::Iterate;
pub use self::iterate::{Iterate, ResultsIn};
pub use self::join::{Join, JoinCore};
pub use self::count::CountTotal;
pub use self::threshold::ThresholdTotal;

pub mod arrange;
pub mod negate;
pub mod reduce;
pub mod consolidate;
pub mod iterate;
Expand Down
54 changes: 54 additions & 0 deletions src/operators/negate.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
//! Negate the diffs of collections and streams.

use timely::Data;
use timely::dataflow::{Scope, Stream, StreamCore};
use timely::dataflow::operators::Map;

use crate::{AsCollection, Collection};
use crate::difference::Abelian;

/// Negate the differences carried by a stream or collection of updates.
pub trait Negate<G, C> {
/// Creates a new collection whose counts are the negation of those in the input.
///
/// This method is most commonly used with `concat` to get those elements in one collection but not another.
/// However, differential dataflow computations are still defined for all values of the difference type `R`,
/// including negative counts.
///
/// # Examples
///
/// ```
/// use differential_dataflow::input::Input;
///
/// ::timely::example(|scope| {
///
/// let data = scope.new_collection_from(1 .. 10).1;
///
/// let odds = data.filter(|x| x % 2 == 1);
/// let evens = data.filter(|x| x % 2 == 0);
///
/// odds.negate()
/// .concat(&data)
/// .assert_eq(&evens);
/// });
/// ```
fn negate(&self) -> Self;
}

impl<G, D, R, C> Negate<G, C> for Collection<G, D, R, C>
where
    G: Scope,
    C: Clone,
    StreamCore<G, C>: Negate<G, C>,
{
    /// Negates the underlying stream via its own `Negate` implementation and wraps the
    /// resulting stream back up as a collection.
    fn negate(&self) -> Self {
        let negated = self.inner.negate();
        negated.as_collection()
    }
}

impl<G: Scope, D: Data, T: Data, R: Data + Abelian> Negate<G, Vec<(D, T, R)>> for Stream<G, (D, T, R)> {
    /// Negates, in place, the difference (third) component of every `(data, time, diff)` update.
    fn negate(&self) -> Self {
        self.map_in_place(|update| {
            // Only the difference changes; data and time are left untouched.
            let diff = &mut update.2;
            diff.negate()
        })
    }
}