Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions dogsdogsdogs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use timely::progress::Timestamp;
use timely::dataflow::operators::Partition;
use timely::dataflow::operators::Concatenate;

use differential_dataflow::{ExchangeData, Collection, AsCollection};
use differential_dataflow::{Data, ExchangeData, Collection, AsCollection};
use differential_dataflow::operators::Threshold;
use differential_dataflow::difference::{Monoid, Multiply};
use differential_dataflow::lattice::Lattice;
Expand All @@ -33,9 +33,9 @@ pub mod operators;
**/
pub trait PrefixExtender<G: Scope, R: Monoid+Multiply<Output = R>> {
/// The required type of prefix to extend.
type Prefix;
type Prefix: Data;
/// The type to be produced as extension.
type Extension;
type Extension: Data;
/// Annotates prefixes with the number of extensions the relation would propose.
fn count(&mut self, prefixes: &Collection<G, (Self::Prefix, usize, usize), R>, index: usize) -> Collection<G, (Self::Prefix, usize, usize), R>;
/// Extends each prefix with corresponding extensions.
Expand Down Expand Up @@ -92,11 +92,11 @@ where
}
}

pub trait ValidateExtensionMethod<G: Scope, R: Monoid+Multiply<Output = R>, P, E> {
/// Extension trait adding a `validate_using` method to collections of `(prefix, extension)` pairs.
///
/// `P` is the prefix type and `E` is the extension type; both are required to implement `Data`.
pub trait ValidateExtensionMethod<G: Scope, R: Monoid+Multiply<Output = R>, P: Data, E: Data> {
/// Validates the `(P, E)` pairs in `self` using the supplied `extender`.
///
/// The extender must be a `PrefixExtender` whose `Prefix` is `P` and whose `Extension` is `E`.
/// Returns a collection of `(P, E)` pairs.
fn validate_using<PE: PrefixExtender<G, R, Prefix=P, Extension=E>>(&self, extender: &mut PE) -> Collection<G, (P, E), R>;
}

impl<G: Scope, R: Monoid+Multiply<Output = R>, P, E> ValidateExtensionMethod<G, R, P, E> for Collection<G, (P, E), R> {
/// Provides `validate_using` on any collection of `(P, E)` pairs.
impl<G: Scope, R: Monoid+Multiply<Output = R>, P: Data, E: Data> ValidateExtensionMethod<G, R, P, E> for Collection<G, (P, E), R> {
/// Hands this collection to `extender.validate` and returns its result.
fn validate_using<PE: PrefixExtender<G, R, Prefix=P, Extension=E>>(&self, extender: &mut PE) -> Collection<G, (P, E), R> {
// All validation logic lives in the extender; this method only forwards the collection to it.
extender.validate(self)
}
}
Expand Down
6 changes: 3 additions & 3 deletions src/algorithms/prefix_sum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@

use timely::dataflow::Scope;

use ::{Collection, ExchangeData};
use ::{Collection, Data, ExchangeData};
use ::lattice::Lattice;
use ::operators::*;

/// Extension trait for the prefix_sum method.
pub trait PrefixSum<G: Scope, K, D> {
pub trait PrefixSum<G: Scope, K: Data, D> {
/// Computes the prefix sum for each element in the collection.
///
/// The prefix sum is data-parallel, in the sense that the sums are computed independently for
Expand Down Expand Up @@ -150,4 +150,4 @@ where
.distinct()
})
.semijoin(&queries)
}
}
242 changes: 136 additions & 106 deletions src/collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,21 @@
//! implementations, and to support efficient incremental updates to the collections.

use std::hash::Hash;
use std::marker::PhantomData;

use timely::Data;
use timely::progress::Timestamp;
use timely::order::Product;
use timely::dataflow::scopes::{Child, child::Iterative};
use timely::dataflow::{Scope, Stream};
use timely::dataflow::{Scope, StreamCore};
use timely::dataflow::operators::*;

use ::difference::{Semigroup, Abelian, Multiply};
use lattice::Lattice;
use hashable::Hashable;
use TimelyContainer;

