From eb4f8a6ef6baaf01ca6ac4520c29dfb100df5c3b Mon Sep 17 00:00:00 2001 From: Daniel Harrison <52528+danhhz@users.noreply.github.com> Date: Wed, 15 Dec 2021 11:25:36 -0800 Subject: [PATCH] persist: periodically yield in Replay operator Instead of emitting it all the first time the operator is scheduled. This allows downstream operators a chance to process the data it has emitted, possibly reducing it down and evening out memory usage. Until the persist Replay operator has finished, it can't downgrade its output capability. However, Frank has said "I would say that operators are mostly either 1. stateless and benefit from streaming through, or 2. arrange." Even arrange has some opportunity for data reduction. An arrange downstream of replay would just be buffering the replayed data until the frontier advances, but it does consolidate within this buffer. Persist keeps things mostly consolidated, but unsealed data is unconsolidated and even trace data is not across batches (which could be relevant depending on the as_of). --- src/persist/src/nemesis/direct.rs | 10 ++- src/persist/src/operators/mod.rs | 5 ++ src/persist/src/operators/replay.rs | 133 ++++++++++++++++++++++++---- src/persist/src/operators/source.rs | 28 +++++- 4 files changed, 153 insertions(+), 23 deletions(-) diff --git a/src/persist/src/nemesis/direct.rs b/src/persist/src/nemesis/direct.rs index e053bf5776320..1c23a9610e7b3 100644 --- a/src/persist/src/nemesis/direct.rs +++ b/src/persist/src/nemesis/direct.rs @@ -147,6 +147,9 @@ struct DirectCore { } impl DirectCore { + // Selected to be a small prime number. + const OUTPUTS_PER_YIELD: usize = 3; + fn stream(&mut self, name: &str) -> Result { match self.streams.entry(name.to_owned()) { Entry::Occupied(x) => { @@ -173,8 +176,11 @@ impl DirectCore { .lock() .expect("clone doesn't panic and poison lock") .clone(); - let data = - scope.persisted_source(dataflow_read, &Antichain::from_elem(0)); + let data = scope.persisted_source_yield( + dataflow_read, + &Antichain::from_elem(0), + Self::OUTPUTS_PER_YIELD, + ); data.probe_with(&mut probe).capture_into(output_tx); }); while worker.step_or_park(None) { diff --git a/src/persist/src/operators/mod.rs b/src/persist/src/operators/mod.rs index 4fd3f427059d4..7ca8e6a0ab470 100644 --- a/src/persist/src/operators/mod.rs +++ b/src/persist/src/operators/mod.rs @@ -16,6 +16,11 @@ pub mod source; pub mod stream; pub mod upsert; +// This was selected to match the heuristic in differential dataflow's join. +// +// https://docs.rs/differential-dataflow/0.12.0/src/differential_dataflow/operators/join.rs.html#487-488 +const DEFAULT_OUTPUTS_PER_YIELD: usize = 1_000_000; + pub(crate) fn split_ok_err( x: (Result<(K, V), String>, u64, isize), ) -> Result<((K, V), u64, isize), (String, u64, isize)> { diff --git a/src/persist/src/operators/replay.rs b/src/persist/src/operators/replay.rs index 45f042a77d509..7571832036992 100644 --- a/src/persist/src/operators/replay.rs +++ b/src/persist/src/operators/replay.rs @@ -19,6 +19,7 @@ use timely::{Data as TimelyData, PartialOrder}; use crate::client::DecodedSnapshot; use crate::error::Error; use crate::indexed::Snapshot; +use crate::operators::DEFAULT_OUTPUTS_PER_YIELD; /// Extension trait for [`Stream`]. pub trait Replay, K: TimelyData, V: TimelyData> { @@ -27,6 +28,19 @@ pub trait Replay, K: TimelyData, V: TimelyData> { &self, snapshot: Result, Error>, as_of_frontier: &Antichain, + ) -> Stream, u64, isize)> { + self.replay_yield(snapshot, as_of_frontier, DEFAULT_OUTPUTS_PER_YIELD) + } + + /// Emits each record in a snapshot, yielding periodically. + /// + /// This yields after `outputs_per_yield` outputs to allow downstream + /// operators to reduce down the data and limit max memory usage. + fn replay_yield( + &self, + snapshot: Result, Error>, + as_of_frontier: &Antichain, + outputs_per_yield: usize, ) -> Stream, u64, isize)>; } @@ -36,10 +50,11 @@ where K: TimelyData + Codec, V: TimelyData + Codec, { - fn replay( + fn replay_yield( &self, snapshot: Result, Error>, as_of_frontier: &Antichain, + outputs_per_yield: usize, ) -> Stream, u64, isize)> { // TODO: This currently works by only emitting the persisted // data on worker 0 because that was the simplest thing to do @@ -52,37 +67,36 @@ where let result_stream: Stream> = operator::source( self, "Replay", - move |cap, _info| { + move |cap, info| { + let activator = self.activator_for(&info.address[..]); let mut snapshot_cap = if active_worker { - Some((snapshot, cap)) + Some((snapshot.map(|s| (s.since(), s.into_iter())), cap)) } else { None }; move |output| { - let (snapshot, cap) = match snapshot_cap.take() { + let mut done = true; + let (snapshot, cap) = match snapshot_cap.as_mut() { Some(x) => x, - None => return, // We were already invoked and consumed our snapshot. + None => return, // We already consumed our snapshot. }; let mut session = output.session(&cap); match snapshot { - Ok(snapshot) => { - let snapshot_since = snapshot.since(); - - if PartialOrder::less_than(&as_of_frontier, &snapshot_since) { - session.give(Err(Error::String(format!( + Ok((snapshot_since, _)) + if PartialOrder::less_than(&as_of_frontier, &snapshot_since) => + { + session.give(Err(Error::String(format!( "replaying persisted data: snapshot since ({:?}) is beyond expected as_of ({:?})", snapshot_since, as_of_frontier )))); - - return; - } - - // TODO: Periodically yield to let the rest of the dataflow - // reduce this down. - for x in snapshot.into_iter() { + } + Ok((snapshot_since, snapshot_iter)) => { + // NB: This `idx` from enumerate resets back to 0 + // each time the operator is run. + for (idx, x) in snapshot_iter.enumerate() { if let Ok((_, ts, _)) = &x { // The raw update data held internally in the // snapshot may not be physically compacted up to @@ -92,6 +106,10 @@ where debug_assert!(snapshot_since.less_equal(ts)); } session.give(x); + if idx + 1 >= outputs_per_yield { + done = false; + break; + } } } Err(e) => { @@ -101,6 +119,12 @@ where )))); } } + + if done { + snapshot_cap.take(); + } else { + activator.activate(); + } } }, ); @@ -138,8 +162,9 @@ where #[cfg(test)] mod tests { + use timely::dataflow::channels::pact::Pipeline; use timely::dataflow::operators::capture::Extract; - use timely::dataflow::operators::{Capture, OkErr}; + use timely::dataflow::operators::{Capture, OkErr, Operator}; use timely::progress::Antichain; use crate::error::Error; @@ -299,4 +324,76 @@ mod tests { Ok(()) } + + #[test] + fn replay_batching() -> Result<(), Error> { + const OUTPUTS_PER_YIELD: usize = 3; + const NUM_WRITES: u64 = 100u64; + let mut registry = MemRegistry::new(); + let p = registry.runtime_no_reentrance()?; + + let (write, _) = p.create_or_load("1"); + for i in 1..=NUM_WRITES { + write + .write(&[((format!("{:03}", i), ()), i, 1)]) + .recv() + .expect("write was successful"); + } + write + .seal(NUM_WRITES + 1) + .recv() + .expect("seal was successful"); + + let (oks, errs) = timely::execute_directly(move |worker| { + let (oks, errs) = worker.dataflow(|scope| { + let (_, read) = p.create_or_load::("1"); + let (ok_stream, err_stream) = scope + .replay_yield(read.snapshot(), &Antichain::from_elem(0), OUTPUTS_PER_YIELD) + .ok_err(split_ok_err); + // Preserve the batching structure out of replay so we can + // assert on it. + let ok_stream = ok_stream.unary(Pipeline, "Batches", move |_, _| { + move |input, output| { + input.for_each(|time, data| { + let mut batch = Vec::new(); + data.swap(&mut batch); + output.session(&time).give(batch); + }); + } + }); + (ok_stream.capture(), err_stream.capture()) + }); + + (oks, errs) + }); + + assert_eq!( + errs.extract() + .into_iter() + .flat_map(|(_time, data)| data.into_iter().map(|(err, _ts, _diff)| err)) + .collect::>(), + Vec::::new() + ); + + // Verify that the batches are all <= the yield size. + let mut actual = oks + .extract() + .into_iter() + .flat_map(|(_, batches)| { + batches.into_iter().flat_map(|batch| { + assert!(batch.len() <= OUTPUTS_PER_YIELD); + batch + }) + }) + .collect::>(); + actual.sort_by_key(|((k, _), _, _)| k.clone()); + + let expected = (1..=NUM_WRITES) + .map(|x| ((format!("{:03}", x), ()), x, 1)) + .collect::>(); + // Verify that the flattened contents are what we expect. + assert_eq!(actual, expected); + + Ok(()) + } } diff --git a/src/persist/src/operators/source.rs b/src/persist/src/operators/source.rs index 6bdb95697b8c1..37c4d7c8ca5d3 100644 --- a/src/persist/src/operators/source.rs +++ b/src/persist/src/operators/source.rs @@ -21,6 +21,7 @@ use timely::Data as TimelyData; use crate::client::StreamReadHandle; use crate::indexed::ListenEvent; use crate::operators::replay::Replay; +use crate::operators::DEFAULT_OUTPUTS_PER_YIELD; /// A Timely Dataflow operator that mirrors a persisted stream. pub trait PersistedSource, K: TimelyData, V: TimelyData> { @@ -30,6 +31,21 @@ pub trait PersistedSource, K: TimelyData, V: TimelyDat &mut self, read: StreamReadHandle, as_of_frontier: &Antichain, + ) -> Stream, u64, isize)> { + self.persisted_source_yield(read, as_of_frontier, DEFAULT_OUTPUTS_PER_YIELD) + } + + /// Emits a snapshot of the persisted stream taken as of this call and + /// listens for any new data added to the persisted stream after that, + /// yielding periodically. + /// + /// This yields after `outputs_per_yield` outputs to allow downstream + /// operators to reduce down the data and limit max memory usage. + fn persisted_source_yield( + &mut self, + read: StreamReadHandle, + as_of_frontier: &Antichain, + outputs_per_yield: usize, ) -> Stream, u64, isize)>; } @@ -39,10 +55,11 @@ where K: TimelyData + Codec + Send, V: TimelyData + Codec + Send, { - fn persisted_source( + fn persisted_source_yield( &mut self, read: StreamReadHandle, as_of_frontier: &Antichain, + outputs_per_yield: usize, ) -> Stream, u64, isize)> { let (listen_tx, listen_rx) = crossbeam_channel::unbounded(); let snapshot = read.listen(listen_tx); @@ -55,10 +72,10 @@ where // it for the operator. // listen to new data - let new = listen_source(self, snapshot_seal, listen_rx); + let new = listen_source(self, snapshot_seal, listen_rx, outputs_per_yield); // Replay the previously persisted data, if any. - let previous = self.replay(snapshot, as_of_frontier); + let previous = self.replay_yield(snapshot, as_of_frontier, outputs_per_yield); previous.concat(&new) } @@ -72,6 +89,7 @@ fn listen_source( // meaning that there will be no more events in the future. initial_frontier: Option>, listen_rx: crossbeam_channel::Receiver, + _outputs_per_yield: usize, ) -> Stream, u64, isize)> where G: Scope, @@ -121,6 +139,10 @@ where session.give(result); } activator.activate(); + // TODO: Instead of yielding after every message + // from the channel, only yield after we've given at + // least `outputs_per_yield` records (or if we get + // an Empty or Disconnected from try_recv). } ListenEvent::Sealed(ts) => { cap.downgrade(&ts);