Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions src/persist/src/nemesis/direct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,9 @@ struct DirectCore {
}

impl DirectCore {
// Selected to be a small prime number.
const OUTPUTS_PER_YIELD: usize = 3;

fn stream(&mut self, name: &str) -> Result<Ingest, Error> {
match self.streams.entry(name.to_owned()) {
Entry::Occupied(x) => {
Expand All @@ -173,8 +176,11 @@ impl DirectCore {
.lock()
.expect("clone doesn't panic and poison lock")
.clone();
let data =
scope.persisted_source(dataflow_read, &Antichain::from_elem(0));
let data = scope.persisted_source_yield(
dataflow_read,
&Antichain::from_elem(0),
Self::OUTPUTS_PER_YIELD,
);
data.probe_with(&mut probe).capture_into(output_tx);
});
while worker.step_or_park(None) {
Expand Down
5 changes: 5 additions & 0 deletions src/persist/src/operators/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ pub mod source;
pub mod stream;
pub mod upsert;

// This was selected to match the heuristic in differential dataflow's join.
//
// https://docs.rs/differential-dataflow/0.12.0/src/differential_dataflow/operators/join.rs.html#487-488
const DEFAULT_OUTPUTS_PER_YIELD: usize = 1_000_000;

pub(crate) fn split_ok_err<K, V>(
x: (Result<(K, V), String>, u64, isize),
) -> Result<((K, V), u64, isize), (String, u64, isize)> {
Expand Down
133 changes: 115 additions & 18 deletions src/persist/src/operators/replay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use timely::{Data as TimelyData, PartialOrder};
use crate::client::DecodedSnapshot;
use crate::error::Error;
use crate::indexed::Snapshot;
use crate::operators::DEFAULT_OUTPUTS_PER_YIELD;

/// Extension trait for [`Stream`].
pub trait Replay<G: Scope<Timestamp = u64>, K: TimelyData, V: TimelyData> {
Expand All @@ -27,6 +28,19 @@ pub trait Replay<G: Scope<Timestamp = u64>, K: TimelyData, V: TimelyData> {
&self,
snapshot: Result<DecodedSnapshot<K, V>, Error>,
as_of_frontier: &Antichain<u64>,
) -> Stream<G, (Result<(K, V), String>, u64, isize)> {
self.replay_yield(snapshot, as_of_frontier, DEFAULT_OUTPUTS_PER_YIELD)
}

/// Emits each record in a snapshot, yielding periodically.
///
/// This yields after `outputs_per_yield` outputs to allow downstream
/// operators to reduce down the data and limit max memory usage.
fn replay_yield(
&self,
snapshot: Result<DecodedSnapshot<K, V>, Error>,
as_of_frontier: &Antichain<u64>,
outputs_per_yield: usize,
) -> Stream<G, (Result<(K, V), String>, u64, isize)>;
}

Expand All @@ -36,10 +50,11 @@ where
K: TimelyData + Codec,
V: TimelyData + Codec,
{
fn replay(
fn replay_yield(
&self,
snapshot: Result<DecodedSnapshot<K, V>, Error>,
as_of_frontier: &Antichain<u64>,
outputs_per_yield: usize,
) -> Stream<G, (Result<(K, V), String>, u64, isize)> {
// TODO: This currently works by only emitting the persisted
// data on worker 0 because that was the simplest thing to do
Expand All @@ -52,37 +67,36 @@ where
let result_stream: Stream<G, Result<((K, V), u64, isize), Error>> = operator::source(
self,
"Replay",
move |cap, _info| {
move |cap, info| {
let activator = self.activator_for(&info.address[..]);
let mut snapshot_cap = if active_worker {
Some((snapshot, cap))
Some((snapshot.map(|s| (s.since(), s.into_iter())), cap))
} else {
None
};

move |output| {
let (snapshot, cap) = match snapshot_cap.take() {
let mut done = true;
let (snapshot, cap) = match snapshot_cap.as_mut() {
Some(x) => x,
None => return, // We were already invoked and consumed our snapshot.
None => return, // We already consumed our snapshot.
};

let mut session = output.session(&cap);

match snapshot {
Ok(snapshot) => {
let snapshot_since = snapshot.since();

if PartialOrder::less_than(&as_of_frontier, &snapshot_since) {
session.give(Err(Error::String(format!(
Ok((snapshot_since, _))
if PartialOrder::less_than(&as_of_frontier, &snapshot_since) =>
{
session.give(Err(Error::String(format!(
"replaying persisted data: snapshot since ({:?}) is beyond expected as_of ({:?})",
snapshot_since, as_of_frontier
))));

return;
}

// TODO: Periodically yield to let the rest of the dataflow
// reduce this down.
for x in snapshot.into_iter() {
}
Ok((snapshot_since, snapshot_iter)) => {
// NB: This `idx` from enumerate resets back to 0
// each time the operator is run.
for (idx, x) in snapshot_iter.enumerate() {
if let Ok((_, ts, _)) = &x {
// The raw update data held internally in the
// snapshot may not be physically compacted up to
Expand All @@ -92,6 +106,10 @@ where
debug_assert!(snapshot_since.less_equal(ts));
}
session.give(x);
if idx + 1 >= outputs_per_yield {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it was not obvious to me that enumerate will restart counting from zero every time we re-enter the operator - maybe worth a comment?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

done = false;
break;
}
}
}
Err(e) => {
Expand All @@ -101,6 +119,12 @@ where
))));
}
}

if done {
snapshot_cap.take();
} else {
activator.activate();
}
}
},
);
Expand Down Expand Up @@ -138,8 +162,9 @@ where
#[cfg(test)]
mod tests {

use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::operators::capture::Extract;
use timely::dataflow::operators::{Capture, OkErr};
use timely::dataflow::operators::{Capture, OkErr, Operator};
use timely::progress::Antichain;

use crate::error::Error;
Expand Down Expand Up @@ -299,4 +324,76 @@ mod tests {

Ok(())
}

#[test]
fn replay_batching() -> Result<(), Error> {
const OUTPUTS_PER_YIELD: usize = 3;
const NUM_WRITES: u64 = 100u64;
let mut registry = MemRegistry::new();
let p = registry.runtime_no_reentrance()?;

let (write, _) = p.create_or_load("1");
for i in 1..=NUM_WRITES {
write
.write(&[((format!("{:03}", i), ()), i, 1)])
.recv()
.expect("write was successful");
}
write
.seal(NUM_WRITES + 1)
.recv()
.expect("seal was successful");

let (oks, errs) = timely::execute_directly(move |worker| {
let (oks, errs) = worker.dataflow(|scope| {
let (_, read) = p.create_or_load::<String, ()>("1");
let (ok_stream, err_stream) = scope
.replay_yield(read.snapshot(), &Antichain::from_elem(0), OUTPUTS_PER_YIELD)
.ok_err(split_ok_err);
// Preserve the batching structure out of replay so we can
// assert on it.
let ok_stream = ok_stream.unary(Pipeline, "Batches", move |_, _| {
move |input, output| {
input.for_each(|time, data| {
let mut batch = Vec::new();
data.swap(&mut batch);
output.session(&time).give(batch);
});
}
});
(ok_stream.capture(), err_stream.capture())
});

(oks, errs)
});

assert_eq!(
errs.extract()
.into_iter()
.flat_map(|(_time, data)| data.into_iter().map(|(err, _ts, _diff)| err))
.collect::<Vec<_>>(),
Vec::<String>::new()
);

// Verify that the batches are all <= the yield size.
let mut actual = oks
.extract()
.into_iter()
.flat_map(|(_, batches)| {
batches.into_iter().flat_map(|batch| {
assert!(batch.len() <= OUTPUTS_PER_YIELD);
batch
})
})
.collect::<Vec<_>>();
actual.sort_by_key(|((k, _), _, _)| k.clone());

let expected = (1..=NUM_WRITES)
.map(|x| ((format!("{:03}", x), ()), x, 1))
.collect::<Vec<_>>();
// Verify that the flattened contents are what we expect.
assert_eq!(actual, expected);

Ok(())
}
}
28 changes: 25 additions & 3 deletions src/persist/src/operators/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use timely::Data as TimelyData;
use crate::client::StreamReadHandle;
use crate::indexed::ListenEvent;
use crate::operators::replay::Replay;
use crate::operators::DEFAULT_OUTPUTS_PER_YIELD;

/// A Timely Dataflow operator that mirrors a persisted stream.
pub trait PersistedSource<G: Scope<Timestamp = u64>, K: TimelyData, V: TimelyData> {
Expand All @@ -30,6 +31,21 @@ pub trait PersistedSource<G: Scope<Timestamp = u64>, K: TimelyData, V: TimelyDat
&mut self,
read: StreamReadHandle<K, V>,
as_of_frontier: &Antichain<u64>,
) -> Stream<G, (Result<(K, V), String>, u64, isize)> {
self.persisted_source_yield(read, as_of_frontier, DEFAULT_OUTPUTS_PER_YIELD)
}

/// Emits a snapshot of the persisted stream taken as of this call and
/// listens for any new data added to the persisted stream after that,
/// yielding periodically.
///
/// This yields after `outputs_per_yield` outputs to allow downstream
/// operators to reduce down the data and limit max memory usage.
fn persisted_source_yield(
&mut self,
read: StreamReadHandle<K, V>,
as_of_frontier: &Antichain<u64>,
outputs_per_yield: usize,
) -> Stream<G, (Result<(K, V), String>, u64, isize)>;
}

Expand All @@ -39,10 +55,11 @@ where
K: TimelyData + Codec + Send,
V: TimelyData + Codec + Send,
{
fn persisted_source(
fn persisted_source_yield(
&mut self,
read: StreamReadHandle<K, V>,
as_of_frontier: &Antichain<u64>,
outputs_per_yield: usize,
) -> Stream<G, (Result<(K, V), String>, u64, isize)> {
let (listen_tx, listen_rx) = crossbeam_channel::unbounded();
let snapshot = read.listen(listen_tx);
Expand All @@ -55,10 +72,10 @@ where
// it for the operator.

// listen to new data
let new = listen_source(self, snapshot_seal, listen_rx);
let new = listen_source(self, snapshot_seal, listen_rx, outputs_per_yield);

// Replay the previously persisted data, if any.
let previous = self.replay(snapshot, as_of_frontier);
let previous = self.replay_yield(snapshot, as_of_frontier, outputs_per_yield);

previous.concat(&new)
}
Expand All @@ -72,6 +89,7 @@ fn listen_source<G, K, V>(
// meaning that there will be no more events in the future.
initial_frontier: Option<Antichain<u64>>,
listen_rx: crossbeam_channel::Receiver<ListenEvent>,
_outputs_per_yield: usize,
) -> Stream<G, (Result<(K, V), String>, u64, isize)>
where
G: Scope<Timestamp = u64>,
Expand Down Expand Up @@ -121,6 +139,10 @@ where
session.give(result);
}
activator.activate();
// TODO: Instead of yielding after every message
// from the channel, only yield after we've given at
// least `outputs_per_yield` records (or if we get
// an Empty or Disconnected from try_recv).
}
ListenEvent::Sealed(ts) => {
cap.downgrade(&ts);
Expand Down