Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions dogsdogsdogs/src/operators/lookup_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ where
S: FnMut(&D, &R, &Tr::Val, &Tr::R)->(DOut, ROut)+'static,
{
// No need to block physical merging for this operator.
arrangement.trace.distinguish_since(Antichain::new().borrow());
arrangement.trace.set_physical_compaction(Antichain::new().borrow());
let mut propose_trace = Some(arrangement.trace);
let propose_stream = arrangement.stream;

Expand Down Expand Up @@ -138,7 +138,7 @@ where
for key in stash.keys() {
frontier.insert(key.time().clone());
}
propose_trace.as_mut().map(|trace| trace.advance_by(frontier.borrow()));
propose_trace.as_mut().map(|trace| trace.set_logical_compaction(frontier.borrow()));

if input1.frontier().is_empty() && stash.is_empty() {
propose_trace = None;
Expand Down
8 changes: 4 additions & 4 deletions examples/cursors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ fn main() {
graph.close();
for i in 1..rounds + 1 {
/* Advance the trace frontier to enable trace compaction. */
graph_trace.distinguish_since(AntichainRef::new(&[i]));
graph_trace.advance_by(AntichainRef::new(&[i]));
graph_trace.set_physical_compaction(AntichainRef::new(&[i]));
graph_trace.set_logical_compaction(AntichainRef::new(&[i]));
worker.step_while(|| probe.less_than(&i));
dump_cursor(i, worker.index(), &mut graph_trace);
}
Expand All @@ -93,8 +93,8 @@ fn main() {
}
graph.advance_to(i);
graph.flush();
graph_trace.distinguish_since(AntichainRef::new(&[i]));
graph_trace.advance_by(AntichainRef::new(&[i]));
graph_trace.set_physical_compaction(AntichainRef::new(&[i]));
graph_trace.set_logical_compaction(AntichainRef::new(&[i]));
worker.step_while(|| probe.less_than(graph.time()));
dump_cursor(i, worker.index(), &mut graph_trace);
}
Expand Down
14 changes: 7 additions & 7 deletions examples/multitemporal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use pair::Pair;

fn main() {

timely::execute_from_args(std::env::args(), move |worker| {
timely::execute_from_args(std::env::args(), move |worker| {

// Used to determine if our output has caught up to our input.
let mut probe: ProbeHandle<Pair<isize, isize>> = ProbeHandle::new();
Expand All @@ -49,7 +49,7 @@ fn main() {
});

// Do not hold back physical compaction.
trace.distinguish_since(AntichainRef::new(&[]));
trace.set_physical_compaction(AntichainRef::new(&[]));

println!("Multi-temporal histogram; valid commands are (integer arguments):");
println!(" update value time1 time2 change");
Expand Down Expand Up @@ -86,22 +86,22 @@ fn main() {
},
("advance-output", 2) => {
let time = Pair::new(arguments[0], arguments[1]);
if trace.advance_frontier().less_equal(&time) {
trace.advance_by(AntichainRef::new(&[time]));
if trace.get_logical_compaction().less_equal(&time) {
trace.set_logical_compaction(AntichainRef::new(&[time]));
while probe.less_than(capability.time()) {
worker.step();
}
} else {
println!("Requested time {:?} not readable (output from {:?})", time, trace.advance_frontier());
println!("Requested time {:?} not readable (output from {:?})", time, trace.get_logical_compaction());
}
},
("query", 2) => {
// Check that the query times are not beyond the current capabilities.
let query_time = Pair::new(arguments[0], arguments[1]);
if capability.time().less_equal(&query_time) {
println!("Query time ({:?}) is still open (input from {:?}).", query_time, capability.time());
} else if !trace.advance_frontier().less_equal(&query_time) {
println!("Query time ({:?}) no longer available in output (output from {:?}).", query_time, trace.advance_frontier());
} else if !trace.get_logical_compaction().less_equal(&query_time) {
println!("Query time ({:?}) no longer available in output (output from {:?}).", query_time, trace.get_logical_compaction());
}
else {
println!("Report at {:?}", query_time);
Expand Down
8 changes: 4 additions & 4 deletions interactive/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,13 +187,13 @@ impl<V: ExchangeData+Hash+Datum> TraceManager<V> {
use timely::progress::frontier::Antichain;
let frontier = Antichain::from_elem(time.clone());
for trace in self.inputs.values_mut() {
trace.advance_by(frontier.borrow());
trace.distinguish_since(frontier.borrow());
trace.set_logical_compaction(frontier.borrow());
trace.set_physical_compaction(frontier.borrow());
}
for map in self.arrangements.values_mut() {
for trace in map.values_mut() {
trace.advance_by(frontier.borrow());
trace.distinguish_since(frontier.borrow());
trace.set_logical_compaction(frontier.borrow());
trace.set_physical_compaction(frontier.borrow());
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions mdbook/src/chapter_5/chapter_5_3.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,6 @@ When we extract a trace from an arrangement, we acquire the ability to replay th

A `TraceHandle` (the type of `trace`) has two important methods. Their names are not great, and subject to change in the future. Their idioms may also change as more information flows in about users and use cases.

1. `advance_by(frontier)`. This method informs `trace` that it will no longer be called upon to handle queries for times not in advance of `frontier`, a set of timestamps. This gives the arrangement permission to coalesce otherwise indistinguishable timestamps, which it will start to do once all handles have advanced.
1. `set_logical_compaction(frontier)`. This method informs `trace` that it will no longer be called upon to handle queries for times not in advance of `frontier`, a set of timestamps. This gives the arrangement permission to coalesce otherwise indistinguishable timestamps, which it will start to do once all handles have advanced.

2. `distinguish_since(frontier)`. This method unblocks the merging of physical batches. It is very rare that a user wants to do anything with this other than call `trace.distinguish_since(&[])`, which unblocks all merging. Certain operators, namely `join`, do need to carefully manipulate this method.
2. `set_physical_compaction(frontier)`. This method unblocks the merging of physical batches. It is very rare that a user wants to do anything with this other than call `trace.set_physical_compaction(&[])`, which unblocks all merging. Certain operators, namely `join`, do need to carefully manipulate this method.
6 changes: 3 additions & 3 deletions server/dataflows/random_graph/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ pub fn build((dataflow, handles, probe, timer, args): Environment) -> Result<(),
// operator holds only a weak reference to it.
//
// The operator also holds an `Weak<RefCell<Option<TraceHandle>>>` which it will
// attempt to borrow and call `advance_by` in order to advance the capability
// attempt to borrow and call `set_logical_compaction` in order to advance the capability
// as it runs, to allow compaction and the maintenance of bounded state.

if args.len() != 4 { return Err(format!("expected four arguments, instead: {:?}", args)); }
Expand Down Expand Up @@ -134,7 +134,7 @@ pub fn build((dataflow, handles, probe, timer, args): Environment) -> Result<(),
if let Some(trace_handle) = trace_handle_weak.upgrade() {
let mut borrow = trace_handle.borrow_mut();
if let Some(ref mut trace_handle) = borrow.as_mut() {
trace_handle.advance_by(&[elapsed_ns]);
trace_handle.set_logical_compaction(&[elapsed_ns]);
}
}

Expand Down Expand Up @@ -191,7 +191,7 @@ pub fn build((dataflow, handles, probe, timer, args): Environment) -> Result<(),
.trace;

// release all blocks on merging.
trace.distinguish_since(&[]);
trace.set_physical_compaction(&[]);
*trace_handle.borrow_mut() = Some(trace);

handles.set::<Rc<RefCell<Option<TraceHandle>>>>(name.to_owned(), trace_handle);
Expand Down
26 changes: 13 additions & 13 deletions src/operators/arrange/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,20 +54,20 @@ where
type Batch = Tr::Batch;
type Cursor = Tr::Cursor;

fn advance_by(&mut self, frontier: AntichainRef<Tr::Time>) {
self.trace.borrow_mut().adjust_advance_frontier(self.advance.borrow(), frontier);
fn set_logical_compaction(&mut self, frontier: AntichainRef<Tr::Time>) {
self.trace.borrow_mut().adjust_get_logical_compaction(self.advance.borrow(), frontier);
self.advance.clear();
self.advance.extend(frontier.iter().cloned());
}
fn advance_frontier(&mut self) -> AntichainRef<Tr::Time> {
fn get_logical_compaction(&mut self) -> AntichainRef<Tr::Time> {
self.advance.borrow()
}
fn distinguish_since(&mut self, frontier: AntichainRef<Tr::Time>) {
fn set_physical_compaction(&mut self, frontier: AntichainRef<Tr::Time>) {
self.trace.borrow_mut().adjust_through_frontier(self.through.borrow(), frontier);
self.through.clear();
self.through.extend(frontier.iter().cloned());
}
fn distinguish_frontier(&mut self) -> AntichainRef<Tr::Time> {
fn get_physical_compaction(&mut self) -> AntichainRef<Tr::Time> {
self.through.borrow()
}
fn cursor_through(&mut self, frontier: AntichainRef<Tr::Time>) -> Option<(Tr::Cursor, <Tr::Cursor as Cursor<Tr::Key, Tr::Val, Tr::Time, Tr::R>>::Storage)> {
Expand Down Expand Up @@ -99,7 +99,7 @@ where
let reader = TraceAgent {
trace: trace.clone(),
queues: Rc::downgrade(&queues),
advance: trace.borrow().advance_frontiers.frontier().to_owned(),
advance: trace.borrow().get_logical_compactions.frontier().to_owned(),
through: trace.borrow().through_frontiers.frontier().to_owned(),
operator,
logging,
Expand Down Expand Up @@ -163,13 +163,13 @@ where
/// are no longer evident.
///
/// The current behavior is that the introduced collection accumulates updates to some times less or equal
/// to `self.advance_frontier()`. There is *not* currently a guarantee that the updates are accumulated *to*
/// to `self.get_logical_compaction()`. There is *not* currently a guarantee that the updates are accumulated *to*
/// the frontier, and the resulting collection history may be weirdly partial until this point. In particular,
/// the historical collection may move through configurations that did not actually occur, even if eventually
/// arriving at the correct collection. This is probably a bug; although we get to the right place in the end,
/// the intermediate computation could do something that the original computation did not, like diverge.
///
/// I would expect the semantics to improve to "updates are advanced to `self.advance_frontier()`", which
/// I would expect the semantics to improve to "updates are advanced to `self.get_logical_compaction()`", which
/// means the computation will run as if starting from exactly this frontier. It is not currently clear whose
/// responsibility this should be (the trace/batch should only reveal these times, or an operator should know
/// to advance times before using them).
Expand Down Expand Up @@ -341,7 +341,7 @@ where

/// Imports an arrangement into the supplied scope.
///
/// This variant of import uses the `advance_frontier` to forcibly advance timestamps in updates.
/// This variant of import uses the `get_logical_compaction` to forcibly advance timestamps in updates.
///
/// # Examples
///
Expand Down Expand Up @@ -382,7 +382,7 @@ where
/// handle.remove(1); handle.advance_to(4); handle.flush(); worker.step();
/// handle.insert(0); handle.advance_to(5); handle.flush(); worker.step();
///
/// trace.advance_by(AntichainRef::new(&[5]));
/// trace.set_logical_compaction(AntichainRef::new(&[5]));
///
/// // create a second dataflow
/// let mut shutdown = worker.dataflow(|scope| {
Expand Down Expand Up @@ -418,7 +418,7 @@ where
Tr: TraceReader,
{
// This frontier describes our only guarantee on the compaction frontier.
let frontier = self.advance_frontier().to_owned();
let frontier = self.get_logical_compaction().to_owned();
self.import_frontier_core(scope, name, frontier)
}

Expand Down Expand Up @@ -533,7 +533,7 @@ where

// increase counts for wrapped `TraceBox`.
let empty_frontier = Antichain::new();
self.trace.borrow_mut().adjust_advance_frontier(empty_frontier.borrow(), self.advance.borrow());
self.trace.borrow_mut().adjust_get_logical_compaction(empty_frontier.borrow(), self.advance.borrow());
self.trace.borrow_mut().adjust_through_frontier(empty_frontier.borrow(), self.through.borrow());

TraceAgent {
Expand Down Expand Up @@ -562,7 +562,7 @@ where

// decrement borrow counts to remove all holds
let empty_frontier = Antichain::new();
self.trace.borrow_mut().adjust_advance_frontier(self.advance.borrow(), empty_frontier.borrow());
self.trace.borrow_mut().adjust_get_logical_compaction(self.advance.borrow(), empty_frontier.borrow());
self.trace.borrow_mut().adjust_through_frontier(self.through.borrow(), empty_frontier.borrow());
}
}
6 changes: 3 additions & 3 deletions src/operators/arrange/arrangement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,8 +278,8 @@ where
queries.binary_frontier(&self.stream, exchange, Pipeline, "TraceQuery", move |_capability, _info| {

let mut trace = Some(self.trace.clone());
// release `distinguish_since` capability.
trace.as_mut().unwrap().distinguish_since(Antichain::new().borrow());
// release `set_physical_compaction` capability.
trace.as_mut().unwrap().set_physical_compaction(Antichain::new().borrow());

let mut stash = Vec::new();
let mut capability: Option<Capability<G::Timestamp>> = None;
Expand Down Expand Up @@ -413,7 +413,7 @@ where
].into_iter().cloned().filter_map(|t| t).min();

if let Some(frontier) = frontier {
trace.as_mut().map(|t| t.advance_by(AntichainRef::new(&[frontier])));
trace.as_mut().map(|t| t.set_logical_compaction(AntichainRef::new(&[frontier])));
}
else {
trace = None;
Expand Down
4 changes: 2 additions & 2 deletions src/operators/arrange/upsert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -331,8 +331,8 @@ where
input_frontier.extend(input.frontier().frontier().iter().cloned());

// Downgrade capabilities for `reader_local`.
reader_local.advance_by(input_frontier.borrow());
reader_local.distinguish_since(input_frontier.borrow());
reader_local.set_logical_compaction(input_frontier.borrow());
reader_local.set_physical_compaction(input_frontier.borrow());
}

if let Some(mut fuel) = effort.clone() {
Expand Down
4 changes: 2 additions & 2 deletions src/operators/count.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,8 @@ where

// tidy up the shared input trace.
trace.advance_upper(&mut upper_limit);
trace.advance_by(upper_limit.borrow());
trace.distinguish_since(upper_limit.borrow());
trace.set_logical_compaction(upper_limit.borrow());
trace.set_physical_compaction(upper_limit.borrow());
}
})
.as_collection()
Expand Down
12 changes: 6 additions & 6 deletions src/operators/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ impl<G, T1> JoinCore<G, T1::Key, T1::Val, T1::R> for Arranged<G,T1>
if !batch1.is_empty() {
if let Some(acknowledged2) = &acknowledged2 {
// TODO : cursor_through may be problematic for pre-merged traces.
// A trace should provide the contract that whatever its `distinguish_since` capability,
// A trace should provide the contract that whatever its `set_physical_compaction` capability,
// it is safe (and reasonable) to await delivery of batches up through that frontier.
// In this case, we should be able to await (not block on) the arrival of these batches.
let (trace2_cursor, trace2_storage) = trace2.cursor_through(acknowledged2.borrow()).unwrap();
Expand Down Expand Up @@ -395,7 +395,7 @@ impl<G, T1> JoinCore<G, T1::Key, T1::Val, T1::R> for Arranged<G,T1>
if !batch2.is_empty() {
if let Some(acknowledged1) = &acknowledged1 {
// TODO : cursor_through may be problematic for pre-merged traces.
// A trace should provide the contract that whatever its `distinguish_since` capability,
// A trace should provide the contract that whatever its `set_physical_compaction` capability,
// it is safe (and reasonable) to await delivery of batches up through that frontier.
// In this case, we should be able to await (not block on) the arrival of these batches.
let (trace1_cursor, trace1_storage) = trace1.cursor_through(acknowledged1.borrow()).unwrap();
Expand Down Expand Up @@ -450,28 +450,28 @@ impl<G, T1> JoinCore<G, T1::Key, T1::Val, T1::R> for Arranged<G,T1>
// shut down or advance trace2.
if trace2.is_some() && input1.frontier().is_empty() { trace2 = None; }
if let Some(ref mut trace2) = trace2 {
trace2.advance_by(input1.frontier().frontier());
trace2.set_logical_compaction(input1.frontier().frontier());
// At this point, if we haven't seen any input batches we should establish a frontier anyhow.
if acknowledged2.is_none() {
acknowledged2 = Some(Antichain::from_elem(<G::Timestamp>::minimum()));
}
if let Some(acknowledged2) = &mut acknowledged2 {
trace2.advance_upper(acknowledged2);
trace2.distinguish_since(acknowledged2.borrow());
trace2.set_physical_compaction(acknowledged2.borrow());
}
}

// shut down or advance trace1.
if trace1.is_some() && input2.frontier().is_empty() { trace1 = None; }
if let Some(ref mut trace1) = trace1 {
trace1.advance_by(input2.frontier().frontier());
trace1.set_logical_compaction(input2.frontier().frontier());
// At this point, if we haven't seen any input batches we should establish a frontier anyhow.
if acknowledged1.is_none() {
acknowledged1 = Some(Antichain::from_elem(<G::Timestamp>::minimum()));
}
if let Some(acknowledged1) = &mut acknowledged1 {
trace1.advance_upper(acknowledged1);
trace1.distinguish_since(acknowledged1.borrow());
trace1.set_physical_compaction(acknowledged1.borrow());
}
}
}
Expand Down
Loading