diff --git a/examples/arrange.rs b/examples/arrange.rs index d7905907e..8fbda1cba 100644 --- a/examples/arrange.rs +++ b/examples/arrange.rs @@ -36,7 +36,7 @@ fn main() { let mut probe = timely::dataflow::operators::probe::Handle::new(); // create a dataflow managing an ever-changing edge collection. - let mut graph = worker.dataflow::,_,_>(|scope| { + let mut graph = worker.dataflow::,_,_>(|scope| { // create a source operator which will produce random edges and delete them. timely::dataflow::operators::generic::source(scope, "RandomGraph", |mut capability, info| { diff --git a/examples/compact.rs b/examples/compact.rs index 665af2d42..dc90d88ae 100644 --- a/examples/compact.rs +++ b/examples/compact.rs @@ -19,7 +19,7 @@ fn main() { let mut probe = timely::dataflow::operators::probe::Handle::new(); // create a dataflow managing an ever-changing edge collection. - let mut handle = worker.dataflow(|scope| { + let mut handle = worker.dataflow(|scope| { let (handle, input) = scope.new_collection(); input.distinct().probe_with(&mut probe); handle diff --git a/src/difference.rs b/src/difference.rs index c3010f461..16c6b5fb8 100644 --- a/src/difference.rs +++ b/src/difference.rs @@ -25,70 +25,70 @@ pub use self::Abelian as Diff; /// in order of timestamps, for many types of timestamps there is no total order and consequently no /// obvious order to respect. Non-commutative semigroups should be used with care. pub trait Semigroup : for<'a> AddAssign<&'a Self> + ::std::marker::Sized + Data + Clone { - /// Returns true if the element is the additive identity. - /// - /// This is primarily used by differential dataflow to know when it is safe to delete an update. - /// When a difference accumulates to zero, the difference has no effect on any accumulation and can - /// be removed. - /// - /// A semigroup is not obligated to have a zero element, and this method could always return - /// false in such a setting. - fn is_zero(&self) -> bool; + /// Returns true if the element is the additive identity. + /// + /// This is primarily used by differential dataflow to know when it is safe to delete an update. + /// When a difference accumulates to zero, the difference has no effect on any accumulation and can + /// be removed. + /// + /// A semigroup is not obligated to have a zero element, and this method could always return + /// false in such a setting. + fn is_zero(&self) -> bool; } impl Semigroup for isize { - #[inline] fn is_zero(&self) -> bool { self == &0 } + #[inline] fn is_zero(&self) -> bool { self == &0 } } impl Semigroup for i128 { - #[inline] fn is_zero(&self) -> bool { self == &0 } + #[inline] fn is_zero(&self) -> bool { self == &0 } } impl Semigroup for i64 { - #[inline] fn is_zero(&self) -> bool { self == &0 } + #[inline] fn is_zero(&self) -> bool { self == &0 } } impl Semigroup for i32 { - #[inline] fn is_zero(&self) -> bool { self == &0 } + #[inline] fn is_zero(&self) -> bool { self == &0 } } impl Semigroup for i16 { - #[inline] fn is_zero(&self) -> bool { self == &0 } + #[inline] fn is_zero(&self) -> bool { self == &0 } } impl Semigroup for i8 { - #[inline] fn is_zero(&self) -> bool { self == &0 } + #[inline] fn is_zero(&self) -> bool { self == &0 } } /// A semigroup with an explicit zero element. pub trait Monoid : Semigroup { - /// A zero element under the semigroup addition operator. - fn zero() -> Self; + /// A zero element under the semigroup addition operator. + fn zero() -> Self; } impl Monoid for isize { - #[inline] fn zero() -> Self { 0 } + #[inline] fn zero() -> Self { 0 } } impl Monoid for i128 { - #[inline] fn zero() -> Self { 0 } + #[inline] fn zero() -> Self { 0 } } impl Monoid for i64 { - #[inline] fn zero() -> Self { 0 } + #[inline] fn zero() -> Self { 0 } } impl Monoid for i32 { - #[inline] fn zero() -> Self { 0 } + #[inline] fn zero() -> Self { 0 } } impl Monoid for i16 { - #[inline] fn zero() -> Self { 0 } + #[inline] fn zero() -> Self { 0 } } impl Monoid for i8 { - #[inline] fn zero() -> Self { 0 } + #[inline] fn zero() -> Self { 0 } } @@ -104,209 +104,209 @@ impl> Abelian for T { } pub use self::present::Present; mod present { - /// A zero-sized difference that indicates the presence of a record. - /// - /// This difference type has no negation, and present records cannot be retracted. - /// Addition and multiplication maintain presence, and zero does not inhabit the type. - /// - /// The primary feature of this type is that it has zero size, which reduces the overhead - /// of differential dataflow's representations for settings where collections either do - /// not change, or for which records are only added (for example, derived facts in Datalog). - #[derive(Abomonation, Copy, Ord, PartialOrd, Eq, PartialEq, Debug, Clone, Serialize, Deserialize, Hash)] - pub struct Present; - - impl<'a> std::ops::AddAssign<&'a Self> for Present { - fn add_assign(&mut self, _rhs: &'a Self) { } - } - - impl std::ops::Mul for Present { - type Output = T; - fn mul(self, rhs: T) -> T { - rhs - } - } - - impl super::Semigroup for Present { - fn is_zero(&self) -> bool { false } - } + /// A zero-sized difference that indicates the presence of a record. + /// + /// This difference type has no negation, and present records cannot be retracted. + /// Addition and multiplication maintain presence, and zero does not inhabit the type. + /// + /// The primary feature of this type is that it has zero size, which reduces the overhead + /// of differential dataflow's representations for settings where collections either do + /// not change, or for which records are only added (for example, derived facts in Datalog). + #[derive(Abomonation, Copy, Ord, PartialOrd, Eq, PartialEq, Debug, Clone, Serialize, Deserialize, Hash)] + pub struct Present; + + impl<'a> std::ops::AddAssign<&'a Self> for Present { + fn add_assign(&mut self, _rhs: &'a Self) { } + } + + impl std::ops::Mul for Present { + type Output = T; + fn mul(self, rhs: T) -> T { + rhs + } + } + + impl super::Semigroup for Present { + fn is_zero(&self) -> bool { false } + } } pub use self::pair::DiffPair; mod pair { - use std::ops::{AddAssign, Neg, Mul}; - use super::{Semigroup, Monoid}; - - /// The difference defined by a pair of difference elements. - /// - /// This type is essentially a "pair", though in Rust the tuple types do not derive the numeric - /// traits we require, and so we need to emulate the types ourselves. In the interest of ergonomics, - /// we may eventually replace the numeric traits with our own, so that we can implement them for - /// tuples and allow users to ignore details like these. - #[derive(Abomonation, Copy, Ord, PartialOrd, Eq, PartialEq, Debug, Clone, Serialize, Deserialize)] - pub struct DiffPair { - /// The first element in the pair. - pub element1: R1, - /// The second element in the pair. - pub element2: R2, - } - - impl DiffPair { - /// Creates a new Diff pair from two elements. - #[inline] pub fn new(elt1: R1, elt2: R2) -> Self { - DiffPair { - element1: elt1, - element2: elt2, - } - } - } - - impl Semigroup for DiffPair { - #[inline] fn is_zero(&self) -> bool { - self.element1.is_zero() && self.element2.is_zero() - } - } - - impl<'a, R1: AddAssign<&'a R1>, R2: AddAssign<&'a R2>> AddAssign<&'a DiffPair> for DiffPair { - #[inline] fn add_assign(&mut self, rhs: &'a Self) { - self.element1 += &rhs.element1; - self.element2 += &rhs.element2; - } - } - - impl Neg for DiffPair { - type Output = DiffPair<::Output, ::Output>; - #[inline] fn neg(self) -> Self::Output { - DiffPair { - element1: -self.element1, - element2: -self.element2, - } - } - } - - impl, R2: Mul> Mul for DiffPair { - type Output = DiffPair<>::Output, >::Output>; - fn mul(self, other: T) -> Self::Output { - DiffPair::new( - self.element1 * other, - self.element2 * other, - ) - } - } - - impl Monoid for DiffPair { - fn zero() -> Self { - Self { - element1: R1::zero(), - element2: R2::zero(), - } - } - } - - // // TODO: This currently causes rustc to trip a recursion limit, because who knows why. - // impl Mul> for isize - // where isize: Mul, isize: Mul, >::Output: Diff, >::Output: Diff { - // type Output = DiffPair<>::Output, >::Output>; - // fn mul(self, other: DiffPair) -> Self::Output { - // DiffPair::new( - // self * other.element1, - // self * other.element2, - // ) - // } - // } + use std::ops::{AddAssign, Neg, Mul}; + use super::{Semigroup, Monoid}; + + /// The difference defined by a pair of difference elements. + /// + /// This type is essentially a "pair", though in Rust the tuple types do not derive the numeric + /// traits we require, and so we need to emulate the types ourselves. In the interest of ergonomics, + /// we may eventually replace the numeric traits with our own, so that we can implement them for + /// tuples and allow users to ignore details like these. + #[derive(Abomonation, Copy, Ord, PartialOrd, Eq, PartialEq, Debug, Clone, Serialize, Deserialize)] + pub struct DiffPair { + /// The first element in the pair. + pub element1: R1, + /// The second element in the pair. + pub element2: R2, + } + + impl DiffPair { + /// Creates a new Diff pair from two elements. + #[inline] pub fn new(elt1: R1, elt2: R2) -> Self { + DiffPair { + element1: elt1, + element2: elt2, + } + } + } + + impl Semigroup for DiffPair { + #[inline] fn is_zero(&self) -> bool { + self.element1.is_zero() && self.element2.is_zero() + } + } + + impl<'a, R1: AddAssign<&'a R1>, R2: AddAssign<&'a R2>> AddAssign<&'a DiffPair> for DiffPair { + #[inline] fn add_assign(&mut self, rhs: &'a Self) { + self.element1 += &rhs.element1; + self.element2 += &rhs.element2; + } + } + + impl Neg for DiffPair { + type Output = DiffPair<::Output, ::Output>; + #[inline] fn neg(self) -> Self::Output { + DiffPair { + element1: -self.element1, + element2: -self.element2, + } + } + } + + impl, R2: Mul> Mul for DiffPair { + type Output = DiffPair<>::Output, >::Output>; + fn mul(self, other: T) -> Self::Output { + DiffPair::new( + self.element1 * other, + self.element2 * other, + ) + } + } + + impl Monoid for DiffPair { + fn zero() -> Self { + Self { + element1: R1::zero(), + element2: R2::zero(), + } + } + } + + // // TODO: This currently causes rustc to trip a recursion limit, because who knows why. + // impl Mul> for isize + // where isize: Mul, isize: Mul, >::Output: Diff, >::Output: Diff { + // type Output = DiffPair<>::Output, >::Output>; + // fn mul(self, other: DiffPair) -> Self::Output { + // DiffPair::new( + // self * other.element1, + // self * other.element2, + // ) + // } + // } } pub use self::vector::DiffVector; mod vector { - use std::ops::{AddAssign, Neg, Mul}; - use super::{Semigroup, Monoid}; - - /// A variable number of accumulable updates. - #[derive(Abomonation, Ord, PartialOrd, Eq, PartialEq, Debug, Clone, Serialize, Deserialize)] - pub struct DiffVector { - buffer: Vec, - } - - impl DiffVector { - /// Create new DiffVector from Vec - #[inline(always)] - pub fn new(vec: Vec) -> DiffVector { - DiffVector { buffer: vec } - } - } - - impl IntoIterator for DiffVector { - type Item = R; - type IntoIter = ::std::vec::IntoIter; - fn into_iter(self) -> Self::IntoIter { - self.buffer.into_iter() - } - } - - impl std::ops::Deref for DiffVector { - type Target = [R]; - fn deref(&self) -> &Self::Target { - &self.buffer[..] - } - } - - impl std::ops::DerefMut for DiffVector { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.buffer[..] - } - } - - impl Semigroup for DiffVector { - #[inline] fn is_zero(&self) -> bool { - self.buffer.iter().all(|x| x.is_zero()) - } - } - - impl<'a, R: AddAssign<&'a R>+Clone> AddAssign<&'a DiffVector> for DiffVector { - #[inline] - fn add_assign(&mut self, rhs: &'a Self) { - - // Ensure sufficient length to receive addition. - while self.buffer.len() < rhs.buffer.len() { - let element = &rhs.buffer[self.buffer.len()]; - self.buffer.push(element.clone()); - } - - // As other is not longer, apply updates without tests. - for (index, update) in rhs.buffer.iter().enumerate() { - self.buffer[index] += update; - } - } - } - - impl+Clone> Neg for DiffVector { - type Output = DiffVector<::Output>; - #[inline] - fn neg(mut self) -> Self::Output { - for update in self.buffer.iter_mut() { - *update = -update.clone(); - } - self - } - } - - impl> Mul for DiffVector { - type Output = DiffVector<>::Output>; - fn mul(self, other: T) -> Self::Output { - let buffer = - self.buffer - .into_iter() - .map(|x| x * other) - .collect(); - - DiffVector { buffer } - } - } - - impl Monoid for DiffVector { - fn zero() -> Self { - Self { buffer: Vec::new() } - } - } + use std::ops::{AddAssign, Neg, Mul}; + use super::{Semigroup, Monoid}; + + /// A variable number of accumulable updates. + #[derive(Abomonation, Ord, PartialOrd, Eq, PartialEq, Debug, Clone, Serialize, Deserialize)] + pub struct DiffVector { + buffer: Vec, + } + + impl DiffVector { + /// Create new DiffVector from Vec + #[inline(always)] + pub fn new(vec: Vec) -> DiffVector { + DiffVector { buffer: vec } + } + } + + impl IntoIterator for DiffVector { + type Item = R; + type IntoIter = ::std::vec::IntoIter; + fn into_iter(self) -> Self::IntoIter { + self.buffer.into_iter() + } + } + + impl std::ops::Deref for DiffVector { + type Target = [R]; + fn deref(&self) -> &Self::Target { + &self.buffer[..] + } + } + + impl std::ops::DerefMut for DiffVector { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.buffer[..] + } + } + + impl Semigroup for DiffVector { + #[inline] fn is_zero(&self) -> bool { + self.buffer.iter().all(|x| x.is_zero()) + } + } + + impl<'a, R: AddAssign<&'a R>+Clone> AddAssign<&'a DiffVector> for DiffVector { + #[inline] + fn add_assign(&mut self, rhs: &'a Self) { + + // Ensure sufficient length to receive addition. + while self.buffer.len() < rhs.buffer.len() { + let element = &rhs.buffer[self.buffer.len()]; + self.buffer.push(element.clone()); + } + + // As other is not longer, apply updates without tests. + for (index, update) in rhs.buffer.iter().enumerate() { + self.buffer[index] += update; + } + } + } + + impl+Clone> Neg for DiffVector { + type Output = DiffVector<::Output>; + #[inline] + fn neg(mut self) -> Self::Output { + for update in self.buffer.iter_mut() { + *update = -update.clone(); + } + self + } + } + + impl> Mul for DiffVector { + type Output = DiffVector<>::Output>; + fn mul(self, other: T) -> Self::Output { + let buffer = + self.buffer + .into_iter() + .map(|x| x * other) + .collect(); + + DiffVector { buffer } + } + } + + impl Monoid for DiffVector { + fn zero() -> Self { + Self { buffer: Vec::new() } + } + } } \ No newline at end of file diff --git a/src/input.rs b/src/input.rs index 1bb182c6c..d3cdff34f 100644 --- a/src/input.rs +++ b/src/input.rs @@ -31,19 +31,19 @@ pub trait Input : TimelyInput { /// fn main() { /// ::timely::execute(Config::thread(), |worker| { /// - /// let (mut handle, probe) = worker.dataflow::<(),_,_>(|scope| { - /// // create input handle and collection. - /// let (handle, data) = scope.new_collection(); - /// let probe = data.map(|x| x * 2) - /// .inspect(|x| println!("{:?}", x)) - /// .probe(); - /// (handle, probe) - /// }); + /// let (mut handle, probe) = worker.dataflow::<(),_,_>(|scope| { + /// // create input handle and collection. + /// let (handle, data) = scope.new_collection(); + /// let probe = data.map(|x| x * 2) + /// .inspect(|x| println!("{:?}", x)) + /// .probe(); + /// (handle, probe) + /// }); /// - /// handle.insert(1); - /// handle.insert(5); + /// handle.insert(1); + /// handle.insert(5); /// - /// }).unwrap(); + /// }).unwrap(); /// } /// ``` fn new_collection(&mut self) -> (InputSession<::Timestamp, D, R>, Collection) @@ -62,19 +62,19 @@ pub trait Input : TimelyInput { /// fn main() { /// ::timely::execute(Config::thread(), |worker| { /// - /// let (mut handle, probe) = worker.dataflow::<(),_,_>(|scope| { - /// // create input handle and collection. - /// let (handle, data) = scope.new_collection_from(0 .. 10); - /// let probe = data.map(|x| x * 2) - /// .inspect(|x| println!("{:?}", x)) - /// .probe(); - /// (handle, probe) - /// }); + /// let (mut handle, probe) = worker.dataflow::<(),_,_>(|scope| { + /// // create input handle and collection. + /// let (handle, data) = scope.new_collection_from(0 .. 10); + /// let probe = data.map(|x| x * 2) + /// .inspect(|x| println!("{:?}", x)) + /// .probe(); + /// (handle, probe) + /// }); /// - /// handle.insert(1); - /// handle.insert(5); + /// handle.insert(1); + /// handle.insert(5); /// - /// }).unwrap(); + /// }).unwrap(); /// } /// ``` fn new_collection_from(&mut self, data: I) -> (InputSession<::Timestamp, I::Item, isize>, Collection) @@ -116,8 +116,8 @@ use lattice::Lattice; impl Input for G where ::Timestamp: Lattice { fn new_collection(&mut self) -> (InputSession<::Timestamp, D, R>, Collection) where D: Data, R: Semigroup{ - let (handle, stream) = self.new_input(); - (InputSession::from(handle), stream.as_collection()) + let (handle, stream) = self.new_input(); + (InputSession::from(handle), stream.as_collection()) } fn new_collection_from(&mut self, data: I) -> (InputSession<::Timestamp, I::Item, isize>, Collection) where I: IntoIterator+'static, I::Item: Data { @@ -156,47 +156,47 @@ impl Input for G where ::Timestamp: Lattice { /// fn main() { /// ::timely::execute(Config::thread(), |worker| { /// -/// let (mut handle, probe) = worker.dataflow(|scope| { -/// // create input handle and collection. -/// let (handle, data) = scope.new_collection_from(0 .. 10); -/// let probe = data.map(|x| x * 2) -/// .inspect(|x| println!("{:?}", x)) -/// .probe(); -/// (handle, probe) -/// }); +/// let (mut handle, probe) = worker.dataflow(|scope| { +/// // create input handle and collection. +/// let (handle, data) = scope.new_collection_from(0 .. 10); +/// let probe = data.map(|x| x * 2) +/// .inspect(|x| println!("{:?}", x)) +/// .probe(); +/// (handle, probe) +/// }); /// -/// handle.insert(3); -/// handle.advance_to(1); -/// handle.insert(5); -/// handle.advance_to(2); -/// handle.flush(); +/// handle.insert(3); +/// handle.advance_to(1); +/// handle.insert(5); +/// handle.advance_to(2); +/// handle.flush(); /// -/// while probe.less_than(handle.time()) { -/// worker.step(); -/// } +/// while probe.less_than(handle.time()) { +/// worker.step(); +/// } /// -/// handle.remove(5); -/// handle.advance_to(3); -/// handle.flush(); +/// handle.remove(5); +/// handle.advance_to(3); +/// handle.flush(); /// -/// while probe.less_than(handle.time()) { -/// worker.step(); -/// } +/// while probe.less_than(handle.time()) { +/// worker.step(); +/// } /// -/// }).unwrap(); +/// }).unwrap(); /// } /// ``` pub struct InputSession { - time: T, - buffer: Vec<(D, T, R)>, - handle: Handle, + time: T, + buffer: Vec<(D, T, R)>, + handle: Handle, } impl InputSession { - /// Adds an element to the collection. - pub fn insert(&mut self, element: D) { self.update(element, 1); } - /// Removes an element from the collection. - pub fn remove(&mut self, element: D) { self.update(element,-1); } + /// Adds an element to the collection. + pub fn insert(&mut self, element: D) { self.update(element, 1); } + /// Removes an element from the collection. + pub fn remove(&mut self, element: D) { self.update(element,-1); } } // impl InputSession { @@ -235,17 +235,17 @@ impl InputSession { } } - /// Creates a new session from a reference to an input handle. - pub fn from(handle: Handle) -> Self { - InputSession { - time: handle.time().clone(), - buffer: Vec::new(), - handle, - } - } + /// Creates a new session from a reference to an input handle. + pub fn from(handle: Handle) -> Self { + InputSession { + time: handle.time().clone(), + buffer: Vec::new(), + handle, + } + } - /// Adds to the weight of an element in the collection. - pub fn update(&mut self, element: D, change: R) { + /// Adds to the weight of an element in the collection. + pub fn update(&mut self, element: D, change: R) { if self.buffer.len() == self.buffer.capacity() { if self.buffer.len() > 0 { self.handle.send_batch(&mut self.buffer); @@ -253,8 +253,8 @@ impl InputSession { // TODO : This is a fairly arbitrary choice; should probably use `Context::default_size()` or such. self.buffer.reserve(1024); } - self.buffer.push((element, self.time.clone(), change)); - } + self.buffer.push((element, self.time.clone(), change)); + } /// Adds to the weight of an element in the collection at a future time. pub fn update_at(&mut self, element: D, time: T, change: R) { @@ -269,40 +269,40 @@ impl InputSession { self.buffer.push((element, time, change)); } - /// Forces buffered data into the timely dataflow input, and advances its time to match that of the session. - /// - /// It is important to call `flush` before expecting timely dataflow to report progress. Until this method is - /// called, all updates may still be in internal buffers and not exposed to timely dataflow. Once the method is - /// called, all buffers are flushed and timely dataflow is advised that some logical times are no longer possible. - pub fn flush(&mut self) { - self.handle.send_batch(&mut self.buffer); - if self.handle.epoch().less_than(&self.time) { - self.handle.advance_to(self.time.clone()); - } - } + /// Forces buffered data into the timely dataflow input, and advances its time to match that of the session. + /// + /// It is important to call `flush` before expecting timely dataflow to report progress. Until this method is + /// called, all updates may still be in internal buffers and not exposed to timely dataflow. Once the method is + /// called, all buffers are flushed and timely dataflow is advised that some logical times are no longer possible. + pub fn flush(&mut self) { + self.handle.send_batch(&mut self.buffer); + if self.handle.epoch().less_than(&self.time) { + self.handle.advance_to(self.time.clone()); + } + } - /// Advances the logical time for future records. - /// - /// Importantly, this method does **not** immediately inform timely dataflow of the change. This happens only when - /// the session is dropped or flushed. It is not correct to use this time as a basis for a computation's `step_while` - /// method unless the session has just been flushed. - pub fn advance_to(&mut self, time: T) { - assert!(self.handle.epoch().less_equal(&time)); - assert!(&self.time.less_equal(&time)); - self.time = time; - } + /// Advances the logical time for future records. + /// + /// Importantly, this method does **not** immediately inform timely dataflow of the change. This happens only when + /// the session is dropped or flushed. It is not correct to use this time as a basis for a computation's `step_while` + /// method unless the session has just been flushed. + pub fn advance_to(&mut self, time: T) { + assert!(self.handle.epoch().less_equal(&time)); + assert!(&self.time.less_equal(&time)); + self.time = time; + } - /// Reveals the current time of the session. - pub fn epoch(&self) -> &T { &self.time } - /// Reveals the current time of the session. - pub fn time(&self) -> &T { &self.time } + /// Reveals the current time of the session. + pub fn epoch(&self) -> &T { &self.time } + /// Reveals the current time of the session. + pub fn time(&self) -> &T { &self.time } - /// Closes the input, flushing and sealing the wrapped timely input. - pub fn close(self) { } + /// Closes the input, flushing and sealing the wrapped timely input. + pub fn close(self) { } } impl Drop for InputSession { - fn drop(&mut self) { - self.flush(); - } + fn drop(&mut self) { + self.flush(); + } } diff --git a/src/trace/cursor/mod.rs b/src/trace/cursor/mod.rs index c02268bc6..d76e7cab0 100644 --- a/src/trace/cursor/mod.rs +++ b/src/trace/cursor/mod.rs @@ -14,72 +14,72 @@ pub use self::cursor_list::CursorList; /// A cursor for navigating ordered `(key, val, time, diff)` updates. pub trait Cursor { - /// Type the cursor addresses data in. - type Storage; + /// Type the cursor addresses data in. + type Storage; - /// Indicates if the current key is valid. - /// - /// A value of `false` indicates that the cursor has exhausted all keys. - fn key_valid(&self, storage: &Self::Storage) -> bool; - /// Indicates if the current value is valid. - /// - /// A value of `false` indicates that the cursor has exhausted all values for this key. - fn val_valid(&self, storage: &Self::Storage) -> bool; + /// Indicates if the current key is valid. + /// + /// A value of `false` indicates that the cursor has exhausted all keys. + fn key_valid(&self, storage: &Self::Storage) -> bool; + /// Indicates if the current value is valid. + /// + /// A value of `false` indicates that the cursor has exhausted all values for this key. + fn val_valid(&self, storage: &Self::Storage) -> bool; - /// A reference to the current key. Asserts if invalid. - fn key<'a>(&self, storage: &'a Self::Storage) -> &'a K; - /// A reference to the current value. Asserts if invalid. - fn val<'a>(&self, storage: &'a Self::Storage) -> &'a V; + /// A reference to the current key. Asserts if invalid. + fn key<'a>(&self, storage: &'a Self::Storage) -> &'a K; + /// A reference to the current value. Asserts if invalid. + fn val<'a>(&self, storage: &'a Self::Storage) -> &'a V; - /// Returns a reference to the current key, if valid. - fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option<&'a K> { - if self.key_valid(storage) { Some(self.key(storage)) } else { None } - } - /// Returns a reference to the current value, if valid. - fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option<&'a V> { - if self.val_valid(storage) { Some(self.val(storage)) } else { None } - } + /// Returns a reference to the current key, if valid. + fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option<&'a K> { + if self.key_valid(storage) { Some(self.key(storage)) } else { None } + } + /// Returns a reference to the current value, if valid. + fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option<&'a V> { + if self.val_valid(storage) { Some(self.val(storage)) } else { None } + } - /// Applies `logic` to each pair of time and difference. Intended for mutation of the - /// closure's scope. - fn map_times(&mut self, storage: &Self::Storage, logic: L); + /// Applies `logic` to each pair of time and difference. Intended for mutation of the + /// closure's scope. + fn map_times(&mut self, storage: &Self::Storage, logic: L); - /// Advances the cursor to the next key. - fn step_key(&mut self, storage: &Self::Storage); - /// Advances the cursor to the specified key. - fn seek_key(&mut self, storage: &Self::Storage, key: &K); + /// Advances the cursor to the next key. + fn step_key(&mut self, storage: &Self::Storage); + /// Advances the cursor to the specified key. + fn seek_key(&mut self, storage: &Self::Storage, key: &K); - /// Advances the cursor to the next value. - fn step_val(&mut self, storage: &Self::Storage); - /// Advances the cursor to the specified value. - fn seek_val(&mut self, storage: &Self::Storage, val: &V); + /// Advances the cursor to the next value. + fn step_val(&mut self, storage: &Self::Storage); + /// Advances the cursor to the specified value. + fn seek_val(&mut self, storage: &Self::Storage, val: &V); - /// Rewinds the cursor to the first key. - fn rewind_keys(&mut self, storage: &Self::Storage); - /// Rewinds the cursor to the first value for current key. - fn rewind_vals(&mut self, storage: &Self::Storage); + /// Rewinds the cursor to the first key. + fn rewind_keys(&mut self, storage: &Self::Storage); + /// Rewinds the cursor to the first value for current key. + fn rewind_vals(&mut self, storage: &Self::Storage); } /// Debugging and testing utilities for Cursor. pub trait CursorDebug : Cursor { - /// Rewinds the cursor and outputs its contents to a Vec - fn to_vec(&mut self, storage: &Self::Storage) -> Vec<((K, V), Vec<(T, R)>)> { - let mut out = Vec::new(); - self.rewind_keys(storage); - self.rewind_vals(storage); - while self.key_valid(storage) { - while self.val_valid(storage) { - let mut kv_out = Vec::new(); - self.map_times(storage, |ts, r| { - kv_out.push((ts.clone(), r.clone())); - }); - out.push(((self.key(storage).clone(), self.val(storage).clone()), kv_out)); - self.step_val(storage); - } - self.step_key(storage); - } - out - } + /// Rewinds the cursor and outputs its contents to a Vec + fn to_vec(&mut self, storage: &Self::Storage) -> Vec<((K, V), Vec<(T, R)>)> { + let mut out = Vec::new(); + self.rewind_keys(storage); + self.rewind_vals(storage); + while self.key_valid(storage) { + while self.val_valid(storage) { + let mut kv_out = Vec::new(); + self.map_times(storage, |ts, r| { + kv_out.push((ts.clone(), r.clone())); + }); + out.push(((self.key(storage).clone(), self.val(storage).clone()), kv_out)); + self.step_val(storage); + } + self.step_key(storage); + } + out + } } impl CursorDebug for C where C: Cursor { } diff --git a/src/trace/description.rs b/src/trace/description.rs index 573887751..e11355808 100644 --- a/src/trace/description.rs +++ b/src/trace/description.rs @@ -66,32 +66,32 @@ use timely::{PartialOrder, progress::Antichain}; /// any time after `since`. #[derive(Clone, Debug, Abomonation)] pub struct Description