diff --git a/src/persist/src/nemesis/direct.rs b/src/persist/src/nemesis/direct.rs index e053bf5776320..1c23a9610e7b3 100644 --- a/src/persist/src/nemesis/direct.rs +++ b/src/persist/src/nemesis/direct.rs @@ -147,6 +147,9 @@ struct DirectCore { } impl DirectCore { + // Selected to be a small prime number. + const OUTPUTS_PER_YIELD: usize = 3; + fn stream(&mut self, name: &str) -> Result { match self.streams.entry(name.to_owned()) { Entry::Occupied(x) => { @@ -173,8 +176,11 @@ impl DirectCore { .lock() .expect("clone doesn't panic and poison lock") .clone(); - let data = - scope.persisted_source(dataflow_read, &Antichain::from_elem(0)); + let data = scope.persisted_source_yield( + dataflow_read, + &Antichain::from_elem(0), + Self::OUTPUTS_PER_YIELD, + ); data.probe_with(&mut probe).capture_into(output_tx); }); while worker.step_or_park(None) { diff --git a/src/persist/src/operators/mod.rs b/src/persist/src/operators/mod.rs index 4fd3f427059d4..7ca8e6a0ab470 100644 --- a/src/persist/src/operators/mod.rs +++ b/src/persist/src/operators/mod.rs @@ -16,6 +16,11 @@ pub mod source; pub mod stream; pub mod upsert; +// This was selected to match the heuristic in differential dataflow's join. +// +// https://docs.rs/differential-dataflow/0.12.0/src/differential_dataflow/operators/join.rs.html#487-488 +const DEFAULT_OUTPUTS_PER_YIELD: usize = 1_000_000; + pub(crate) fn split_ok_err( x: (Result<(K, V), String>, u64, isize), ) -> Result<((K, V), u64, isize), (String, u64, isize)> { diff --git a/src/persist/src/operators/replay.rs b/src/persist/src/operators/replay.rs index 45f042a77d509..7571832036992 100644 --- a/src/persist/src/operators/replay.rs +++ b/src/persist/src/operators/replay.rs @@ -19,6 +19,7 @@ use timely::{Data as TimelyData, PartialOrder}; use crate::client::DecodedSnapshot; use crate::error::Error; use crate::indexed::Snapshot; +use crate::operators::DEFAULT_OUTPUTS_PER_YIELD; /// Extension trait for [`Stream`]. pub trait Replay, K: TimelyData, V: TimelyData> { @@ -27,6 +28,19 @@ pub trait Replay, K: TimelyData, V: TimelyData> { &self, snapshot: Result, Error>, as_of_frontier: &Antichain, + ) -> Stream, u64, isize)> { + self.replay_yield(snapshot, as_of_frontier, DEFAULT_OUTPUTS_PER_YIELD) + } + + /// Emits each record in a snapshot, yielding periodically. + /// + /// This yields after `outputs_per_yield` outputs to allow downstream + /// operators to reduce down the data and limit max memory usage. + fn replay_yield( + &self, + snapshot: Result, Error>, + as_of_frontier: &Antichain, + outputs_per_yield: usize, ) -> Stream, u64, isize)>; } @@ -36,10 +50,11 @@ where K: TimelyData + Codec, V: TimelyData + Codec, { - fn replay( + fn replay_yield( &self, snapshot: Result, Error>, as_of_frontier: &Antichain, + outputs_per_yield: usize, ) -> Stream, u64, isize)> { // TODO: This currently works by only emitting the persisted // data on worker 0 because that was the simplest thing to do @@ -52,37 +67,36 @@ where let result_stream: Stream> = operator::source( self, "Replay", - move |cap, _info| { + move |cap, info| { + let activator = self.activator_for(&info.address[..]); let mut snapshot_cap = if active_worker { - Some((snapshot, cap)) + Some((snapshot.map(|s| (s.since(), s.into_iter())), cap)) } else { None }; move |output| { - let (snapshot, cap) = match snapshot_cap.take() { + let mut done = true; + let (snapshot, cap) = match snapshot_cap.as_mut() { Some(x) => x, - None => return, // We were already invoked and consumed our snapshot. + None => return, // We already consumed our snapshot. }; let mut session = output.session(&cap); match snapshot { - Ok(snapshot) => { - let snapshot_since = snapshot.since(); - - if PartialOrder::less_than(&as_of_frontier, &snapshot_since) { - session.give(Err(Error::String(format!( + Ok((snapshot_since, _)) + if PartialOrder::less_than(&as_of_frontier, &snapshot_since) => + { + session.give(Err(Error::String(format!( "replaying persisted data: snapshot since ({:?}) is beyond expected as_of ({:?})", snapshot_since, as_of_frontier )))); - - return; - } - - // TODO: Periodically yield to let the rest of the dataflow - // reduce this down. - for x in snapshot.into_iter() { + } + Ok((snapshot_since, snapshot_iter)) => { + // NB: This `idx` from enumerate resets back to 0 + // each time the operator is run. + for (idx, x) in snapshot_iter.enumerate() { if let Ok((_, ts, _)) = &x { // The raw update data held internally in the // snapshot may not be physically compacted up to @@ -92,6 +106,10 @@ where debug_assert!(snapshot_since.less_equal(ts)); } session.give(x); + if idx + 1 >= outputs_per_yield { + done = false; + break; + } } } Err(e) => { @@ -101,6 +119,12 @@ where )))); } } + + if done { + snapshot_cap.take(); + } else { + activator.activate(); + } } }, ); @@ -138,8 +162,9 @@ where #[cfg(test)] mod tests { + use timely::dataflow::channels::pact::Pipeline; use timely::dataflow::operators::capture::Extract; - use timely::dataflow::operators::{Capture, OkErr}; + use timely::dataflow::operators::{Capture, OkErr, Operator}; use timely::progress::Antichain; use crate::error::Error; @@ -299,4 +324,76 @@ mod tests { Ok(()) } + + #[test] + fn replay_batching() -> Result<(), Error> { + const OUTPUTS_PER_YIELD: usize = 3; + const NUM_WRITES: u64 = 100u64; + let mut registry = MemRegistry::new(); + let p = registry.runtime_no_reentrance()?; + + let (write, _) = p.create_or_load("1"); + for i in 1..=NUM_WRITES { + write + .write(&[((format!("{:03}", i), ()), i, 1)]) + .recv() + .expect("write was successful"); + } + write + .seal(NUM_WRITES + 1) + .recv() + .expect("seal was successful"); + + let (oks, errs) = timely::execute_directly(move |worker| { + let (oks, errs) = worker.dataflow(|scope| { + let (_, read) = p.create_or_load::("1"); + let (ok_stream, err_stream) = scope + .replay_yield(read.snapshot(), &Antichain::from_elem(0), OUTPUTS_PER_YIELD) + .ok_err(split_ok_err); + // Preserve the batching structure out of replay so we can + // assert on it. + let ok_stream = ok_stream.unary(Pipeline, "Batches", move |_, _| { + move |input, output| { + input.for_each(|time, data| { + let mut batch = Vec::new(); + data.swap(&mut batch); + output.session(&time).give(batch); + }); + } + }); + (ok_stream.capture(), err_stream.capture()) + }); + + (oks, errs) + }); + + assert_eq!( + errs.extract() + .into_iter() + .flat_map(|(_time, data)| data.into_iter().map(|(err, _ts, _diff)| err)) + .collect::>(), + Vec::::new() + ); + + // Verify that the batches are all <= the yield size. + let mut actual = oks + .extract() + .into_iter() + .flat_map(|(_, batches)| { + batches.into_iter().flat_map(|batch| { + assert!(batch.len() <= OUTPUTS_PER_YIELD); + batch + }) + }) + .collect::>(); + actual.sort_by_key(|((k, _), _, _)| k.clone()); + + let expected = (1..=NUM_WRITES) + .map(|x| ((format!("{:03}", x), ()), x, 1)) + .collect::>(); + // Verify that the flattened contents are what we expect. + assert_eq!(actual, expected); + + Ok(()) + } } diff --git a/src/persist/src/operators/source.rs b/src/persist/src/operators/source.rs index 6bdb95697b8c1..37c4d7c8ca5d3 100644 --- a/src/persist/src/operators/source.rs +++ b/src/persist/src/operators/source.rs @@ -21,6 +21,7 @@ use timely::Data as TimelyData; use crate::client::StreamReadHandle; use crate::indexed::ListenEvent; use crate::operators::replay::Replay; +use crate::operators::DEFAULT_OUTPUTS_PER_YIELD; /// A Timely Dataflow operator that mirrors a persisted stream. pub trait PersistedSource, K: TimelyData, V: TimelyData> { @@ -30,6 +31,21 @@ pub trait PersistedSource, K: TimelyData, V: TimelyDat &mut self, read: StreamReadHandle, as_of_frontier: &Antichain, + ) -> Stream, u64, isize)> { + self.persisted_source_yield(read, as_of_frontier, DEFAULT_OUTPUTS_PER_YIELD) + } + + /// Emits a snapshot of the persisted stream taken as of this call and + /// listens for any new data added to the persisted stream after that, + /// yielding periodically. + /// + /// This yields after `outputs_per_yield` outputs to allow downstream + /// operators to reduce down the data and limit max memory usage. + fn persisted_source_yield( + &mut self, + read: StreamReadHandle, + as_of_frontier: &Antichain, + outputs_per_yield: usize, ) -> Stream, u64, isize)>; } @@ -39,10 +55,11 @@ where K: TimelyData + Codec + Send, V: TimelyData + Codec + Send, { - fn persisted_source( + fn persisted_source_yield( &mut self, read: StreamReadHandle, as_of_frontier: &Antichain, + outputs_per_yield: usize, ) -> Stream, u64, isize)> { let (listen_tx, listen_rx) = crossbeam_channel::unbounded(); let snapshot = read.listen(listen_tx); @@ -55,10 +72,10 @@ where // it for the operator. // listen to new data - let new = listen_source(self, snapshot_seal, listen_rx); + let new = listen_source(self, snapshot_seal, listen_rx, outputs_per_yield); // Replay the previously persisted data, if any. - let previous = self.replay(snapshot, as_of_frontier); + let previous = self.replay_yield(snapshot, as_of_frontier, outputs_per_yield); previous.concat(&new) } @@ -72,6 +89,7 @@ fn listen_source( // meaning that there will be no more events in the future. initial_frontier: Option>, listen_rx: crossbeam_channel::Receiver, + _outputs_per_yield: usize, ) -> Stream, u64, isize)> where G: Scope, @@ -121,6 +139,10 @@ where session.give(result); } activator.activate(); + // TODO: Instead of yielding after every message + // from the channel, only yield after we've given at + // least `outputs_per_yield` records (or if we get + // an Empty or Disconnected from try_recv). } ListenEvent::Sealed(ts) => { cap.downgrade(&ts);