diff --git a/src/operators/arrange/agent.rs b/src/operators/arrange/agent.rs index 04e168cb5..c27a04e45 100644 --- a/src/operators/arrange/agent.rs +++ b/src/operators/arrange/agent.rs @@ -63,6 +63,7 @@ where self.advance.borrow() } fn set_physical_compaction(&mut self, frontier: AntichainRef) { + debug_assert!(timely::PartialOrder::less_equal(&self.through.borrow(), &frontier)); self.trace.borrow_mut().adjust_through_frontier(self.through.borrow(), frontier); self.through.clear(); self.through.extend(frontier.iter().cloned()); diff --git a/src/operators/join.rs b/src/operators/join.rs index 5feb7e3b8..939d867f9 100644 --- a/src/operators/join.rs +++ b/src/operators/join.rs @@ -320,106 +320,156 @@ impl JoinCore for Arranged I::Item: Data, L: FnMut(&T1::Key,&T1::Val,&Tr2::Val)->I+'static { - // handles to shared trace data structures. - let mut trace1 = Some(self.trace.clone()); - let mut trace2 = Some(other.trace.clone()); + // Rename traces for symmetry from here on out. + let mut trace1 = self.trace.clone(); + let mut trace2 = other.trace.clone(); - // acknowledged frontier for each input. - use timely::progress::frontier::Antichain; - let mut acknowledged1: Option> = None; - let mut acknowledged2: Option> = None; - - // deferred work of batches from each input. - let mut todo1 = std::collections::VecDeque::new(); - let mut todo2 = std::collections::VecDeque::new(); - - let mut input1_buffer = Vec::new(); - let mut input2_buffer = Vec::new(); - - self.stream.binary_frontier(&other.stream, Pipeline, Pipeline, "Join", move |_cap, info| { + self.stream.binary_frontier(&other.stream, Pipeline, Pipeline, "Join", move |capability, info| { + // Acquire an activator to reschedule the operator when it has unfinished work. 
use timely::scheduling::Activator; let activations = self.stream.scope().activations().clone(); let activator = Activator::new(&info.address[..], activations); + // Our initial invariants are that for each trace, physical compaction is less or equal the trace's upper bound. + // These invariants ensure that we can reference observed batch frontiers from `_start_upper` onward, as long as + // we maintain our physical compaction capabilities appropriately. These assertions are tested as we load up the + // initial work for the two traces, and before the operator is constructed. + + // Acknowledged frontier for each input. + // These two are used exclusively to track batch boundaries on which we may want/need to call `cursor_through`. + // They will drive our physical compaction of each trace, and we want to maintain at all times that each is beyond + // the physical compaction frontier of their corresponding trace. + // Should we ever *drop* a trace, these are 1. much harder to maintain correctly, but 2. no longer used. + use timely::progress::frontier::Antichain; + let mut acknowledged1 = Antichain::from_elem(::minimum()); + let mut acknowledged2 = Antichain::from_elem(::minimum()); + + // deferred work of batches from each input. + let mut todo1 = std::collections::VecDeque::new(); + let mut todo2 = std::collections::VecDeque::new(); + + // We'll unload the initial batches here, to put ourselves in a less non-deterministic state to start. + trace1.map_batches(|batch1| { + acknowledged1.clone_from(batch1.upper()); + // No `todo1` work here, because we haven't accepted anything into `batches2` yet. + // It is effectively "empty", because we choose to drain `trace1` before `trace2`. + // Once we start streaming batches in, we will need to respond to new batches from + // `input1` with logic that would have otherwise been here. Check out the next loop + // for the structure. 
+ }); + // At this point, `ack1` should exactly equal `trace1.read_upper()`, as they are both determined by + // iterating through batches and capturing the upper bound. This is a great moment to assert that + // `trace1`'s physical compaction frontier is before the frontier of completed times in `trace1`. + // TODO: in the case that this does not hold, instead "upgrade" the physical compaction frontier. + assert!(PartialOrder::less_equal(&trace1.get_physical_compaction(), &acknowledged1.borrow())); + + trace2.map_batches(|batch2| { + acknowledged2.clone_from(batch2.upper()); + // It is safe to ask for `ack1` because we have confirmed it to be in advance of `distinguish_since`. + let (trace1_cursor, trace1_storage) = trace1.cursor_through(acknowledged1.borrow()).unwrap(); + let batch2_cursor = batch2.cursor(); + // We could downgrade the capability here, but doing so is a bit complicated mathematically. + // TODO: downgrade the capability by searching out the one time in `batch2.lower()` and not + // in `batch2.upper()`. Only necessary for non-empty batches, as empty batches may not have + // that property. + todo2.push_back(Deferred::new(trace1_cursor, trace1_storage, batch2_cursor, batch2.clone(), capability.clone())); + }); + // At this point, `ack2` should exactly equal `trace2.read_upper()`, as they are both determined by + // iterating through batches and capturing the upper bound. This is a great moment to assert that + // `trace2`'s physical compaction frontier is before the frontier of completed times in `trace2`. + // TODO: in the case that this does not hold, instead "upgrade" the physical compaction frontier. + assert!(PartialOrder::less_equal(&trace2.get_physical_compaction(), &acknowledged2.borrow())); + + // Droppable handles to shared trace data structures. + let mut trace1_option = Some(trace1); + let mut trace2_option = Some(trace2); + + // Swappable buffers for input extraction. 
+ let mut input1_buffer = Vec::new(); + let mut input2_buffer = Vec::new(); + move |input1, input2, output| { + // 1. Consuming input. + // // The join computation repeatedly accepts batches of updates from each of its inputs. // // For each accepted batch, it prepares a work-item to join the batch against previously "accepted" - // updates from its other input. It is important to track which updates have been accepted, through - // a combination of the input's frontier and the most recently received batch's upper bound, because + // updates from its other input. It is important to track which updates have been accepted, because // we use a shared trace and there may be updates present that are in advance of this accepted bound. + // + // Batches are accepted: 1. in bulk at start-up (above), 2. as we observe them in the input stream, + // and 3. if the trace can confirm a region of empty space directly following our accepted bound. + // This last case is a consequence of our inability to transmit empty batches, as they may be formed + // in the absence of timely dataflow capabilities. - // drain input 1, prepare work. + // Drain input 1, prepare work. input1.for_each(|capability, data| { - if let Some(ref mut trace2) = trace2 { + // This test *should* always pass, as we only drop a trace in response to the other input emptying. + if let Some(ref mut trace2) = trace2_option { let capability = capability.retain(); data.swap(&mut input1_buffer); for batch1 in input1_buffer.drain(..) { - if !batch1.is_empty() { - if let Some(acknowledged2) = &acknowledged2 { - // TODO : cursor_through may be problematic for pre-merged traces. - // A trace should provide the contract that whatever its `set_physical_compaction` capability, - // it is safe (and reasonable) to await delivery of batches up through that frontier. - // In this case, we should be able to await (not block on) the arrival of these batches. + // Ignore any pre-loaded data. 
+ if PartialOrder::less_equal(&acknowledged1, &batch1.lower()) { + if !batch1.is_empty() { + // It is safe to ask for `ack2` as we validated that it was at least `get_physical_compaction()` + // at start-up, and have held back physical compaction ever since. let (trace2_cursor, trace2_storage) = trace2.cursor_through(acknowledged2.borrow()).unwrap(); let batch1_cursor = batch1.cursor(); - todo1.push_back(Deferred::new(trace2_cursor, trace2_storage, batch1_cursor, batch1.clone(), capability.clone(), |r2,r1| (r1.clone()) * (r2.clone()))); + todo1.push_back(Deferred::new(trace2_cursor, trace2_storage, batch1_cursor, batch1.clone(), capability.clone())); } - } - // It would be alarming (incorrect) to receieve a non-empty batch that does not advance the - // acknowledged frontier, as each batch must be greater than previous batches, and the input. - // Empty batches may be received as information races between frontier forwarding of the trace - // and the empty batches themselves (which can be sent as part of trace importing). - if acknowledged1.is_none() { acknowledged1 = Some(timely::progress::frontier::Antichain::from_elem(::minimum())); } - if let Some(acknowledged1) = &mut acknowledged1 { - if !PartialOrder::less_equal(&*acknowledged1, batch1.upper()) { - if !batch1.is_empty() { - panic!("Non-empty batch1 upper not beyond acknowledged frontier: {:?}, {:?}", batch1.upper(), acknowledged1); - } - } + // To update `acknowledged1` we might presume that `batch1.lower` should equal it, but we + // may have skipped over empty batches. Still, the batches are in-order, and we should be + // able to just assume the most recent `batch1.upper` + debug_assert!(PartialOrder::less_equal(&acknowledged1, batch1.upper())); acknowledged1.clone_from(batch1.upper()); } } } + else { panic!("`trace2_option` dropped before `input1` emptied!"); } }); - // drain input 2, prepare work. + // Drain input 2, prepare work. 
input2.for_each(|capability, data| { - if let Some(ref mut trace1) = trace1 { + // This test *should* always pass, as we only drop a trace in response to the other input emptying. + if let Some(ref mut trace1) = trace1_option { let capability = capability.retain(); data.swap(&mut input2_buffer); for batch2 in input2_buffer.drain(..) { - if !batch2.is_empty() { - if let Some(acknowledged1) = &acknowledged1 { - // TODO : cursor_through may be problematic for pre-merged traces. - // A trace should provide the contract that whatever its `set_physical_compaction` capability, - // it is safe (and reasonable) to await delivery of batches up through that frontier. - // In this case, we should be able to await (not block on) the arrival of these batches. + // Ignore any pre-loaded data. + if PartialOrder::less_equal(&acknowledged2, &batch2.lower()) { + if !batch2.is_empty() { + // It is safe to ask for `ack1` as we validated that it was at least `get_physical_compaction()` + // at start-up, and have held back physical compaction ever since. let (trace1_cursor, trace1_storage) = trace1.cursor_through(acknowledged1.borrow()).unwrap(); let batch2_cursor = batch2.cursor(); - todo2.push_back(Deferred::new(trace1_cursor, trace1_storage, batch2_cursor, batch2.clone(), capability.clone(), |r1,r2| (r1.clone()) * (r2.clone()))); - } - } - // It would be alarming (incorrect) to receieve a non-empty batch that does not advance the - // acknowledged frontier, as each batch must be greater than previous batches, and the input. - // Empty batches may be received as information races between frontier forwarding of the trace - // and the empty batches themselves (which can be sent as part of trace importing). 
- if acknowledged2.is_none() { acknowledged2 = Some(timely::progress::frontier::Antichain::from_elem(::minimum())); } - if let Some(acknowledged2) = &mut acknowledged2 { - if !PartialOrder::less_equal(&*acknowledged2, batch2.upper()) { - if !batch2.is_empty() { - panic!("Non-empty batch2 upper not beyond acknowledged frontier: {:?}, {:?}", batch2.upper(), acknowledged2); - } + todo2.push_back(Deferred::new(trace1_cursor, trace1_storage, batch2_cursor, batch2.clone(), capability.clone())); } + + // To update `acknowledged2` we might presume that `batch2.lower` should equal it, but we + // may have skipped over empty batches. Still, the batches are in-order, and we should be + // able to just assume the most recent `batch2.upper` + debug_assert!(PartialOrder::less_equal(&acknowledged2, batch2.upper())); acknowledged2.clone_from(batch2.upper()); } } } + else { panic!("`trace1_option` dropped before `input2` emptied!"); } }); + // Advance acknowledged frontiers through any empty regions that we may not receive as batches. + if let Some(trace1) = trace1_option.as_mut() { + trace1.advance_upper(&mut acknowledged1); + } + if let Some(trace2) = trace2_option.as_mut() { + trace2.advance_upper(&mut acknowledged2); + } + + // 2. Join computation. + // // For each of the inputs, we do some amount of work (measured in terms of number // of output records produced). This is meant to yield control to allow downstream // operators to consume and reduce the output, but it it also means to provide some @@ -428,17 +478,27 @@ impl JoinCore for Arranged // which results in unintentionally quadratic processing time (each batch of either // input must scan all batches from the other input). - // perform some amount of outstanding work. + // Perform some amount of outstanding work. 
let mut fuel = 1_000_000; while !todo1.is_empty() && fuel > 0 { - todo1.front_mut().unwrap().work(output, &mut |k,v2,v1| result(k,v1,v2), &mut fuel); + todo1.front_mut().unwrap().work( + output, + |k,v2,v1| result(k,v1,v2), + |r2,r1| (r1.clone()) * (r2.clone()), + &mut fuel + ); if !todo1.front().unwrap().work_remains() { todo1.pop_front(); } } - // perform some amount of outstanding work. + // Perform some amount of outstanding work. let mut fuel = 1_000_000; while !todo2.is_empty() && fuel > 0 { - todo2.front_mut().unwrap().work(output, &mut |k,v1,v2| result(k,v1,v2), &mut fuel); + todo2.front_mut().unwrap().work( + output, + |k,v1,v2| result(k,v1,v2), + |r1,r2| (r1.clone()) * (r2.clone()), + &mut fuel + ); if !todo2.front().unwrap().work_remains() { todo2.pop_front(); } } @@ -447,31 +507,42 @@ impl JoinCore for Arranged activator.activate(); } - // shut down or advance trace2. - if trace2.is_some() && input1.frontier().is_empty() { trace2 = None; } - if let Some(ref mut trace2) = trace2 { - trace2.set_logical_compaction(input1.frontier().frontier()); - // At this point, if we haven't seen any input batches we should establish a frontier anyhow. - if acknowledged2.is_none() { - acknowledged2 = Some(Antichain::from_elem(::minimum())); - } - if let Some(acknowledged2) = &mut acknowledged2 { - trace2.advance_upper(acknowledged2); - trace2.set_physical_compaction(acknowledged2.borrow()); + // 3. Trace maintenance. + // + // Importantly, we use `input.frontier()` here rather than `acknowledged` to track + // the progress of an input, because should we ever drop one of the traces we will + // lose the ability to extract information from anything other than the input. + // For example, if we dropped `trace2` we would not be able to use `advance_upper` + // to keep `acknowledged2` up to date wrt empty batches, and would hold back logical + // compaction of `trace1`. + + // Maintain `trace1`. Drop if `input2` is empty, or advance based on future needs. 
+ if let Some(trace1) = trace1_option.as_mut() { + if input2.frontier().is_empty() { trace1_option = None; } + else { + // Allow `trace1` to logically compact up to the frontier we may yet receive, + // in the opposing input (`input2`). All `input2` times will be beyond this + // frontier, and joined times only need to be accurate when advanced to it. + trace1.set_logical_compaction(input2.frontier().frontier()); + // Allow `trace1` to physically compact up to the upper bound of batches we + // have received in its input (`input1`). We will not require a cursor that + // is not beyond this bound. + trace1.set_physical_compaction(acknowledged1.borrow()); } } - // shut down or advance trace1. - if trace1.is_some() && input2.frontier().is_empty() { trace1 = None; } - if let Some(ref mut trace1) = trace1 { - trace1.set_logical_compaction(input2.frontier().frontier()); - // At this point, if we haven't seen any input batches we should establish a frontier anyhow. - if acknowledged1.is_none() { - acknowledged1 = Some(Antichain::from_elem(::minimum())); - } - if let Some(acknowledged1) = &mut acknowledged1 { - trace1.advance_upper(acknowledged1); - trace1.set_physical_compaction(acknowledged1.borrow()); + // Maintain `trace2`. Drop if `input1` is empty, or advance based on future needs. + if let Some(trace2) = trace2_option.as_mut() { + if input1.frontier().is_empty() { trace2_option = None;} + else { + // Allow `trace2` to logically compact up to the frontier we may yet receive, + // in the opposing input (`input1`). All `input1` times will be beyond this + // frontier, and joined times only need to be accurate when advanced to it. + trace2.set_logical_compaction(input1.frontier().frontier()); + // Allow `trace2` to physically compact up to the upper bound of batches we + // have received in its input (`input2`). We will not require a cursor that + // is not beyond this bound. 
+ trace2.set_physical_compaction(acknowledged2.borrow()); } } } @@ -485,16 +556,16 @@ impl JoinCore for Arranged /// The structure wraps cursors which allow us to play out join computation at whatever rate we like. /// This allows us to avoid producing and buffering massive amounts of data, without giving the timely /// dataflow system a chance to run operators that can consume and aggregate the data. -struct Deferred +struct Deferred where V1: Ord+Clone, V2: Ord+Clone, T: Timestamp+Lattice+Ord+Debug, R1: Semigroup, R2: Semigroup, + R3: Semigroup, C1: Cursor, C2: Cursor, - M: FnMut(&R1,&R2)->R3, D: Ord+Clone+Data, { phant: ::std::marker::PhantomData<(K, V1, V2, R1, R2)>, @@ -503,13 +574,11 @@ where batch: C2, batch_storage: C2::Storage, capability: Capability, - mult: M, done: bool, temp: Vec<((D, T), R3)>, - // thinker: JoinThinker, } -impl Deferred +impl Deferred where K: Ord+Debug+Eq, V1: Ord+Clone+Debug, @@ -520,10 +589,9 @@ where R3: Semigroup, C1: Cursor, C2: Cursor, - M: FnMut(&R1,&R2)->R3, D: Ord+Clone+Data, { - fn new(trace: C1, trace_storage: C1::Storage, batch: C2, batch_storage: C2::Storage, capability: Capability, mult: M) -> Self { + fn new(trace: C1, trace_storage: C1::Storage, batch: C2, batch_storage: C2::Storage, capability: Capability) -> Self { Deferred { phant: ::std::marker::PhantomData, trace, @@ -531,10 +599,8 @@ where batch, batch_storage, capability, - mult, done: false, temp: Vec::new(), - // thinker: JoinThinker::new(), } } @@ -544,8 +610,8 @@ where /// Process keys until at least `limit` output tuples produced, or the work is exhausted. 
#[inline(never)] - fn work(&mut self, output: &mut OutputHandle>, logic: &mut L, fuel: &mut usize) - where I: IntoIterator, L: FnMut(&K, &V1, &V2)->I { + fn work(&mut self, output: &mut OutputHandle>, mut logic: L, mut mult: M, fuel: &mut usize) + where I: IntoIterator, L: FnMut(&K, &V1, &V2)->I, M: FnMut(&R1,&R2)->R3 { let meet = self.capability.time(); @@ -557,10 +623,8 @@ where let trace = &mut self.trace; let batch = &mut self.batch; - let mult = &mut self.mult; let temp = &mut self.temp; - // let thinker = &mut self.thinker; let mut thinker = JoinThinker::new(); while batch.key_valid(batch_storage) && trace.key_valid(trace_storage) && effort < *fuel { diff --git a/src/trace/implementations/spine_fueled_neu.rs b/src/trace/implementations/spine_fueled_neu.rs index 57553d825..ed111638e 100644 --- a/src/trace/implementations/spine_fueled_neu.rs +++ b/src/trace/implementations/spine_fueled_neu.rs @@ -217,6 +217,8 @@ where } fn get_logical_compaction(&mut self) -> AntichainRef { self.logical_frontier.borrow() } fn set_physical_compaction(&mut self, frontier: AntichainRef) { + // We should never request to rewind the frontier. + debug_assert!(PartialOrder::less_equal(&self.physical_frontier.borrow(), &frontier), "FAIL\tthrough frontier !<= new frontier {:?} {:?}\n", self.physical_frontier, frontier); self.physical_frontier = frontier.to_owned(); self.consider_merges(); }