diff --git a/dogsdogsdogs/src/operators/lookup_map.rs b/dogsdogsdogs/src/operators/lookup_map.rs index d0aa14cae..3715ccfe4 100644 --- a/dogsdogsdogs/src/operators/lookup_map.rs +++ b/dogsdogsdogs/src/operators/lookup_map.rs @@ -45,7 +45,7 @@ where S: FnMut(&D, &R, &Tr::Val, &Tr::R)->(DOut, ROut)+'static, { // No need to block physical merging for this operator. - arrangement.trace.distinguish_since(Antichain::new().borrow()); + arrangement.trace.set_physical_compaction(Antichain::new().borrow()); let mut propose_trace = Some(arrangement.trace); let propose_stream = arrangement.stream; @@ -138,7 +138,7 @@ where for key in stash.keys() { frontier.insert(key.time().clone()); } - propose_trace.as_mut().map(|trace| trace.advance_by(frontier.borrow())); + propose_trace.as_mut().map(|trace| trace.set_logical_compaction(frontier.borrow())); if input1.frontier().is_empty() && stash.is_empty() { propose_trace = None; diff --git a/examples/cursors.rs b/examples/cursors.rs index b91ff7ebf..29e9ddc00 100644 --- a/examples/cursors.rs +++ b/examples/cursors.rs @@ -78,8 +78,8 @@ fn main() { graph.close(); for i in 1..rounds + 1 { /* Advance the trace frontier to enable trace compaction. */ - graph_trace.distinguish_since(AntichainRef::new(&[i])); - graph_trace.advance_by(AntichainRef::new(&[i])); + graph_trace.set_physical_compaction(AntichainRef::new(&[i])); + graph_trace.set_logical_compaction(AntichainRef::new(&[i])); worker.step_while(|| probe.less_than(&i)); dump_cursor(i, worker.index(), &mut graph_trace); } @@ -93,8 +93,8 @@ fn main() { } graph.advance_to(i); graph.flush(); - graph_trace.distinguish_since(AntichainRef::new(&[i])); - graph_trace.advance_by(AntichainRef::new(&[i])); + graph_trace.set_physical_compaction(AntichainRef::new(&[i])); + graph_trace.set_logical_compaction(AntichainRef::new(&[i])); worker.step_while(|| probe.less_than(graph.time())); dump_cursor(i, worker.index(), &mut graph_trace); } diff --git a/examples/multitemporal.rs b/examples/multitemporal.rs index 2d663a2f0..5c3bb5c0e 100644 --- a/examples/multitemporal.rs +++ b/examples/multitemporal.rs @@ -22,7 +22,7 @@ use pair::Pair; fn main() { - timely::execute_from_args(std::env::args(), move |worker| { + timely::execute_from_args(std::env::args(), move |worker| { // Used to determine if our output has caught up to our input. let mut probe: ProbeHandle> = ProbeHandle::new(); @@ -49,7 +49,7 @@ fn main() { }); // Do not hold back physical compaction. - trace.distinguish_since(AntichainRef::new(&[])); + trace.set_physical_compaction(AntichainRef::new(&[])); println!("Multi-temporal histogram; valid commands are (integer arguments):"); println!(" update value time1 time2 change"); @@ -86,13 +86,13 @@ fn main() { }, ("advance-output", 2) => { let time = Pair::new(arguments[0], arguments[1]); - if trace.advance_frontier().less_equal(&time) { - trace.advance_by(AntichainRef::new(&[time])); + if trace.get_logical_compaction().less_equal(&time) { + trace.set_logical_compaction(AntichainRef::new(&[time])); while probe.less_than(capability.time()) { worker.step(); } } else { - println!("Requested time {:?} not readable (output from {:?})", time, trace.advance_frontier()); + println!("Requested time {:?} not readable (output from {:?})", time, trace.get_logical_compaction()); } }, ("query", 2) => { @@ -100,8 +100,8 @@ fn main() { let query_time = Pair::new(arguments[0], arguments[1]); if capability.time().less_equal(&query_time) { println!("Query time ({:?}) is still open (input from {:?}).", query_time, capability.time()); - } else if !trace.advance_frontier().less_equal(&query_time) { - println!("Query time ({:?}) no longer available in output (output from {:?}).", query_time, trace.advance_frontier()); + } else if !trace.get_logical_compaction().less_equal(&query_time) { + println!("Query time ({:?}) no longer available in output (output from {:?}).", query_time, trace.get_logical_compaction()); } else { println!("Report at {:?}", query_time); diff --git a/interactive/src/manager.rs b/interactive/src/manager.rs index e67b285dc..bd28cefde 100644 --- a/interactive/src/manager.rs +++ b/interactive/src/manager.rs @@ -187,13 +187,13 @@ impl TraceManager { use timely::progress::frontier::Antichain; let frontier = Antichain::from_elem(time.clone()); for trace in self.inputs.values_mut() { - trace.advance_by(frontier.borrow()); - trace.distinguish_since(frontier.borrow()); + trace.set_logical_compaction(frontier.borrow()); + trace.set_physical_compaction(frontier.borrow()); } for map in self.arrangements.values_mut() { for trace in map.values_mut() { - trace.advance_by(frontier.borrow()); - trace.distinguish_since(frontier.borrow()); + trace.set_logical_compaction(frontier.borrow()); + trace.set_physical_compaction(frontier.borrow()); } } } diff --git a/mdbook/src/chapter_5/chapter_5_3.md b/mdbook/src/chapter_5/chapter_5_3.md index 7d41bbb0c..cbd4e8c70 100644 --- a/mdbook/src/chapter_5/chapter_5_3.md +++ b/mdbook/src/chapter_5/chapter_5_3.md @@ -94,6 +94,6 @@ When we extract a trace from an arrangement, we acquire the ability to replay th A `TraceHandle` (the type of `trace`) has two important methods. Their names are not great, and subject to change in the future. Their idioms may also change as more information flows in about users and use cases. -1. `advance_by(frontier)`. This method informs `trace` that it will no longer be called upon to handle queries for times not in advance of `frontier`, a set of timestamps. This gives the arrangement permission to coalesce otherwise indistinguishable timestamps, which it will start to do once all handles have advanced. +1. `set_logical_compaction(frontier)`. This method informs `trace` that it will no longer be called upon to handle queries for times not in advance of `frontier`, a set of timestamps. This gives the arrangement permission to coalesce otherwise indistinguishable timestamps, which it will start to do once all handles have advanced. -2. `distinguish_since(frontier)`. This method unblocks the merging of physical batches. It is very rare that a user wants to do anything with this other than call `trace.distinguish_since(&[])`, which unblocks all merging. Certain operators, namely `join`, do need to carefully manipulate this method. +2. `set_physical_compaction(frontier)`. This method unblocks the merging of physical batches. It is very rare that a user wants to do anything with this other than call `trace.set_physical_compaction(&[])`, which unblocks all merging. Certain operators, namely `join`, do need to carefully manipulate this method. diff --git a/server/dataflows/random_graph/src/lib.rs b/server/dataflows/random_graph/src/lib.rs index 15a786ed4..880a4a404 100644 --- a/server/dataflows/random_graph/src/lib.rs +++ b/server/dataflows/random_graph/src/lib.rs @@ -46,7 +46,7 @@ pub fn build((dataflow, handles, probe, timer, args): Environment) -> Result<(), // operator holds only a weak reference to it. // // The operator also holds an `Weak>>` which it will - // attempt to borrow and call `advance_by` in order to advance the capability + // attempt to borrow and call `set_logical_compaction` in order to advance the capability // as it runs, to allow compaction and the maintenance of bounded state. if args.len() != 4 { return Err(format!("expected four arguments, instead: {:?}", args)); } @@ -134,7 +134,7 @@ pub fn build((dataflow, handles, probe, timer, args): Environment) -> Result<(), if let Some(trace_handle) = trace_handle_weak.upgrade() { let mut borrow = trace_handle.borrow_mut(); if let Some(ref mut trace_handle) = borrow.as_mut() { - trace_handle.advance_by(&[elapsed_ns]); + trace_handle.set_logical_compaction(&[elapsed_ns]); } } @@ -191,7 +191,7 @@ pub fn build((dataflow, handles, probe, timer, args): Environment) -> Result<(), .trace; // release all blocks on merging. - trace.distinguish_since(&[]); + trace.set_physical_compaction(&[]); *trace_handle.borrow_mut() = Some(trace); handles.set::>>>(name.to_owned(), trace_handle); diff --git a/src/operators/arrange/agent.rs b/src/operators/arrange/agent.rs index 42f7f0f36..04e168cb5 100644 --- a/src/operators/arrange/agent.rs +++ b/src/operators/arrange/agent.rs @@ -54,20 +54,20 @@ where type Batch = Tr::Batch; type Cursor = Tr::Cursor; - fn advance_by(&mut self, frontier: AntichainRef) { - self.trace.borrow_mut().adjust_advance_frontier(self.advance.borrow(), frontier); + fn set_logical_compaction(&mut self, frontier: AntichainRef) { + self.trace.borrow_mut().adjust_get_logical_compaction(self.advance.borrow(), frontier); self.advance.clear(); self.advance.extend(frontier.iter().cloned()); } - fn advance_frontier(&mut self) -> AntichainRef { + fn get_logical_compaction(&mut self) -> AntichainRef { self.advance.borrow() } - fn distinguish_since(&mut self, frontier: AntichainRef) { + fn set_physical_compaction(&mut self, frontier: AntichainRef) { self.trace.borrow_mut().adjust_through_frontier(self.through.borrow(), frontier); self.through.clear(); self.through.extend(frontier.iter().cloned()); } - fn distinguish_frontier(&mut self) -> AntichainRef { + fn get_physical_compaction(&mut self) -> AntichainRef { self.through.borrow() } fn cursor_through(&mut self, frontier: AntichainRef) -> Option<(Tr::Cursor, >::Storage)> { @@ -99,7 +99,7 @@ where let reader = TraceAgent { trace: trace.clone(), queues: Rc::downgrade(&queues), - advance: trace.borrow().advance_frontiers.frontier().to_owned(), + advance: trace.borrow().get_logical_compactions.frontier().to_owned(), through: trace.borrow().through_frontiers.frontier().to_owned(), operator, logging, @@ -163,13 +163,13 @@ where /// are no longer evident. /// /// The current behavior is that the introduced collection accumulates updates to some times less or equal - /// to `self.advance_frontier()`. There is *not* currently a guarantee that the updates are accumulated *to* + /// to `self.get_logical_compaction()`. There is *not* currently a guarantee that the updates are accumulated *to* /// the frontier, and the resulting collection history may be weirdly partial until this point. In particular, /// the historical collection may move through configurations that did not actually occur, even if eventually /// arriving at the correct collection. This is probably a bug; although we get to the right place in the end, /// the intermediate computation could do something that the original computation did not, like diverge. /// - /// I would expect the semantics to improve to "updates are advanced to `self.advance_frontier()`", which + /// I would expect the semantics to improve to "updates are advanced to `self.get_logical_compaction()`", which /// means the computation will run as if starting from exactly this frontier. It is not currently clear whose /// responsibility this should be (the trace/batch should only reveal these times, or an operator should know /// to advance times before using them). @@ -341,7 +341,7 @@ where /// Imports an arrangement into the supplied scope. /// - /// This variant of import uses the `advance_frontier` to forcibly advance timestamps in updates. + /// This variant of import uses the `get_logical_compaction` to forcibly advance timestamps in updates. /// /// # Examples /// @@ -382,7 +382,7 @@ where /// handle.remove(1); handle.advance_to(4); handle.flush(); worker.step(); /// handle.insert(0); handle.advance_to(5); handle.flush(); worker.step(); /// - /// trace.advance_by(AntichainRef::new(&[5])); + /// trace.set_logical_compaction(AntichainRef::new(&[5])); /// /// // create a second dataflow /// let mut shutdown = worker.dataflow(|scope| { @@ -418,7 +418,7 @@ where Tr: TraceReader, { // This frontier describes our only guarantee on the compaction frontier. - let frontier = self.advance_frontier().to_owned(); + let frontier = self.get_logical_compaction().to_owned(); self.import_frontier_core(scope, name, frontier) } @@ -533,7 +533,7 @@ where // increase counts for wrapped `TraceBox`. let empty_frontier = Antichain::new(); - self.trace.borrow_mut().adjust_advance_frontier(empty_frontier.borrow(), self.advance.borrow()); + self.trace.borrow_mut().adjust_get_logical_compaction(empty_frontier.borrow(), self.advance.borrow()); self.trace.borrow_mut().adjust_through_frontier(empty_frontier.borrow(), self.through.borrow()); TraceAgent { @@ -562,7 +562,7 @@ where // decrement borrow counts to remove all holds let empty_frontier = Antichain::new(); - self.trace.borrow_mut().adjust_advance_frontier(self.advance.borrow(), empty_frontier.borrow()); + self.trace.borrow_mut().adjust_get_logical_compaction(self.advance.borrow(), empty_frontier.borrow()); self.trace.borrow_mut().adjust_through_frontier(self.through.borrow(), empty_frontier.borrow()); } } diff --git a/src/operators/arrange/arrangement.rs b/src/operators/arrange/arrangement.rs index 886bcb590..23fbe655e 100644 --- a/src/operators/arrange/arrangement.rs +++ b/src/operators/arrange/arrangement.rs @@ -278,8 +278,8 @@ where queries.binary_frontier(&self.stream, exchange, Pipeline, "TraceQuery", move |_capability, _info| { let mut trace = Some(self.trace.clone()); - // release `distinguish_since` capability. - trace.as_mut().unwrap().distinguish_since(Antichain::new().borrow()); + // release `set_physical_compaction` capability. + trace.as_mut().unwrap().set_physical_compaction(Antichain::new().borrow()); let mut stash = Vec::new(); let mut capability: Option> = None; @@ -413,7 +413,7 @@ where ].into_iter().cloned().filter_map(|t| t).min(); if let Some(frontier) = frontier { - trace.as_mut().map(|t| t.advance_by(AntichainRef::new(&[frontier]))); + trace.as_mut().map(|t| t.set_logical_compaction(AntichainRef::new(&[frontier]))); } else { trace = None; diff --git a/src/operators/arrange/upsert.rs b/src/operators/arrange/upsert.rs index 88f137197..aef44d465 100644 --- a/src/operators/arrange/upsert.rs +++ b/src/operators/arrange/upsert.rs @@ -331,8 +331,8 @@ where input_frontier.extend(input.frontier().frontier().iter().cloned()); // Downgrade capabilities for `reader_local`. - reader_local.advance_by(input_frontier.borrow()); - reader_local.distinguish_since(input_frontier.borrow()); + reader_local.set_logical_compaction(input_frontier.borrow()); + reader_local.set_physical_compaction(input_frontier.borrow()); } if let Some(mut fuel) = effort.clone() { diff --git a/src/operators/count.rs b/src/operators/count.rs index f48405f97..746d2d77d 100644 --- a/src/operators/count.rs +++ b/src/operators/count.rs @@ -124,8 +124,8 @@ where // tidy up the shared input trace. trace.advance_upper(&mut upper_limit); - trace.advance_by(upper_limit.borrow()); - trace.distinguish_since(upper_limit.borrow()); + trace.set_logical_compaction(upper_limit.borrow()); + trace.set_physical_compaction(upper_limit.borrow()); } }) .as_collection() diff --git a/src/operators/join.rs b/src/operators/join.rs index fba306d44..5feb7e3b8 100644 --- a/src/operators/join.rs +++ b/src/operators/join.rs @@ -360,7 +360,7 @@ impl JoinCore for Arranged if !batch1.is_empty() { if let Some(acknowledged2) = &acknowledged2 { // TODO : cursor_through may be problematic for pre-merged traces. - // A trace should provide the contract that whatever its `distinguish_since` capability, + // A trace should provide the contract that whatever its `set_physical_compaction` capability, // it is safe (and reasonable) to await delivery of batches up through that frontier. // In this case, we should be able to await (not block on) the arrival of these batches. let (trace2_cursor, trace2_storage) = trace2.cursor_through(acknowledged2.borrow()).unwrap(); @@ -395,7 +395,7 @@ impl JoinCore for Arranged if !batch2.is_empty() { if let Some(acknowledged1) = &acknowledged1 { // TODO : cursor_through may be problematic for pre-merged traces. - // A trace should provide the contract that whatever its `distinguish_since` capability, + // A trace should provide the contract that whatever its `set_physical_compaction` capability, // it is safe (and reasonable) to await delivery of batches up through that frontier. // In this case, we should be able to await (not block on) the arrival of these batches. let (trace1_cursor, trace1_storage) = trace1.cursor_through(acknowledged1.borrow()).unwrap(); @@ -450,28 +450,28 @@ impl JoinCore for Arranged // shut down or advance trace2. if trace2.is_some() && input1.frontier().is_empty() { trace2 = None; } if let Some(ref mut trace2) = trace2 { - trace2.advance_by(input1.frontier().frontier()); + trace2.set_logical_compaction(input1.frontier().frontier()); // At this point, if we haven't seen any input batches we should establish a frontier anyhow. if acknowledged2.is_none() { acknowledged2 = Some(Antichain::from_elem(::minimum())); } if let Some(acknowledged2) = &mut acknowledged2 { trace2.advance_upper(acknowledged2); - trace2.distinguish_since(acknowledged2.borrow()); + trace2.set_physical_compaction(acknowledged2.borrow()); } } // shut down or advance trace1. if trace1.is_some() && input2.frontier().is_empty() { trace1 = None; } if let Some(ref mut trace1) = trace1 { - trace1.advance_by(input2.frontier().frontier()); + trace1.set_logical_compaction(input2.frontier().frontier()); // At this point, if we haven't seen any input batches we should establish a frontier anyhow. if acknowledged1.is_none() { acknowledged1 = Some(Antichain::from_elem(::minimum())); } if let Some(acknowledged1) = &mut acknowledged1 { trace1.advance_upper(acknowledged1); - trace1.distinguish_since(acknowledged1.borrow()); + trace1.set_physical_compaction(acknowledged1.borrow()); } } } diff --git a/src/operators/reduce.rs b/src/operators/reduce.rs index 66412a875..f037ba463 100644 --- a/src/operators/reduce.rs +++ b/src/operators/reduce.rs @@ -624,12 +624,12 @@ where } // We only anticipate future times in advance of `upper_limit`. - source_trace.advance_by(upper_limit.borrow()); - output_reader.advance_by(upper_limit.borrow()); + source_trace.set_logical_compaction(upper_limit.borrow()); + output_reader.set_logical_compaction(upper_limit.borrow()); // We will only slice the data between future batches. - source_trace.distinguish_since(upper_limit.borrow()); - output_reader.distinguish_since(upper_limit.borrow()); + source_trace.set_physical_compaction(upper_limit.borrow()); + output_reader.set_physical_compaction(upper_limit.borrow()); } // Exert trace maintenance if we have been so requested. diff --git a/src/operators/threshold.rs b/src/operators/threshold.rs index c94c4171a..7d2e439bb 100644 --- a/src/operators/threshold.rs +++ b/src/operators/threshold.rs @@ -186,8 +186,8 @@ where // tidy up the shared input trace. trace.advance_upper(&mut upper_limit); - trace.advance_by(upper_limit.borrow()); - trace.distinguish_since(upper_limit.borrow()); + trace.set_logical_compaction(upper_limit.borrow()); + trace.set_physical_compaction(upper_limit.borrow()); } }) .as_collection() diff --git a/src/trace/implementations/graph.rs b/src/trace/implementations/graph.rs index 185cadeed..f8cdd231e 100644 --- a/src/trace/implementations/graph.rs +++ b/src/trace/implementations/graph.rs @@ -51,14 +51,14 @@ where } else { None } } - fn advance_by(&mut self, frontier: &[Product]) { - self.spine.advance_by(frontier) + fn set_logical_compaction(&mut self, frontier: &[Product]) { + self.spine.set_logical_compaction(frontier) } - fn advance_frontier(&mut self) -> &[Product] { self.spine.advance_frontier() } - fn distinguish_since(&mut self, frontier: &[Product]) { - self.spine.distinguish_since(frontier) + fn get_logical_compaction(&mut self) -> &[Product] { self.spine.get_logical_compaction() } + fn set_physical_compaction(&mut self, frontier: &[Product]) { + self.spine.set_physical_compaction(frontier) } - fn distinguish_frontier(&mut self) -> &[Product] { &self.spine.distinguish_frontier() } + fn get_physical_compaction(&mut self) -> &[Product] { &self.spine.get_physical_compaction() } fn map_batches(&mut self, f: F) { self.spine.map_batches(f) diff --git a/src/trace/implementations/spine_fueled.rs b/src/trace/implementations/spine_fueled.rs index 36c1fe1a4..ffc17ec6f 100644 --- a/src/trace/implementations/spine_fueled.rs +++ b/src/trace/implementations/spine_fueled.rs @@ -99,8 +99,8 @@ pub struct Spine> { operator: OperatorInfo, logger: Option<::logging::Logger>, phantom: ::std::marker::PhantomData<(K, V, R)>, - advance_frontier: Vec, // Times after which the trace must accumulate correctly. - through_frontier: Vec, // Times after which the trace must be able to subset its inputs. + logical_frontier: Vec, // Times after which the trace must accumulate correctly. + physical_frontier: Vec, // Times after which the trace must be able to subset its inputs. merging: Vec>>,// Several possibly shared collections of updates. pending: Vec, // Batches at times in advance of `frontier`. upper: Vec, @@ -137,11 +137,11 @@ where // supplied upper it had better be empty. // We shouldn't grab a cursor into a closed trace, right? - assert!(self.advance_frontier.len() > 0, "cursor_through({:?}) called for closed trace", upper); + assert!(self.logical_frontier.len() > 0, "cursor_through({:?}) called for closed trace", upper); - // Check that `upper` is greater or equal to `self.through_frontier`. + // Check that `upper` is greater or equal to `self.physical_frontier`. // Otherwise, the cut could be in `self.merging` and it is user error anyhow. - assert!(upper.iter().all(|t1| self.through_frontier.iter().any(|t2| t2.less_equal(t1)))); + assert!(upper.iter().all(|t1| self.physical_frontier.iter().any(|t2| t2.less_equal(t1)))); let mut cursors = Vec::new(); let mut storage = Vec::new(); @@ -197,23 +197,23 @@ where Some((CursorList::new(cursors, &storage), storage)) } - fn advance_by(&mut self, frontier: &[T]) { - self.advance_frontier = frontier.to_vec(); + fn set_logical_compaction(&mut self, frontier: &[T]) { + self.logical_frontier = frontier.to_vec(); // Commenting out for now; causes problems in `read_upper()`. // If one has an urgent need to release these resources, it // is probably best just to drop the trace. - // if self.advance_frontier.len() == 0 { + // if self.logical_frontier.len() == 0 { // self.drop_batches(); // } } - fn advance_frontier(&mut self) -> &[T] { &self.advance_frontier[..] } - fn distinguish_since(&mut self, frontier: &[T]) { - self.through_frontier = frontier.to_vec(); + fn get_logical_compaction(&mut self) -> &[T] { &self.logical_frontier[..] } + fn set_physical_compaction(&mut self, frontier: &[T]) { + self.physical_frontier = frontier.to_vec(); self.consider_merges(); } - fn distinguish_frontier(&mut self) -> &[T] { &self.through_frontier[..] } + fn get_physical_compaction(&mut self) -> &[T] { &self.physical_frontier[..] } fn map_batches(&mut self, mut f: F) { for batch in self.merging.iter().rev() { @@ -303,7 +303,7 @@ where R: Semigroup, B: Batch, { - /// Drops and logs batches. Used in advance_by and drop. + /// Drops and logs batches. Used in `set_logical_compaction` and drop. fn drop_batches(&mut self) { if let Some(logger) = &self.logger { for batch in self.merging.drain(..) { @@ -351,8 +351,8 @@ where operator, logger, phantom: ::std::marker::PhantomData, - advance_frontier: vec![::minimum()], - through_frontier: vec![::minimum()], + logical_frontier: vec![::minimum()], + physical_frontier: vec![::minimum()], merging: Vec::new(), pending: Vec::new(), upper: vec![Default::default()], @@ -407,7 +407,7 @@ where // 2. large batches never have small indices. while self.pending.len() > 0 && - self.through_frontier.iter().all(|t1| self.pending[0].upper().iter().any(|t2| t2.less_equal(t1))) + self.physical_frontier.iter().all(|t1| self.pending[0].upper().iter().any(|t2| t2.less_equal(t1))) { // this could be a VecDeque, if we ever notice this. let batch = self.pending.remove(0); @@ -445,7 +445,7 @@ where // Step 2: Insert new batch at target position if let Some(batch2) = self.merging[batch_index].take() { let batch2 = batch2.complete(&mut self.logger, self.operator.global_id, batch_index); - let frontier = if batch_index == self.merging.len()-1 { Some(self.advance_frontier.clone()) } else { None }; + let frontier = if batch_index == self.merging.len()-1 { Some(self.get_logical_compaction.clone()) } else { None }; self.logger.as_ref().map(|l| l.log( ::logging::MergeEvent { operator: self.operator.global_id, @@ -513,7 +513,7 @@ where let batch1 = batch.complete(&mut self.logger, self.operator.global_id, position); let batch2 = batch2.complete(&mut self.logger, self.operator.global_id, position); // if this is the last position, engage compaction. - let frontier = if new_position+1 == self.merging.len() { Some(self.advance_frontier.clone()) } else { None }; + let frontier = if new_position+1 == self.merging.len() { Some(self.get_logical_compaction.clone()) } else { None }; self.logger.as_ref().map(|l| l.log( ::logging::MergeEvent { operator: self.operator.global_id, diff --git a/src/trace/implementations/spine_fueled_neu.rs b/src/trace/implementations/spine_fueled_neu.rs index b04c41d6c..57553d825 100644 --- a/src/trace/implementations/spine_fueled_neu.rs +++ b/src/trace/implementations/spine_fueled_neu.rs @@ -91,8 +91,8 @@ pub struct Spine> { operator: OperatorInfo, logger: Option, phantom: ::std::marker::PhantomData<(K, V, R)>, - advance_frontier: Antichain, // Times after which the trace must accumulate correctly. - through_frontier: Antichain, // Times after which the trace must be able to subset its inputs. + logical_frontier: Antichain, // Times after which the trace must accumulate correctly. + physical_frontier: Antichain, // Times after which the trace must be able to subset its inputs. merging: Vec>,// Several possibly shared collections of updates. pending: Vec, // Batches at times in advance of `frontier`. upper: Antichain, @@ -138,12 +138,12 @@ where // supplied upper it had better be empty. // We shouldn't grab a cursor into a closed trace, right? - assert!(self.advance_frontier.borrow().len() > 0); + assert!(self.logical_frontier.borrow().len() > 0); - // Check that `upper` is greater or equal to `self.through_frontier`. + // Check that `upper` is greater or equal to `self.physical_frontier`. // Otherwise, the cut could be in `self.merging` and it is user error anyhow. - // assert!(upper.iter().all(|t1| self.through_frontier.iter().any(|t2| t2.less_equal(t1)))); - assert!(PartialOrder::less_equal(&self.through_frontier.borrow(), &upper)); + // assert!(upper.iter().all(|t1| self.physical_frontier.iter().any(|t2| t2.less_equal(t1)))); + assert!(PartialOrder::less_equal(&self.physical_frontier.borrow(), &upper)); let mut cursors = Vec::new(); let mut storage = Vec::new(); @@ -211,16 +211,16 @@ where Some((CursorList::new(cursors, &storage), storage)) } - fn advance_by(&mut self, frontier: AntichainRef) { + fn set_logical_compaction(&mut self, frontier: AntichainRef) { // TODO: Re-use allocation - self.advance_frontier = frontier.to_owned(); + self.logical_frontier = frontier.to_owned(); } - fn advance_frontier(&mut self) -> AntichainRef { self.advance_frontier.borrow() } - fn distinguish_since(&mut self, frontier: AntichainRef) { - self.through_frontier = frontier.to_owned(); + fn get_logical_compaction(&mut self) -> AntichainRef { self.logical_frontier.borrow() } + fn set_physical_compaction(&mut self, frontier: AntichainRef) { + self.physical_frontier = frontier.to_owned(); self.consider_merges(); } - fn distinguish_frontier(&mut self) -> AntichainRef { self.through_frontier.borrow() } + fn get_physical_compaction(&mut self) -> AntichainRef { self.physical_frontier.borrow() } fn map_batches(&mut self, mut f: F) { for batch in self.merging.iter().rev() { @@ -333,7 +333,7 @@ where R: Semigroup, B: Batch, { - /// Drops and logs batches. Used in advance_by and drop. + /// Drops and logs batches. Used in `set_logical_compaction` and drop. fn drop_batches(&mut self) { if let Some(logger) = &self.logger { for batch in self.merging.drain(..) { @@ -430,8 +430,8 @@ where operator, logger, phantom: ::std::marker::PhantomData, - advance_frontier: Antichain::from_elem(::minimum()), - through_frontier: Antichain::from_elem(::minimum()), + logical_frontier: Antichain::from_elem(::minimum()), + physical_frontier: Antichain::from_elem(::minimum()), merging: Vec::new(), pending: Vec::new(), upper: Antichain::from_elem(::minimum()), @@ -449,8 +449,8 @@ where // TODO: Consider merging pending batches before introducing them. // TODO: We could use a `VecDeque` here to draw from the front and append to the back. - while self.pending.len() > 0 && PartialOrder::less_equal(self.pending[0].upper(), &self.through_frontier) - // self.through_frontier.iter().all(|t1| self.pending[0].upper().iter().any(|t2| t2.less_equal(t1))) + while self.pending.len() > 0 && PartialOrder::less_equal(self.pending[0].upper(), &self.physical_frontier) + // self.physical_frontier.iter().all(|t1| self.pending[0].upper().iter().any(|t2| t2.less_equal(t1))) { // Batch can be taken in optimized insertion. // Otherwise it is inserted normally at the end of the method. @@ -662,7 +662,7 @@ where complete: None, } )); - let compaction_frontier = Some(self.advance_frontier.borrow()); + let compaction_frontier = Some(self.logical_frontier.borrow()); self.merging[index] = MergeState::begin_merge(old, batch, compaction_frontier); } MergeState::Double(_) => { diff --git a/src/trace/mod.rs b/src/trace/mod.rs index 32e2c3df7..ab09d8f22 100644 --- a/src/trace/mod.rs +++ b/src/trace/mod.rs @@ -76,38 +76,78 @@ pub trait TraceReader { /// should allow `upper` such as `&[]` as used by `self.cursor()`, though it is difficult to imagine other uses. fn cursor_through(&mut self, upper: AntichainRef) -> Option<(Self::Cursor, >::Storage)>; - /// Advances the frontier of times the collection must be correctly accumulable through. - /// - /// Practically, this allows the trace to advance times in updates it maintains as long as the advanced times - /// still compare equivalently to any times greater or equal to some element of `frontier`. Times not greater - /// or equal to some element of `frontier` may no longer correctly accumulate, so do not advance a trace unless - /// you are quite sure you no longer require the distinction. - fn advance_by(&mut self, frontier: AntichainRef); + /// Advances the frontier that constrains logical compaction. + /// + /// Logical compaction is the ability of the trace to change the times of the updates it contains. + /// Update times may be changed as long as their comparison to all query times beyond the logical compaction + /// frontier remains unchanged. Practically, this means that groups of timestamps not beyond the frontier can + /// be coalesced into fewer representative times. + /// + /// Logical compaction is important, as it allows the trace to forget historical distinctions between update + /// times, and maintain a compact memory footprint over an unbounded update history. + /// + /// By advancing the logical compaction frontier, the caller unblocks merging of otherwise equivalent udates, + /// but loses the ability to observe historical detail that is not beyond `frontier`. + /// + /// It is an error to call this method with a frontier not beyond the most recent arguments to this method, + /// or the initial value of `get_logical_compaction()` if this method has not yet been called. + fn set_logical_compaction(&mut self, frontier: AntichainRef); + + /// Deprecated form of `set_logical_compaction`. + #[deprecated(since = "0.11", note = "please use `set_logical_compaction`")] + fn advance_by(&mut self, frontier: AntichainRef) { + self.set_logical_compaction(frontier); + } - /// Reports the frontier from which all time comparisions should be accurate. + /// Reports the logical compaction frontier. /// - /// Times that are not greater or equal to some element of the advance frontier may accumulate inaccurately as - /// the trace may have lost the ability to distinguish between such times. Accumulations are only guaranteed to - /// be accurate from the frontier onwards. - fn advance_frontier(&mut self) -> AntichainRef; + /// All update times beyond this frontier will be presented with their original times, and all update times + /// not beyond this frontier will present as a time that compares identically with all query times beyond + /// this frontier. Practically, update times not beyond this frontier should not be taken to be accurate as + /// presented, and should be used carefully, only in accumulation to times that are beyond the frontier. + fn get_logical_compaction(&mut self) -> AntichainRef; + + /// Deprecated form of `get_logical_compaction`. + #[deprecated(since = "0.11", note = "please use `get_logical_compaction`")] + fn advance_frontier(&mut self) -> AntichainRef { + self.get_logical_compaction() + } - /// Advances the frontier that may be used in `cursor_through`. - /// - /// Practically, this allows the trace to merge batches whose upper frontier comes before `frontier`. The trace - /// is likely to be annoyed or confused if you use a frontier other than one observed as an upper bound of an - /// actual batch. This doesn't seem likely to be a problem, but get in touch if it is. - /// - /// Calling `distinguish_since(&[])` indicates that all batches may be merged at any point, which essentially - /// disables the use of `cursor_through` with any parameter other than `&[]`, which is the behavior of `cursor`. - fn distinguish_since(&mut self, frontier: AntichainRef); + /// Advances the frontier that constrains physical compaction. + /// + /// Physical compaction is the ability of the trace to merge the batches of updates it maintains. Physical + /// compaction does not change the updates or their timestamps, although it is also the moment at which + /// logical compaction is most likely to happen. + /// + /// Physical compaction allows the trace to maintain a logarithmic number of batches of updates, which is + /// what allows the trace to provide efficient random access by keys and values. + /// + /// By advancing the physical compaction frontier, the caller unblocks the merging of batches of updates, + /// but loses the ability to create a cursor through any frontier not beyond `frontier`. + /// + /// It is an error to call this method with a frontier not beyond the most recent arguments to this method, + /// or the initial value of `get_physical_compaction()` if this method has not yet been called. + fn set_physical_compaction(&mut self, frontier: AntichainRef); + + /// Deprecated form of `set_physical_compaction`. + #[deprecated(since = "0.11", note = "please use `set_physical_compaction`")] + fn distinguish_since(&mut self, frontier: AntichainRef) { + self.set_physical_compaction(frontier); + } - /// Reports the frontier from which the collection may be subsetted. + /// Reports the physical compaction frontier. /// - /// The semantics are less elegant here, but the underlying trace will not merge batches in advance of this - /// frontier, which ensures that operators can extract the subset of the trace at batch boundaries from this - /// frontier onward. These boundaries may be used in `cursor_through`, whereas boundaries not in advance of - /// this frontier are not guaranteed to return a cursor. - fn distinguish_frontier(&mut self) -> AntichainRef; + /// All batches containing updates beyond this frontier will not be merged with ohter batches. This allows + /// the caller to create a cursor through any frontier beyond the physical compaction frontier, with the + /// `cursor_through()` method. This functionality is primarily of interest to the `join` operator, and any + /// other operators who need to take notice of the physical structure of update batches. + fn get_physical_compaction(&mut self) -> AntichainRef; + + /// Deprecated form of `get_physical_compaction`. + #[deprecated(since = "0.11", note = "please use `get_physical_compaction`")] + fn distinguish_frontier(&mut self) -> AntichainRef { + self.get_physical_compaction() + } /// Maps logic across the non-empty sequence of batches in the trace. /// diff --git a/src/trace/wrappers/enter.rs b/src/trace/wrappers/enter.rs index 1319d9f86..ece97af01 100644 --- a/src/trace/wrappers/enter.rs +++ b/src/trace/wrappers/enter.rs @@ -56,31 +56,31 @@ where }) } - fn advance_by(&mut self, frontier: AntichainRef) { + fn set_logical_compaction(&mut self, frontier: AntichainRef) { self.stash1.clear(); for time in frontier.iter() { self.stash1.insert(time.clone().to_outer()); } - self.trace.advance_by(self.stash1.borrow()); + self.trace.set_logical_compaction(self.stash1.borrow()); } - fn advance_frontier(&mut self) -> AntichainRef { + fn get_logical_compaction(&mut self) -> AntichainRef { self.stash2.clear(); - for time in self.trace.advance_frontier().iter() { + for time in self.trace.get_logical_compaction().iter() { self.stash2.insert(TInner::to_inner(time.clone())); } self.stash2.borrow() } - fn distinguish_since(&mut self, frontier: AntichainRef) { + fn set_physical_compaction(&mut self, frontier: AntichainRef) { self.stash1.clear(); for time in frontier.iter() { self.stash1.insert(time.clone().to_outer()); } - self.trace.distinguish_since(self.stash1.borrow()); + self.trace.set_physical_compaction(self.stash1.borrow()); } - fn distinguish_frontier(&mut self) -> AntichainRef { + fn get_physical_compaction(&mut self) -> AntichainRef { self.stash2.clear(); - for time in self.trace.distinguish_frontier().iter() { + for time in self.trace.get_physical_compaction().iter() { self.stash2.insert(TInner::to_inner(time.clone())); } self.stash2.borrow() diff --git a/src/trace/wrappers/enter_at.rs b/src/trace/wrappers/enter_at.rs index 0919ae3aa..a831ee33f 100644 --- a/src/trace/wrappers/enter_at.rs +++ b/src/trace/wrappers/enter_at.rs @@ -72,31 +72,31 @@ where }) } - fn advance_by(&mut self, frontier: AntichainRef) { + fn set_logical_compaction(&mut self, frontier: AntichainRef) { self.stash1.clear(); for time in frontier.iter() { self.stash1.insert((self.prior)(time)); } - self.trace.advance_by(self.stash1.borrow()); + self.trace.set_logical_compaction(self.stash1.borrow()); } - fn advance_frontier(&mut self) -> AntichainRef { + fn get_logical_compaction(&mut self) -> AntichainRef { self.stash2.clear(); - for time in self.trace.advance_frontier().iter() { + for time in self.trace.get_logical_compaction().iter() { self.stash2.insert(TInner::to_inner(time.clone())); } self.stash2.borrow() } - fn distinguish_since(&mut self, frontier: AntichainRef) { + fn set_physical_compaction(&mut self, frontier: AntichainRef) { self.stash1.clear(); for time in frontier.iter() { self.stash1.insert((self.prior)(time)); } - self.trace.distinguish_since(self.stash1.borrow()); + self.trace.set_physical_compaction(self.stash1.borrow()); } - fn distinguish_frontier(&mut self) -> AntichainRef { + fn get_physical_compaction(&mut self) -> AntichainRef { self.stash2.clear(); - for time in self.trace.distinguish_frontier().iter() { + for time in self.trace.get_physical_compaction().iter() { self.stash2.insert(TInner::to_inner(time.clone())); } self.stash2.borrow() diff --git a/src/trace/wrappers/filter.rs b/src/trace/wrappers/filter.rs index b1445f5ef..9862ef99a 100644 --- a/src/trace/wrappers/filter.rs +++ b/src/trace/wrappers/filter.rs @@ -49,11 +49,11 @@ where .map_batches(|batch| f(&Self::Batch::make_from(batch.clone(), logic.clone()))) } - fn advance_by(&mut self, frontier: AntichainRef) { self.trace.advance_by(frontier) } - fn advance_frontier(&mut self) -> AntichainRef { self.trace.advance_frontier() } + fn set_logical_compaction(&mut self, frontier: AntichainRef) { self.trace.set_logical_compaction(frontier) } + fn get_logical_compaction(&mut self) -> AntichainRef { self.trace.get_logical_compaction() } - fn distinguish_since(&mut self, frontier: AntichainRef) { self.trace.distinguish_since(frontier) } - fn distinguish_frontier(&mut self) -> AntichainRef { self.trace.distinguish_frontier() } + fn set_physical_compaction(&mut self, frontier: AntichainRef) { self.trace.set_physical_compaction(frontier) } + fn get_physical_compaction(&mut self) -> AntichainRef { self.trace.get_physical_compaction() } fn cursor_through(&mut self, upper: AntichainRef) -> Option<(Self::Cursor, >::Storage)> { self.trace.cursor_through(upper).map(|(x,y)| (CursorFilter::new(x, self.logic.clone()), y)) diff --git a/src/trace/wrappers/freeze.rs b/src/trace/wrappers/freeze.rs index 72430d389..1cc6a69bd 100644 --- a/src/trace/wrappers/freeze.rs +++ b/src/trace/wrappers/freeze.rs @@ -103,11 +103,11 @@ where }) } - fn advance_by(&mut self, frontier: AntichainRef) { self.trace.advance_by(frontier) } - fn advance_frontier(&mut self) -> AntichainRef { self.trace.advance_frontier() } + fn set_logical_compaction(&mut self, frontier: AntichainRef) { self.trace.set_logical_compaction(frontier) } + fn get_logical_compaction(&mut self) -> AntichainRef { self.trace.get_logical_compaction() } - fn distinguish_since(&mut self, frontier: AntichainRef) { self.trace.distinguish_since(frontier) } - fn distinguish_frontier(&mut self) -> AntichainRef { self.trace.distinguish_frontier() } + fn set_physical_compaction(&mut self, frontier: AntichainRef) { self.trace.set_physical_compaction(frontier) } + fn get_physical_compaction(&mut self) -> AntichainRef { self.trace.get_physical_compaction() } fn cursor_through(&mut self, upper: AntichainRef) -> Option<(Self::Cursor, >::Storage)> { let func = &self.func; diff --git a/src/trace/wrappers/frontier.rs b/src/trace/wrappers/frontier.rs index da56a47d9..beb20be46 100644 --- a/src/trace/wrappers/frontier.rs +++ b/src/trace/wrappers/frontier.rs @@ -55,11 +55,11 @@ where self.trace.map_batches(|batch| f(&Self::Batch::make_from(batch.clone(), frontier))) } - fn advance_by(&mut self, frontier: AntichainRef) { self.trace.advance_by(frontier) } - fn advance_frontier(&mut self) -> AntichainRef { self.trace.advance_frontier() } + fn set_logical_compaction(&mut self, frontier: AntichainRef) { self.trace.set_logical_compaction(frontier) } + fn get_logical_compaction(&mut self) -> AntichainRef { self.trace.get_logical_compaction() } - fn distinguish_since(&mut self, frontier: AntichainRef) { self.trace.distinguish_since(frontier) } - fn distinguish_frontier(&mut self) -> AntichainRef { self.trace.distinguish_frontier() } + fn set_physical_compaction(&mut self, frontier: AntichainRef) { self.trace.set_physical_compaction(frontier) } + fn get_physical_compaction(&mut self) -> AntichainRef { self.trace.get_physical_compaction() } fn cursor_through(&mut self, upper: AntichainRef) -> Option<(Self::Cursor, >::Storage)> { let frontier = self.frontier.borrow(); diff --git a/src/trace/wrappers/rc.rs b/src/trace/wrappers/rc.rs index aa641e87c..fd270d71f 100644 --- a/src/trace/wrappers/rc.rs +++ b/src/trace/wrappers/rc.rs @@ -31,7 +31,7 @@ where Tr: TraceReader { /// accumulated holds on times for advancement. - pub advance_frontiers: MutableAntichain, + pub get_logical_compactions: MutableAntichain, /// accumulated holds on times for distinction. pub through_frontiers: MutableAntichain, /// The wrapped trace. @@ -50,34 +50,34 @@ where pub fn new(mut trace: Tr) -> Self { let mut advance = MutableAntichain::new(); - advance.update_iter(trace.advance_frontier().iter().cloned().map(|t| (t,1))); - // for time in trace.advance_frontier() { + advance.update_iter(trace.get_logical_compaction().iter().cloned().map(|t| (t,1))); + // for time in trace.get_logical_compaction() { // advance.update(time, 1); // } let mut through = MutableAntichain::new(); - through.update_iter(trace.distinguish_frontier().iter().cloned().map(|t| (t,1))); - // for time in trace.distinguish_frontier() { + through.update_iter(trace.get_physical_compaction().iter().cloned().map(|t| (t,1))); + // for time in trace.get_physical_compaction() { // through.update(time, 1); // } TraceBox { - advance_frontiers: advance, + get_logical_compactions: advance, through_frontiers: through, trace: trace, } } /// Replaces elements of `lower` with those of `upper`. - pub fn adjust_advance_frontier(&mut self, lower: AntichainRef, upper: AntichainRef) { - self.advance_frontiers.update_iter(upper.iter().cloned().map(|t| (t,1))); - self.advance_frontiers.update_iter(lower.iter().cloned().map(|t| (t,-1))); - self.trace.advance_by(self.advance_frontiers.frontier()); + pub fn adjust_get_logical_compaction(&mut self, lower: AntichainRef, upper: AntichainRef) { + self.get_logical_compactions.update_iter(upper.iter().cloned().map(|t| (t,1))); + self.get_logical_compactions.update_iter(lower.iter().cloned().map(|t| (t,-1))); + self.trace.set_logical_compaction(self.get_logical_compactions.frontier()); } /// Replaces elements of `lower` with those of `upper`. pub fn adjust_through_frontier(&mut self, lower: AntichainRef, upper: AntichainRef) { self.through_frontiers.update_iter(upper.iter().cloned().map(|t| (t,1))); self.through_frontiers.update_iter(lower.iter().cloned().map(|t| (t,-1))); - self.trace.distinguish_since(self.through_frontiers.frontier()); + self.trace.set_physical_compaction(self.through_frontiers.frontier()); } } @@ -91,7 +91,7 @@ where Tr::Time: Lattice+Ord+Clone+'static, Tr: TraceReader, { - advance_frontier: Antichain, + get_logical_compaction: Antichain, through_frontier: Antichain, /// Wrapped trace. Please be gentle when using. pub wrapper: Rc>>, @@ -115,17 +115,17 @@ where /// This change may not have immediately observable effects. It informs the shared trace that this /// handle no longer requires access to times other than those in the future of `frontier`, but if /// there are other handles to the same trace, it may not yet be able to compact. - fn advance_by(&mut self, frontier: AntichainRef) { - self.wrapper.borrow_mut().adjust_advance_frontier(self.advance_frontier.borrow(), frontier); - self.advance_frontier = frontier.to_owned(); + fn set_logical_compaction(&mut self, frontier: AntichainRef) { + self.wrapper.borrow_mut().adjust_get_logical_compaction(self.get_logical_compaction.borrow(), frontier); + self.get_logical_compaction = frontier.to_owned(); } - fn advance_frontier(&mut self) -> AntichainRef { self.advance_frontier.borrow() } + fn get_logical_compaction(&mut self) -> AntichainRef { self.get_logical_compaction.borrow() } /// Allows the trace to compact batches of times before `frontier`. - fn distinguish_since(&mut self, frontier: AntichainRef) { + fn set_physical_compaction(&mut self, frontier: AntichainRef) { self.wrapper.borrow_mut().adjust_through_frontier(self.through_frontier.borrow(), frontier); self.through_frontier = frontier.to_owned(); } - fn distinguish_frontier(&mut self) -> AntichainRef { self.through_frontier.borrow() } + fn get_physical_compaction(&mut self) -> AntichainRef { self.through_frontier.borrow() } /// Creates a new cursor over the wrapped trace. fn cursor_through(&mut self, frontier: AntichainRef) -> Option<(Tr::Cursor, >::Storage)> { ::std::cell::RefCell::borrow_mut(&self.wrapper).trace.cursor_through(frontier) @@ -147,7 +147,7 @@ where let wrapped = Rc::new(RefCell::new(TraceBox::new(trace))); let handle = TraceRc { - advance_frontier: wrapped.borrow().advance_frontiers.frontier().to_owned(), + get_logical_compaction: wrapped.borrow().get_logical_compactions.frontier().to_owned(), through_frontier: wrapped.borrow().through_frontiers.frontier().to_owned(), wrapper: wrapped.clone(), }; @@ -163,10 +163,10 @@ where { fn clone(&self) -> Self { // increase ref counts for this frontier - self.wrapper.borrow_mut().adjust_advance_frontier(Antichain::new().borrow(), self.advance_frontier.borrow()); + self.wrapper.borrow_mut().adjust_get_logical_compaction(Antichain::new().borrow(), self.get_logical_compaction.borrow()); self.wrapper.borrow_mut().adjust_through_frontier(Antichain::new().borrow(), self.through_frontier.borrow()); TraceRc { - advance_frontier: self.advance_frontier.clone(), + get_logical_compaction: self.get_logical_compaction.clone(), through_frontier: self.through_frontier.clone(), wrapper: self.wrapper.clone(), } @@ -179,9 +179,9 @@ where Tr: TraceReader, { fn drop(&mut self) { - self.wrapper.borrow_mut().adjust_advance_frontier(self.advance_frontier.borrow(), Antichain::new().borrow()); + self.wrapper.borrow_mut().adjust_get_logical_compaction(self.get_logical_compaction.borrow(), Antichain::new().borrow()); self.wrapper.borrow_mut().adjust_through_frontier(self.through_frontier.borrow(), Antichain::new().borrow()); - self.advance_frontier = Antichain::new(); + self.get_logical_compaction = Antichain::new(); self.through_frontier = Antichain::new(); } } \ No newline at end of file diff --git a/tests/import.rs b/tests/import.rs index 5d36f2925..fb2fd6136 100644 --- a/tests/import.rs +++ b/tests/import.rs @@ -222,7 +222,7 @@ fn import_skewed() { input.send(((index as u64, 1), index, 1)); input.close(); - trace.advance_by(AntichainRef::new(&[peers - index])); + trace.set_logical_compaction(AntichainRef::new(&[peers - index])); let (captured,) = worker.dataflow(move |scope| { let imported = trace.import(scope); diff --git a/tests/trace.rs b/tests/trace.rs index 3aa6a4c64..6bb32990e 100644 --- a/tests/trace.rs +++ b/tests/trace.rs @@ -65,25 +65,3 @@ fn test_trace() { let vec_4 = cursor4.to_vec(&storage4); assert_eq!(vec_4, vec_3); } - -// #[test] -// fn test_advance() { -// let mut trace = get_trace(); - -// trace.advance_by(&[2]); -// trace.distinguish_since(&[2]); - -// let (mut cursor1, storage1) = trace.cursor_through(&[2]).unwrap(); - -// assert_eq!( -// cursor1.to_vec(&storage1), -// vec![((1.into(), 2), vec![(2, 1)]), ((2.into(), 3), vec![(2, 1)])]); - -// trace.distinguish_since(&[3]); - -// let (mut cursor2, storage2) = trace.cursor_through(&[3]).unwrap(); - -// assert_eq!( -// cursor2.to_vec(&storage2), -// vec![((1.into(), 2), vec![(2, 1)]), ((2.into(), 3), vec![(2, 1), (2, -1)])]); -// } diff --git a/tpchlike/src/bin/arrange.rs b/tpchlike/src/bin/arrange.rs index d38abc532..503fb5fdb 100644 --- a/tpchlike/src/bin/arrange.rs +++ b/tpchlike/src/bin/arrange.rs @@ -166,7 +166,7 @@ fn main() { let time = next_round; - traces.advance_by(&[next_round]); + traces.set_logical_compaction(&[next_round]); worker.step_while(|| probe.less_than(&time)); round += 1; diff --git a/tpchlike/src/bin/just-arrange.rs b/tpchlike/src/bin/just-arrange.rs index 5752a81de..ca7bf52fd 100644 --- a/tpchlike/src/bin/just-arrange.rs +++ b/tpchlike/src/bin/just-arrange.rs @@ -143,7 +143,7 @@ fn main() { let time = next_round; - traces.advance_by(&[next_round]); + traces.set_logical_compaction(&[next_round]); worker.step_while(|| probe.less_than(&time)); round += 1; diff --git a/tpchlike/src/bin/sosp.rs b/tpchlike/src/bin/sosp.rs index 44694b572..eaa4e949d 100644 --- a/tpchlike/src/bin/sosp.rs +++ b/tpchlike/src/bin/sosp.rs @@ -139,7 +139,7 @@ fn main() { if let Some(mut data) = suppliers.pop() { inputs.supplier.send_batch(&mut data); } inputs.advance_to(next_round); - traces.advance_by(&[next_round]); + traces.set_logical_compaction(&[next_round]); let start = timer.elapsed(); worker.step_while(|| probe.less_than(&next_round)); diff --git a/tpchlike/src/lib.rs b/tpchlike/src/lib.rs index 27b5c0e3e..a798e4236 100644 --- a/tpchlike/src/lib.rs +++ b/tpchlike/src/lib.rs @@ -157,37 +157,37 @@ impl Arrangements { let empty_frontier = empty_frontier.borrow(); let mut arranged = scope.input_from(&mut inputs.customer).as_collection().map(|x| (x.cust_key, x)).arrange_by_key(); arranged.stream.probe_with(probe); - arranged.trace.distinguish_since(empty_frontier); + arranged.trace.set_physical_compaction(empty_frontier); let customer = arranged.trace; let mut arranged = scope.input_from(&mut inputs.nation).as_collection().map(|x| (x.nation_key, x)).arrange_by_key(); arranged.stream.probe_with(probe); - arranged.trace.distinguish_since(empty_frontier); + arranged.trace.set_physical_compaction(empty_frontier); let nation = arranged.trace; let mut arranged = scope.input_from(&mut inputs.order).as_collection().map(|x| (x.order_key, x)).arrange_by_key(); arranged.stream.probe_with(probe); - arranged.trace.distinguish_since(empty_frontier); + arranged.trace.set_physical_compaction(empty_frontier); let order = arranged.trace; let mut arranged = scope.input_from(&mut inputs.part).as_collection().map(|x| (x.part_key, x)).arrange_by_key(); arranged.stream.probe_with(probe); - arranged.trace.distinguish_since(empty_frontier); + arranged.trace.set_physical_compaction(empty_frontier); let part = arranged.trace; let mut arranged = scope.input_from(&mut inputs.partsupp).as_collection().map(|x| ((x.part_key, x.supp_key), x)).arrange_by_key(); arranged.stream.probe_with(probe); - arranged.trace.distinguish_since(empty_frontier); + arranged.trace.set_physical_compaction(empty_frontier); let partsupp = arranged.trace; let mut arranged = scope.input_from(&mut inputs.region).as_collection().map(|x| (x.region_key, x)).arrange_by_key(); arranged.stream.probe_with(probe); - arranged.trace.distinguish_since(empty_frontier); + arranged.trace.set_physical_compaction(empty_frontier); let region = arranged.trace; let mut arranged = scope.input_from(&mut inputs.supplier).as_collection().map(|x| (x.supp_key, x)).arrange_by_key(); arranged.stream.probe_with(probe); - arranged.trace.distinguish_since(empty_frontier); + arranged.trace.set_physical_compaction(empty_frontier); let supplier = arranged.trace; Arrangements { @@ -236,18 +236,18 @@ impl Arrangements { } } - pub fn advance_by(&mut self, frontier: &[usize]) { + pub fn set_logical_compaction(&mut self, frontier: &[usize]) { use differential_dataflow::trace::TraceReader; use timely::progress::frontier::AntichainRef; let frontier = AntichainRef::new(frontier); - self.customer.advance_by(frontier); - self.nation.advance_by(frontier); - self.order.advance_by(frontier); - self.part.advance_by(frontier); - self.partsupp.advance_by(frontier); - self.region.advance_by(frontier); - self.supplier.advance_by(frontier); + self.customer.set_logical_compaction(frontier); + self.nation.set_logical_compaction(frontier); + self.order.set_logical_compaction(frontier); + self.part.set_logical_compaction(frontier); + self.partsupp.set_logical_compaction(frontier); + self.region.set_logical_compaction(frontier); + self.supplier.set_logical_compaction(frontier); } }