Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 88 additions & 19 deletions src/dataflow/src/sink/tail.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,14 @@ use std::rc::Rc;
use differential_dataflow::trace::cursor::Cursor;
use differential_dataflow::trace::implementations::ord::OrdValBatch;
use differential_dataflow::trace::BatchReader;
use itertools::Itertools;
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::operators::Operator;
use timely::dataflow::{Scope, Stream};
use timely::progress::frontier::AntichainRef;
use timely::progress::timestamp::Timestamp as TimelyTimestamp;
use timely::progress::Antichain;
use timely::PartialOrder;

use dataflow_types::{SinkAsOf, TailSinkConnector};
use expr::GlobalId;
Expand All @@ -31,13 +36,19 @@ pub fn tail<G>(
{
let mut errored = false;
let mut packer = Row::default();
let mut received_data = false;

// Initialize to the minimal input frontier.
let mut input_frontier = Antichain::from_elem(<G::Timestamp as TimelyTimestamp>::minimum());

stream.sink(Pipeline, &format!("tail-{}", id), move |input| {
input.for_each(|_, batches| {
if errored {
// TODO(benesch): we should actually drop the sink if the
// receiver has gone away.
return;
}
received_data = true;
let mut results = vec![];
for batch in batches.iter() {
let mut cursor = batch.cursor();
Expand Down Expand Up @@ -71,19 +82,17 @@ pub fn tail<G>(
results.sort_by_key(|(time, _)| *time);
let mut results: Vec<Row> = results.into_iter().map(|(_, row)| row).collect();

if connector.emit_progress {
if let Some(upper) = batch_upper(batches.last()) {
// The user has requested progress messages and there's at least one
// batch. All of the batches might have zero rows, so we do not depend on
// results at all. Another benefit of using upper (instead of the largest row
// time) is that the batch's upper may be larger than the row time.
packer.push(Datum::Decimal(Significand::new(i128::from(upper))));
packer.push(Datum::True);
// Fill in the diff column and all table columns with NULL.
for _ in 0..(connector.object_columns + 1) {
packer.push(Datum::Null);
if let Some(batch) = batches.last() {
let progress_row = update_progress(

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since this is called unconditionally below, why do we need to create a progress event here?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This ensures that there will be a progress statement in the same "batch" of data sent back to the TAIL client. With this (which is also the previous behavior that some tests ensure) you get:

START BATCH
1625135651092	f	1	1
1625135651092	f	1	2
1625135651093	t	\N	\N
END BATCH

Without this, you would get:

START BATCH
1625135651092	f	1	1
1625135651092	f	1	2
END BATCH
START BATCH
1625135651093	t	\N	\N
END BATCH

The reason (for the behaviour, not for why the client and tests want it like this) is that the input frontier is not yet "caught up" to the upper of the batch, when processing a batch. Only on the next invocation of the sink would the input frontier be at the upper of the batch, which is why we get that progress update as a separate batch when we don't emit it based on the batch upper right away.

This affects what a FETCH ALL c would return on the client, which is also affected by a TIMEOUT, if any.

@petrosagg petrosagg Jul 1, 2021

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just feels wrong to declare the upper of the batch as a new frontier if timely hasn't explicitly told us that the input hasn't advanced. Also, I don't think there are any guarantees as to how the records are broken up when you FETCH a cursor. So long as we send data with the same semantics as before we should be good. Maybe @frankmcsherry can comment more on how to handle this

EDΙT: FETCH ALL c would block forever on a cursor with TAIL, no?

@aljoscha aljoscha Jul 1, 2021

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm very happy to drop that extra code! 👍 It's just that this is the current behaviour that tests expect.

EDΙT: FETCH ALL c would block forever on a cursor with TAIL, no?

I think the ALL just means "all available", because the tests use this.

See here for reference: https://github.com/MaterializeInc/materialize/blob/main/src/materialized/tests/sql.rs#L247

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Something is guaranteeing that if we see a batch with some upper then we won't see any timestamp less than that in the future (I'm not sure what guarantees this, but we document it in the tail docs), so this should be safe. FETCH for TAIL is special and will return up to the requested number of rows as long as some are available. If no rows are available it'll return back to the user.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's the arrangement implementation that guarantees it. Change batches are only handed out once the frontier passes their upper. The arrangement operator will start emitting those batches but the frontier itself will not propagate to the downstream operator yet.

In a way, the upper of the batch gives us a quicker update than waiting for the frontier update.

&mut input_frontier,
batch.desc.upper().borrow(),
&mut packer,
connector.object_columns + 1,
);
if connector.emit_progress {
if let Some(progress_row) = progress_row {
results.push(progress_row);
}
results.push(packer.finish_and_reuse());
}
}

Expand All @@ -94,14 +103,74 @@ pub fn tail<G>(
return;
}
});

let progress_row = update_progress(
&mut input_frontier,
input.frontier().frontier(),
&mut packer,
connector.object_columns + 1,
);

// Only emit updates if this operator/worker received actual
// data for emission. For TAIL, data is exchanged to one worker,
// which forwards all the data to the client process. If we
// blindly forwarded the frontier from all workers we would get
// multiple progress updates in the client.
if connector.emit_progress && received_data {
if let Some(progress_row) = progress_row {
let results = vec![progress_row];

// TODO(benesch): the lack of backpressure here can result in
// unbounded memory usage.
if connector.tx.send(results).is_err() {
errored = true;
return;
}
}
}
})
}

fn batch_upper(
batch: Option<&Rc<OrdValBatch<GlobalId, Row, u64, isize, usize>>>,
) -> Option<Timestamp> {
batch
.map(|b| b.desc.upper().elements().get(0))
.flatten()
.copied()
// Checks if there is progress between `current_input_frontier` and
// `new_input_frontier`. If yes, updates the `current_input_frontier` to
// `new_input_frontier` and returns a [`Row`] that encodes this progress. If
// there is no progress, returns [`None`].
fn update_progress(
current_input_frontier: &mut Antichain<Timestamp>,
new_input_frontier: AntichainRef<Timestamp>,
packer: &mut Row,
empty_columns: usize,
) -> Option<Row> {
// Test to see if strict progress has occurred. This is true if the new
// frontier is not less or equal to the old frontier.
let progress = !PartialOrder::less_equal(&new_input_frontier, &current_input_frontier.borrow());

if progress {
current_input_frontier.clear();
current_input_frontier.extend(new_input_frontier.iter().cloned());
Comment thread
madelynnblue marked this conversation as resolved.

// This only looks at the first entry of the antichain.
// If we ever have multi-dimensional time, this is not correct
// anymore. There might not even be progress in the first dimension.
// We panic, so that future developers introducing multi-dimensional
// time in Materialize will notice.
let upper = new_input_frontier
.iter()
.at_most_one()
.expect("more than one element in the frontier")
.cloned();

// the input frontier might be empty
upper.map(|upper| {
packer.push(Datum::Decimal(Significand::new(i128::from(upper))));
packer.push(Datum::True);
// Fill in the diff column and all table columns with NULL.
for _ in 0..empty_columns {
packer.push(Datum::Null);
}
packer.finish_and_reuse()
})
} else {
None
}
}
155 changes: 133 additions & 22 deletions src/materialized/tests/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use std::time::{Duration, Instant};

use chrono::{DateTime, Utc};
use log::info;
use postgres::Row;
use tempfile::NamedTempFile;

use util::{MzTimestamp, PostgresErrorExt, KAFKA_ADDRS};
Expand Down Expand Up @@ -261,41 +262,151 @@ fn test_tail_progress() -> Result<(), Box<dyn Error>> {
for i in 1..=3 {
let data = format!("line {}", i);
client_writes.execute("INSERT INTO t1 VALUES ($1)", &[&data])?;
match client_reads.query("FETCH ALL c1", &[])?.as_slice() {
[data_row, progress_row] => {
assert_eq!(data_row.get::<_, bool>("mz_progressed"), false);
assert_eq!(data_row.get::<_, i64>("mz_diff"), 1);
assert_eq!(data_row.get::<_, String>("data"), data);

// We have to try several times. It might be that the FETCH gets
// a batch that only contains continuous progress statements, without
// any data. We retry until we get the batch that has the data, and
// then verify that it also has a progress statement.
loop {
let rows = client_reads.query("FETCH ALL c1", &[])?;

let rows = rows.iter();

// find the data row in the sea of progress rows

// remove progress statements that occured before our data
let mut rows = rows.skip_while(|row| row.try_get::<_, String>("data").is_err());

// this must be the data row
let data_row = rows.next();

let data_row = match data_row {
Some(data_row) => data_row,
None => continue, //retry
};

assert_eq!(data_row.get::<_, bool>("mz_progressed"), false);
assert_eq!(data_row.get::<_, i64>("mz_diff"), 1);
assert_eq!(data_row.get::<_, String>("data"), data);

let mut num_progress_rows = 0;
for progress_row in rows {
assert_eq!(progress_row.get::<_, bool>("mz_progressed"), true);
assert_eq!(progress_row.get::<_, Option<i64>>("mz_diff"), None);
assert_eq!(progress_row.get::<_, Option<String>>("data"), None);

let data_ts: MzTimestamp = data_row.get("mz_timestamp");
let progress_ts: MzTimestamp = progress_row.get("mz_timestamp");
assert!(data_ts < progress_ts);

num_progress_rows += 1;
}
_ => panic!("wrong number of rows returned"),

assert!(num_progress_rows > 0);

// success! break out of the loop
break;
}
}

// Test that tailing non-nullable columns with progress information
// turn sthem into nullable columns. See #6304.
{
client_writes.batch_execute("CREATE TABLE t2 (data text NOT NULL)")?;
client_writes.batch_execute("INSERT INTO t2 VALUES ('data')")?;
client_reads.batch_execute(
"COMMIT; BEGIN;
Ok(())
}

// Verifies that tailing non-nullable columns with progress information
// turns them into nullable columns. See #6304.
#[test]
fn test_tail_progress_non_nullable_columns() -> Result<(), Box<dyn Error>> {
ore::test::init_logging();

let config = util::Config::default().workers(2);
let server = util::start_server(config)?;
let mut client_writes = server.connect(postgres::NoTls)?;
let mut client_reads = server.connect(postgres::NoTls)?;

client_writes.batch_execute("CREATE TABLE t2 (data text NOT NULL)")?;
client_writes.batch_execute("INSERT INTO t2 VALUES ('data')")?;
client_reads.batch_execute(
"COMMIT; BEGIN;
DECLARE c2 CURSOR FOR TAIL t2 WITH (PROGRESS);",
)?;
let data_row = client_reads.query_one("FETCH 1 c2", &[])?;
assert_eq!(data_row.get::<_, bool>("mz_progressed"), false);
assert_eq!(data_row.get::<_, i64>("mz_diff"), 1);
assert_eq!(data_row.get::<_, String>("data"), "data");
let progress_row = client_reads.query_one("FETCH 1 c2", &[])?;
assert_eq!(progress_row.get::<_, bool>("mz_progressed"), true);
assert_eq!(progress_row.get::<_, Option<i64>>("mz_diff"), None);
assert_eq!(progress_row.get::<_, Option<String>>("data"), None);
)?;
let data_row = client_reads.query_one("FETCH 1 c2", &[])?;
assert_eq!(data_row.get::<_, bool>("mz_progressed"), false);
assert_eq!(data_row.get::<_, i64>("mz_diff"), 1);
assert_eq!(data_row.get::<_, String>("data"), "data");
let progress_row = client_reads.query_one("FETCH 1 c2", &[])?;
assert_eq!(progress_row.get::<_, bool>("mz_progressed"), true);
assert_eq!(progress_row.get::<_, Option<i64>>("mz_diff"), None);
assert_eq!(progress_row.get::<_, Option<String>>("data"), None);

Ok(())
}

/// Verifies that we get continuous progress messages as soon as at least one
/// data message was produced.
#[test]
fn test_tail_continuous_progress() -> Result<(), Box<dyn Error>> {
ore::test::init_logging();

let config = util::Config::default().workers(2);
let server = util::start_server(config)?;
let mut client_writes = server.connect(postgres::NoTls)?;
let mut client_reads = server.connect(postgres::NoTls)?;

client_writes.batch_execute("CREATE TABLE t1 (data text)")?;
client_reads.batch_execute(
"COMMIT; BEGIN;
DECLARE c1 CURSOR FOR TAIL t1 WITH (PROGRESS);",
)?;

client_writes.execute("INSERT INTO t1 VALUES ($1)", &[&"hello".to_owned()])?;

let mut last_ts = MzTimestamp(u64::MIN);
let mut verify_rows = move |rows: Vec<Row>| -> (usize, usize) {
let mut num_data_rows = 0;
let mut num_progress_rows = 0;

for row in rows {
let diff = row.get::<_, Option<i64>>("mz_diff");
match diff {
Some(diff) => {
num_data_rows += 1;

assert_eq!(diff, 1);
assert_eq!(row.get::<_, bool>("mz_progressed"), false);
let data = row.get::<_, Option<String>>("data");
assert!(data.is_some());
}
None => {
num_progress_rows += 1;

assert_eq!(row.get::<_, bool>("mz_progressed"), true);
assert_eq!(row.get::<_, Option<String>>("data"), None);
}
}

let ts: MzTimestamp = row.get("mz_timestamp");
assert!(last_ts < ts);
last_ts = ts;
}

(num_data_rows, num_progress_rows)
};

// this will fetch away the initial batch that contains our data message
// plus some progress messages
let rows = client_reads.query("FETCH ALL c1", &[])?;
let (num_data_rows, num_progress_rows) = verify_rows(rows);
assert_eq!(num_data_rows, 1);
assert!(num_progress_rows > 0);

// Try and read some progress messages. The normal update interval is
// 1s, so only wait for two updates. Otherwise this would run for too long.
for _i in 1..=2 {
let rows = client_reads.query("FETCH ALL c1", &[])?;

let (num_data_rows, num_progress_rows) = verify_rows(rows);
assert_eq!(num_data_rows, 0);
assert!(num_progress_rows > 0);
}

Ok(())
Expand Down
34 changes: 34 additions & 0 deletions test/testdrive/fetch-tail-continuous-progress.td
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

#
# Make sure that TAIL WITH (progress=true) emits periodic progress
# messages even if there's no new data.
#

$ set-regex match=\d{13} replacement=<TIMESTAMP>

> CREATE TABLE t1 (f1 INTEGER);

> INSERT INTO t1 VALUES (123);

> BEGIN

> DECLARE c CURSOR FOR TAIL t1 WITH (progress=true);

# Verify there is a progress row available in the first batch.
> FETCH 2 c
<TIMESTAMP> false 1 123
<TIMESTAMP> true <null> <null>

# Now ask (and possibly wait) for another progress row.
> FETCH 1 c
<TIMESTAMP> true <null> <null>

> COMMIT