diff --git a/Cargo.toml b/Cargo.toml index a245cce3f..a30cb5090 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,8 +28,8 @@ serde_derive = "1.0" abomonation = "0.7" abomonation_derive = "0.3" timely_sort="0.1.6" -timely = "0.9" -#timely = { git = "https://github.com/TimelyDataflow/timely-dataflow" } +#timely = "0.9" +timely = { git = "https://github.com/TimelyDataflow/timely-dataflow" } #timely = { path = "../timely-dataflow/" } fnv="1.0.2" diff --git a/src/collection.rs b/src/collection.rs index d1719bc4a..8a45bb73f 100644 --- a/src/collection.rs +++ b/src/collection.rs @@ -199,6 +199,41 @@ impl Collection where G::Timestamp: Data .concat(&other.inner) .as_collection() } + /// Creates a new collection accumulating the contents of the two collections. + /// + /// Despite the name, differential dataflow collections are unordered. This method is so named because the + /// implementation is the concatenation of the stream of updates, but it corresponds to the addition of the + /// two collections. + /// + /// # Examples + /// + /// ``` + /// extern crate timely; + /// extern crate differential_dataflow; + /// + /// use differential_dataflow::input::Input; + /// + /// fn main() { + /// ::timely::example(|scope| { + /// + /// let data = scope.new_collection_from(1 .. 10).1; + /// + /// let odds = data.filter(|x| x % 2 == 1); + /// let evens = data.filter(|x| x % 2 == 0); + /// + /// odds.concatenate(Some(evens)) + /// .assert_eq(&data); + /// }); + /// } + /// ``` + pub fn concatenate(&self, sources: I) -> Collection + where + I: IntoIterator> + { + self.inner + .concatenate(sources.into_iter().map(|x| x.inner)) + .as_collection() + } /// Replaces each record with another, with a new difference type. /// /// This method is most commonly used to take records containing aggregatable data (e.g. numbers to be summed) @@ -580,4 +615,42 @@ impl AsCollection for Stream Collection { Collection::new(self.clone()) } +} + +/// Concatenates multiple collections. +/// +/// This method has the effect of a sequence of calls to `concat`, but it does +/// so in one operator rather than a chain of many operators. +/// +/// # Examples +/// +/// ``` +/// extern crate timely; +/// extern crate differential_dataflow; +/// +/// use differential_dataflow::input::Input; +/// +/// fn main() { +/// ::timely::example(|scope| { +/// +/// let data = scope.new_collection_from(1 .. 10).1; +/// +/// let odds = data.filter(|x| x % 2 == 1); +/// let evens = data.filter(|x| x % 2 == 0); +/// +/// differential_dataflow::collection::concatenate(scope, vec![odds, evens]) +/// .assert_eq(&data); +/// }); +/// } +/// ``` +pub fn concatenate(scope: &mut G, iterator: I) -> Collection +where + G: Scope, + D: Data, + R: Monoid, + I: IntoIterator>, +{ + scope + .concatenate(iterator.into_iter().map(|x| x.inner)) + .as_collection() } \ No newline at end of file