/// A mutable collection of values of type `D`
/// A mutable collection of values of type `D` within a container `C`
///
/// The `Collection` type is the core abstraction in differential dataflow programs. As you write your
/// differential dataflow computation, you write as if the collection is a static dataset to which you
Expand All @@ -30,31 +32,141 @@ use hashable::Hashable;
/// propagate changes through your functional computation and report the corresponding changes to the
/// output collections.
///
/// Each collection has three generic parameters. The parameter `G` is for the scope in which the
/// Each collection has four generic parameters. The parameter `G` is for the scope in which the
/// collection exists; as you write more complicated programs you may wish to introduce nested scopes
/// (e.g. for iteration) and this parameter tracks the scope (for timely dataflow's benefit). The `D`
/// parameter is the type of data in your collection, for example `String`, or `(u32, Vec<Option<()>>)`.
/// The `R` parameter represents the types of changes that the data undergo, and is most commonly (and
/// defaults to) `isize`, representing changes to the occurrence count of each record.
/// defaults to) `isize`, representing changes to the occurrence count of each record. The `C`
/// parameter specifies the container type of the collection.
///
/// Note that the default container type is `Vec<_>`.
#[derive(Clone)]
pub struct Collection<G: Scope, D, R: Semigroup = isize> {
pub struct Collection<G, D, R = isize, C = Vec<(D, <G as ScopeParent>::Timestamp, R)>>
where
G: Scope,
R: Semigroup,
{
/// The underlying timely dataflow stream.
///
/// This field is exposed to support direct timely dataflow manipulation when required, but it is
/// not intended to be the idiomatic way to work with the collection.
pub inner: Stream<G, (D, G::Timestamp, R)>
pub inner: StreamCore<G, C>,
/// Phantom data to consume type parameters.
pub _phantom: PhantomData<(*const D, *const R)>,
}

impl<G: Scope, D: Data, R: Semigroup> Collection<G, D, R> where G::Timestamp: Data {
impl<G: Scope, C, D, R: Semigroup> Collection<G, D, R, C>
where
C: TimelyContainer<Item=(D, G::Timestamp, R)>,
{
/// Creates a new Collection from a timely dataflow stream.
///
/// This method seems to be rarely used, with the `as_collection` method on streams being a more
/// idiomatic approach to convert timely streams to collections. Also, the `input::Input` trait
/// provides a `new_collection` method which will create a new collection for you without exposing
/// the underlying timely stream at all.
pub fn new(stream: Stream<G, (D, G::Timestamp, R)>) -> Collection<G, D, R> {
Collection { inner: stream }
pub fn new(stream: StreamCore<G, C>) -> Collection<G, D, R, C> {
Self { inner: stream, _phantom: PhantomData }
}

/// Creates a new collection accumulating the contents of the two collections.
///
/// Despite the name, differential dataflow collections are unordered. This method is so named because the
/// implementation is the concatenation of the stream of updates, but it corresponds to the addition of the
/// two collections.
///
/// Both `self` and `other` must use the same container type `C`, which is also the container type of the
/// returned collection.
///
/// # Examples
///
/// ```
/// extern crate timely;
/// extern crate differential_dataflow;
///
/// use differential_dataflow::input::Input;
///
/// fn main() {
/// ::timely::example(|scope| {
///
/// let data = scope.new_collection_from(1 .. 10).1;
///
/// let odds = data.filter(|x| x % 2 == 1);
/// let evens = data.filter(|x| x % 2 == 0);
///
/// odds.concat(&evens)
/// .assert_eq(&data);
/// });
/// }
/// ```
pub fn concat(&self, other: &Collection<G, D, R, C>) -> Collection<G, D, R, C> {
// Concatenate the underlying update streams, then wrap the result back up as a collection.
self.inner
.concat(&other.inner)
.as_collection()
}

/// Creates a new collection accumulating the contents of this collection and every collection in `sources`.
///
/// Despite the name, differential dataflow collections are unordered. This method is so named because the
/// implementation is the concatenation of the streams of updates, but it corresponds to the addition of
/// all of the collections involved.
///
/// `sources` may be anything that can be turned into an iterator of collections sharing this collection's
/// type parameters (for example a `Vec`, or an `Option` as in the example below).
///
/// # Examples
///
/// ```
/// extern crate timely;
/// extern crate differential_dataflow;
///
/// use differential_dataflow::input::Input;
///
/// fn main() {
/// ::timely::example(|scope| {
///
/// let data = scope.new_collection_from(1 .. 10).1;
///
/// let odds = data.filter(|x| x % 2 == 1);
/// let evens = data.filter(|x| x % 2 == 0);
///
/// odds.concatenate(Some(evens))
/// .assert_eq(&data);
/// });
/// }
/// ```
pub fn concatenate<I>(&self, sources: I) -> Collection<G, D, R, C>
where
I: IntoIterator<Item=Collection<G, D, R, C>>
{
// Unwrap each source to its underlying stream, concatenate them with our own stream, and re-wrap the result.
self.inner
.concatenate(sources.into_iter().map(|x| x.inner))
.as_collection()
}

/// Attaches a timely dataflow probe to the output of a Collection.
///
/// This probe is used to determine when the state of the Collection has stabilized and can
/// be read out.
///
/// Returns the `probe::Handle` produced by calling `probe` on the underlying stream.
pub fn probe(&self) -> probe::Handle<G::Timestamp> {
self.inner
.probe()
}

/// Attaches a timely dataflow probe to the output of a Collection.
///
/// This probe is used to determine when the state of the Collection has stabilized and all updates observed.
/// In addition, a probe is also often used to limit the number of rounds of input in flight at any moment; a
/// computation can wait until the probe has caught up to the input before introducing more rounds of data, to
/// avoid swamping the system.
///
/// Unlike `probe`, this method uses the caller-supplied `handle` and returns a collection wrapping the stream
/// produced by `probe_with`, so that further operators can be chained onto it.
pub fn probe_with(&self, handle: &mut probe::Handle<G::Timestamp>) -> Collection<G, D, R, C> {
self.inner
.probe_with(handle)
.as_collection()
}

/// The scope containing the underlying timely dataflow stream.
///
/// The scope is obtained by calling `scope()` on the collection's `inner` stream.
pub fn scope(&self) -> G {
self.inner.scope()
}
}

impl<G: Scope, D: Data, R: Semigroup> Collection<G, D, R> where G::Timestamp: Data {
/// Creates a new collection by applying the supplied function to each input element.
///
/// # Examples
Expand Down Expand Up @@ -166,73 +278,6 @@ impl<G: Scope, D: Data, R: Semigroup> Collection<G, D, R> where G::Timestamp: Da
.filter(move |&(ref data, _, _)| logic(data))
.as_collection()
}
/// Creates a new collection accumulating the contents of the two collections.
///
/// Despite the name, differential dataflow collections are unordered. This method is so named because the
/// implementation is the concatenation of the stream of updates, but it corresponds to the addition of the
/// two collections.
///
/// # Examples
///
/// ```
/// extern crate timely;
/// extern crate differential_dataflow;
///
/// use differential_dataflow::input::Input;
///
/// fn main() {
/// ::timely::example(|scope| {
///
/// let data = scope.new_collection_from(1 .. 10).1;
///
/// let odds = data.filter(|x| x % 2 == 1);
/// let evens = data.filter(|x| x % 2 == 0);
///
/// odds.concat(&evens)
/// .assert_eq(&data);
/// });
/// }
/// ```
pub fn concat(&self, other: &Collection<G, D, R>) -> Collection<G, D, R> {
self.inner
.concat(&other.inner)
.as_collection()
}
/// Creates a new collection accumulating the contents of this collection and every collection in `sources`.
///
/// Despite the name, differential dataflow collections are unordered. This method is so named because the
/// implementation is the concatenation of the streams of updates, but it corresponds to the addition of
/// all of the collections involved.
///
/// `sources` may be anything that can be turned into an iterator of collections of the same type as this
/// one (for example a `Vec`, or an `Option` as in the example below).
///
/// # Examples
///
/// ```
/// extern crate timely;
/// extern crate differential_dataflow;
///
/// use differential_dataflow::input::Input;
///
/// fn main() {
/// ::timely::example(|scope| {
///
/// let data = scope.new_collection_from(1 .. 10).1;
///
/// let odds = data.filter(|x| x % 2 == 1);
/// let evens = data.filter(|x| x % 2 == 0);
///
/// odds.concatenate(Some(evens))
/// .assert_eq(&data);
/// });
/// }
/// ```
pub fn concatenate<I>(&self, sources: I) -> Collection<G, D, R>
where
I: IntoIterator<Item=Collection<G, D, R>>
{
// Unwrap each source to its underlying stream, concatenate them with our own stream, and re-wrap the result.
self.inner
.concatenate(sources.into_iter().map(|x| x.inner))
.as_collection()
}
/// Replaces each record with another, with a new difference type.
///
/// This method is most commonly used to take records containing aggregatable data (e.g. numbers to be summed)
Expand Down Expand Up @@ -482,25 +527,6 @@ impl<G: Scope, D: Data, R: Semigroup> Collection<G, D, R> where G::Timestamp: Da
.inspect_batch(func)
.as_collection()
}
/// Attaches a timely dataflow probe to the output of a Collection.
///
/// This probe is used to determine when the state of the Collection has stabilized and can
/// be read out.
pub fn probe(&self) -> probe::Handle<G::Timestamp> {
self.inner
.probe()
}
/// Attaches a timely dataflow probe to the output of a Collection.
///
/// This probe is used to determine when the state of the Collection has stabilized and all updates observed.
/// In addition, a probe is also often used to limit the number of rounds of input in flight at any moment; a
/// computation can wait until the probe has caught up to the input before introducing more rounds of data, to
/// avoid swamping the system.
///
/// The probe is attached through the caller-supplied `handle`, and the stream produced by `probe_with` is
/// returned as a collection so that further operators can be chained onto it.
pub fn probe_with(&self, handle: &mut probe::Handle<G::Timestamp>) -> Collection<G, D, R> {
self.inner
.probe_with(handle)
.as_collection()
}

/// Assert if the collection is ever non-empty.
///
Expand Down Expand Up @@ -534,11 +560,6 @@ impl<G: Scope, D: Data, R: Semigroup> Collection<G, D, R> where G::Timestamp: Da
self.consolidate()
.inspect(|x| panic!("Assertion failed: non-empty collection: {:?}", x));
}

/// The scope containing the underlying timely dataflow stream.
pub fn scope(&self) -> G {
self.inner.scope()
}
}

