diff --git a/examples/capture-test.rs b/examples/capture-test.rs index 07f2380f4..19b00a354 100644 --- a/examples/capture-test.rs +++ b/examples/capture-test.rs @@ -153,7 +153,7 @@ pub mod kafka { use differential_dataflow::lattice::Lattice; /// Creates a Kafka source from supplied configuration information. - pub fn create_source(scope: G, addr: &str, topic: &str, group: &str) -> (Box, Stream) + pub fn create_source(scope: G, addr: &str, topic: &str, group: &str) -> (Box, Stream) where G: Scope, D: ExchangeData + Hash + for<'a> serde::Deserialize<'a>, diff --git a/src/operators/arrange/agent.rs b/src/operators/arrange/agent.rs index 8016c12cc..63f950bc7 100644 --- a/src/operators/arrange/agent.rs +++ b/src/operators/arrange/agent.rs @@ -423,19 +423,26 @@ where Tr: TraceReader, { // This frontier describes our only guarantee on the compaction frontier. - let frontier = self.get_logical_compaction().to_owned(); - self.import_frontier_core(scope, name, frontier) + let since = self.get_logical_compaction().to_owned(); + self.import_frontier_core(scope, name, since, Antichain::new()) } - /// Import a trace advanced to a specific frontier. - pub fn import_frontier_core(&mut self, scope: &G, name: &str, frontier: Antichain) -> (Arranged>>, ShutdownButton>) + /// Import a trace restricted to a specific time interval `[since, until)`. + /// + /// All updates present in the input trace will be first advanced to `since`, and then either emitted, + /// or if greater or equal to `until`, suppressed. Once all times are certain to be greater or equal + /// to `until` the operator capability will be dropped. + /// + /// Invoking this method with an `until` of `Antichain::new()` will perform no filtering, as the empty + /// frontier indicates the end of times. + pub fn import_frontier_core(&mut self, scope: &G, name: &str, since: Antichain, until: Antichain) -> (Arranged>>, ShutdownButton>) where G: Scope, Tr::Time: Timestamp+ Lattice+Ord+Clone+'static, Tr: TraceReader, { let trace = self.clone(); - let trace = TraceFrontier::make_from(trace, frontier.borrow()); + let trace = TraceFrontier::make_from(trace, since.borrow(), until.borrow()); let mut shutdown_button = None; @@ -458,18 +465,27 @@ where let mut capabilities = capabilities.borrow_mut(); if let Some(ref mut capabilities) = *capabilities { - let mut borrow = queue.1.borrow_mut(); for instruction in borrow.drain(..) { - match instruction { - TraceReplayInstruction::Frontier(frontier) => { - capabilities.downgrade(&frontier.borrow()[..]); - }, - TraceReplayInstruction::Batch(batch, hint) => { - if let Some(time) = hint { - if !batch.is_empty() { - let delayed = capabilities.delayed(&time); - output.session(&delayed).give(BatchFrontier::make_from(batch, frontier.borrow())); + // If we have dropped the capabilities due to `until`, attempt no further work. + // Without the capabilities, we should soon be shut down (once this loop ends). + if !capabilities.is_empty() { + match instruction { + TraceReplayInstruction::Frontier(frontier) => { + if timely::PartialOrder::less_equal(&until, &frontier) { + // It might be nice to actively *drop* `capabilities`, but it seems + // complicated logically (i.e. we'd have to break out of the loop). + capabilities.downgrade(&[]); + } else { + capabilities.downgrade(&frontier.borrow()[..]); + } + }, + TraceReplayInstruction::Batch(batch, hint) => { + if let Some(time) = hint { + if !batch.is_empty() { + let delayed = capabilities.delayed(&time); + output.session(&delayed).give(BatchFrontier::make_from(batch, since.borrow(), until.borrow())); + } } } } diff --git a/src/trace/wrappers/frontier.rs b/src/trace/wrappers/frontier.rs index ec321b71e..cb0865c49 100644 --- a/src/trace/wrappers/frontier.rs +++ b/src/trace/wrappers/frontier.rs @@ -1,8 +1,10 @@ //! Wrapper for frontiered trace. //! -//! Wraps a trace with a frontier so that all exposed timestamps are first advanced by the frontier. -//! This ensures that even for traces that have been advanced, all views provided through cursors -//! present deterministic times, independent of the compaction strategy. +//! Wraps a trace with `since` and `upper` frontiers so that all exposed timestamps are first advanced +//! by the `since` frontier and restricted by the `upper` frontier. This presents a deterministic trace +//! on the interval `[since, upper)`, presenting only accumulations up to `since` (rather than partially +//! accumulated updates) and no updates at times greater or equal to `upper` (even as parts of batches +//! that span that time). use timely::progress::Timestamp; use timely::progress::{Antichain, frontier::AntichainRef}; @@ -17,7 +19,10 @@ where Tr: TraceReader, { trace: Tr, - frontier: Antichain, + /// Frontier to which all update times will be advanced. + since: Antichain, + /// Frontier after which all update times will be suppressed. + until: Antichain, } impl Clone for TraceFrontier @@ -28,7 +33,8 @@ where fn clone(&self) -> Self { TraceFrontier { trace: self.trace.clone(), - frontier: self.frontier.clone(), + since: self.since.clone(), + until: self.until.clone(), } } } @@ -51,8 +57,9 @@ where type Cursor = CursorFrontier; fn map_batches(&self, mut f: F) { - let frontier = self.frontier.borrow(); - self.trace.map_batches(|batch| f(&Self::Batch::make_from(batch.clone(), frontier))) + let since = self.since.borrow(); + let until = self.until.borrow(); + self.trace.map_batches(|batch| f(&Self::Batch::make_from(batch.clone(), since, until))) } fn set_logical_compaction(&mut self, frontier: AntichainRef) { self.trace.set_logical_compaction(frontier) } @@ -62,8 +69,9 @@ where fn get_physical_compaction(&mut self) -> AntichainRef { self.trace.get_physical_compaction() } fn cursor_through(&mut self, upper: AntichainRef) -> Option<(Self::Cursor, ::Storage)> { - let frontier = self.frontier.borrow(); - self.trace.cursor_through(upper).map(|(x,y)| (CursorFrontier::new(x, frontier), y)) + let since = self.since.borrow(); + let until = self.until.borrow(); + self.trace.cursor_through(upper).map(|(x,y)| (CursorFrontier::new(x, since, until), y)) } } @@ -73,10 +81,11 @@ where Tr::Time: Timestamp, { /// Makes a new trace wrapper - pub fn make_from(trace: Tr, frontier: AntichainRef) -> Self { + pub fn make_from(trace: Tr, since: AntichainRef, until: AntichainRef) -> Self { TraceFrontier { trace, - frontier: frontier.to_owned(), + since: since.to_owned(), + until: until.to_owned(), } } } @@ -86,10 +95,11 @@ where #[derive(Clone)] pub struct BatchFrontier { batch: B, - frontier: Antichain, + since: Antichain, + until: Antichain, } -impl BatchReader for BatchFrontier +impl BatchReader for BatchFrontier where B: BatchReader, B::Time: Timestamp+Lattice, @@ -102,7 +112,7 @@ where type Cursor = BatchCursorFrontier; fn cursor(&self) -> Self::Cursor { - BatchCursorFrontier::new(self.batch.cursor(), self.frontier.borrow()) + BatchCursorFrontier::new(self.batch.cursor(), self.since.borrow(), self.until.borrow()) } fn len(&self) -> usize { self.batch.len() } fn description(&self) -> &Description { &self.batch.description() } @@ -114,10 +124,11 @@ where B::Time: Timestamp+Lattice, { /// Makes a new batch wrapper - pub fn make_from(batch: B, frontier: AntichainRef) -> Self { + pub fn make_from(batch: B, since: AntichainRef, until: AntichainRef) -> Self { BatchFrontier { batch, - frontier: frontier.to_owned(), + since: since.to_owned(), + until: until.to_owned(), } } } @@ -125,14 +136,16 @@ where /// Wrapper to provide cursor to nested scope. pub struct CursorFrontier { cursor: C, - frontier: Antichain, + since: Antichain, + until: Antichain } impl CursorFrontier where C::Time: Clone { - fn new(cursor: C, frontier: AntichainRef) -> Self { + fn new(cursor: C, since: AntichainRef, until: AntichainRef) -> Self { CursorFrontier { cursor, - frontier: frontier.to_owned(), + since: since.to_owned(), + until: until.to_owned(), } } } @@ -157,12 +170,15 @@ where #[inline] fn map_times(&mut self, storage: &Self::Storage, mut logic: L) { - let frontier = self.frontier.borrow(); + let since = self.since.borrow(); + let until = self.until.borrow(); let mut temp: C::Time = ::minimum(); self.cursor.map_times(storage, |time, diff| { temp.clone_from(time); - temp.advance_by(frontier); - logic(&temp, diff); + temp.advance_by(since); + if !until.less_equal(&temp) { + logic(&temp, diff); + } }) } @@ -181,14 +197,16 @@ where /// Wrapper to provide cursor to nested scope. pub struct BatchCursorFrontier { cursor: B::Cursor, - frontier: Antichain, + since: Antichain, + until: Antichain, } impl BatchCursorFrontier where B::Time: Clone { - fn new(cursor: B::Cursor, frontier: AntichainRef) -> Self { + fn new(cursor: B::Cursor, since: AntichainRef, until: AntichainRef) -> Self { BatchCursorFrontier { cursor, - frontier: frontier.to_owned(), + since: since.to_owned(), + until: until.to_owned(), } } } @@ -212,12 +230,15 @@ where #[inline] fn map_times(&mut self, storage: &Self::Storage, mut logic: L) { - let frontier = self.frontier.borrow(); + let since = self.since.borrow(); + let until = self.until.borrow(); let mut temp: B::Time = ::minimum(); self.cursor.map_times(&storage.batch, |time, diff| { temp.clone_from(time); - temp.advance_by(frontier); - logic(&temp, diff); + temp.advance_by(since); + if !until.less_equal(&temp) { + logic(&temp, diff); + } }) }