use timely::dataflow::scopes::ScopeParent;
Expand Down Expand Up @@ -673,13 +694,21 @@ impl<G: Scope, D: Data, R: Abelian> Collection<G, D, R> where G::Timestamp: Data
}

/// Conversion to a differential dataflow Collection.
pub trait AsCollection<G: Scope, D: Data, R: Semigroup> {
pub trait AsCollection<G, D, R, C = Vec<(D, <G as ScopeParent>::Timestamp, R)>>
where
G: Scope,
R: Semigroup,
C: TimelyContainer<Item=(D, G::Timestamp, R)>,
{
/// Converts the type to a differential dataflow collection.
fn as_collection(&self) -> Collection<G, D, R>;
fn as_collection(&self) -> Collection<G, D, R, C>;
}

impl<G: Scope, D: Data, R: Semigroup> AsCollection<G, D, R> for Stream<G, (D, G::Timestamp, R)> {
fn as_collection(&self) -> Collection<G, D, R> {
/// Any `StreamCore` whose container `C` holds `(D, G::Timestamp, R)` items can be converted to a collection.
impl<G: Scope, D, R: Semigroup, C> AsCollection<G, D, R, C> for StreamCore<G, C>
where
C: TimelyContainer<Item=(D, G::Timestamp, R)>
{
/// Wraps a clone of this stream in a `Collection` via `Collection::new`.
fn as_collection(&self) -> Collection<G, D, R, C> {
// `self` is only borrowed, so the stream is cloned to give the new collection its own stream value.
Collection::new(self.clone())
}
}
Expand Down Expand Up @@ -710,12 +739,13 @@ impl<G: Scope, D: Data, R: Semigroup> AsCollection<G, D, R> for Stream<G, (D, G:
/// });
/// }
/// ```
pub fn concatenate<G, D, R, I>(scope: &mut G, iterator: I) -> Collection<G, D, R>
pub fn concatenate<G, D, R, I, C>(scope: &mut G, iterator: I) -> Collection<G, D, R, C>
where
G: Scope,
D: Data,
R: Semigroup,
I: IntoIterator<Item=Collection<G, D, R>>,
I: IntoIterator<Item=Collection<G, D, R, C>>,
C: TimelyContainer<Item=(D, G::Timestamp, R)>,
{
scope
.concatenate(iterator.into_iter().map(|x| x.inner))
Expand Down
Loading