diff --git a/src/dataflow/src/sink/tail.rs b/src/dataflow/src/sink/tail.rs index 0860205fd8543..d36597b897aaf 100644 --- a/src/dataflow/src/sink/tail.rs +++ b/src/dataflow/src/sink/tail.rs @@ -12,9 +12,14 @@ use std::rc::Rc; use differential_dataflow::trace::cursor::Cursor; use differential_dataflow::trace::implementations::ord::OrdValBatch; use differential_dataflow::trace::BatchReader; +use itertools::Itertools; use timely::dataflow::channels::pact::Pipeline; use timely::dataflow::operators::Operator; use timely::dataflow::{Scope, Stream}; +use timely::progress::frontier::AntichainRef; +use timely::progress::timestamp::Timestamp as TimelyTimestamp; +use timely::progress::Antichain; +use timely::PartialOrder; use dataflow_types::{SinkAsOf, TailSinkConnector}; use expr::GlobalId; @@ -31,6 +36,11 @@ pub fn tail( { let mut errored = false; let mut packer = Row::default(); + let mut received_data = false; + + // Initialize to the minimal input frontier. + let mut input_frontier = Antichain::from_elem(::minimum()); + stream.sink(Pipeline, &format!("tail-{}", id), move |input| { input.for_each(|_, batches| { if errored { @@ -38,6 +48,7 @@ pub fn tail( // receiver has gone away. return; } + received_data = true; let mut results = vec![]; for batch in batches.iter() { let mut cursor = batch.cursor(); @@ -71,19 +82,17 @@ pub fn tail( results.sort_by_key(|(time, _)| *time); let mut results: Vec = results.into_iter().map(|(_, row)| row).collect(); - if connector.emit_progress { - if let Some(upper) = batch_upper(batches.last()) { - // The user has requested progress messages and there's at least one - // batch. All of the batches might have zero rows, so we do not depend on - // results at all. Another benefit of using upper (instead of the largest row - // time) is that the batch's upper may be larger than the row time. 
- packer.push(Datum::Decimal(Significand::new(i128::from(upper)))); - packer.push(Datum::True); - // Fill in the diff column and all table columns with NULL. - for _ in 0..(connector.object_columns + 1) { - packer.push(Datum::Null); + if let Some(batch) = batches.last() { + let progress_row = update_progress( + &mut input_frontier, + batch.desc.upper().borrow(), + &mut packer, + connector.object_columns + 1, + ); + if connector.emit_progress { + if let Some(progress_row) = progress_row { + results.push(progress_row); } - results.push(packer.finish_and_reuse()); } } @@ -94,14 +103,74 @@ pub fn tail( return; } }); + + let progress_row = update_progress( + &mut input_frontier, + input.frontier().frontier(), + &mut packer, + connector.object_columns + 1, + ); + + // Only emit updates if this operator/worker received actual + // data for emission. For TAIL, data is exchanged to one worker, + // which forwards all the data to the client process. If we + // blindly forwarded the frontier from all workers we would get + // multiple progress updates in the client. + if connector.emit_progress && received_data { + if let Some(progress_row) = progress_row { + let results = vec![progress_row]; + + // TODO(benesch): the lack of backpressure here can result in + // unbounded memory usage. + if connector.tx.send(results).is_err() { + errored = true; + return; + } + } + } }) } -fn batch_upper( - batch: Option<&Rc>>, -) -> Option { - batch - .map(|b| b.desc.upper().elements().get(0)) - .flatten() - .copied() +// Checks if there is progress between `current_input_frontier` and +// `new_input_frontier`. If yes, updates the `current_input_frontier` to +// `new_input_frontier` and returns a [`Row`] that encodes this progress. If +// there is no progress, returns [`None`]. +fn update_progress( + current_input_frontier: &mut Antichain, + new_input_frontier: AntichainRef, + packer: &mut Row, + empty_columns: usize, +) -> Option { + // Test to see if strict progress has occurred. 
This is true if the new + // frontier is not less or equal to the old frontier. + let progress = !PartialOrder::less_equal(&new_input_frontier, &current_input_frontier.borrow()); + + if progress { + current_input_frontier.clear(); + current_input_frontier.extend(new_input_frontier.iter().cloned()); + + // This only looks at the first entry of the antichain. + // If we ever have multi-dimensional time, this is not correct + // anymore. There might not even be progress in the first dimension. + // We panic, so that future developers introducing multi-dimensional + // time in Materialize will notice. + let upper = new_input_frontier + .iter() + .at_most_one() + .expect("more than one element in the frontier") + .cloned(); + + // the input frontier might be empty + upper.map(|upper| { + packer.push(Datum::Decimal(Significand::new(i128::from(upper)))); + packer.push(Datum::True); + // Fill in the diff column and all table columns with NULL. + for _ in 0..empty_columns { + packer.push(Datum::Null); + } + packer.finish_and_reuse() + }) + } else { + None + } } diff --git a/src/materialized/tests/sql.rs b/src/materialized/tests/sql.rs index ac39650038894..f6e977ec60045 100644 --- a/src/materialized/tests/sql.rs +++ b/src/materialized/tests/sql.rs @@ -28,6 +28,7 @@ use std::time::{Duration, Instant}; use chrono::{DateTime, Utc}; use log::info; +use postgres::Row; use tempfile::NamedTempFile; use util::{MzTimestamp, PostgresErrorExt, KAFKA_ADDRS}; @@ -261,12 +262,35 @@ fn test_tail_progress() -> Result<(), Box> { for i in 1..=3 { let data = format!("line {}", i); client_writes.execute("INSERT INTO t1 VALUES ($1)", &[&data])?; - match client_reads.query("FETCH ALL c1", &[])?.as_slice() { - [data_row, progress_row] => { - assert_eq!(data_row.get::<_, bool>("mz_progressed"), false); - assert_eq!(data_row.get::<_, i64>("mz_diff"), 1); - assert_eq!(data_row.get::<_, String>("data"), data); + // We have to try several times.
It might be that the FETCH gets + // a batch that only contains continuous progress statements, without + // any data. We retry until we get the batch that has the data, and + // then verify that it also has a progress statement. + loop { + let rows = client_reads.query("FETCH ALL c1", &[])?; + + let rows = rows.iter(); + + // find the data row in the sea of progress rows + + // remove progress statements that occurred before our data + let mut rows = rows.skip_while(|row| row.try_get::<_, String>("data").is_err()); + + // this must be the data row + let data_row = rows.next(); + + let data_row = match data_row { + Some(data_row) => data_row, + None => continue, //retry + }; + + assert_eq!(data_row.get::<_, bool>("mz_progressed"), false); + assert_eq!(data_row.get::<_, i64>("mz_diff"), 1); + assert_eq!(data_row.get::<_, String>("data"), data); + + let mut num_progress_rows = 0; + for progress_row in rows { assert_eq!(progress_row.get::<_, bool>("mz_progressed"), true); assert_eq!(progress_row.get::<_, Option>("mz_diff"), None); assert_eq!(progress_row.get::<_, Option>("data"), None); @@ -274,28 +298,115 @@ fn test_tail_progress() -> Result<(), Box> { let data_ts: MzTimestamp = data_row.get("mz_timestamp"); let progress_ts: MzTimestamp = progress_row.get("mz_timestamp"); assert!(data_ts < progress_ts); + + num_progress_rows += 1; } - _ => panic!("wrong number of rows returned"), + + assert!(num_progress_rows > 0); + + // success! break out of the loop + break; } } - // Test that tailing non-nullable columns with progress information - // turn sthem into nullable columns. See #6304. - { - client_writes.batch_execute("CREATE TABLE t2 (data text NOT NULL)")?; - client_writes.batch_execute("INSERT INTO t2 VALUES ('data')")?; - client_reads.batch_execute( - "COMMIT; BEGIN; + Ok(()) +} + +// Verifies that tailing non-nullable columns with progress information +// turns them into nullable columns. See #6304.
+#[test] +fn test_tail_progress_non_nullable_columns() -> Result<(), Box> { + ore::test::init_logging(); + + let config = util::Config::default().workers(2); + let server = util::start_server(config)?; + let mut client_writes = server.connect(postgres::NoTls)?; + let mut client_reads = server.connect(postgres::NoTls)?; + + client_writes.batch_execute("CREATE TABLE t2 (data text NOT NULL)")?; + client_writes.batch_execute("INSERT INTO t2 VALUES ('data')")?; + client_reads.batch_execute( + "COMMIT; BEGIN; DECLARE c2 CURSOR FOR TAIL t2 WITH (PROGRESS);", - )?; - let data_row = client_reads.query_one("FETCH 1 c2", &[])?; - assert_eq!(data_row.get::<_, bool>("mz_progressed"), false); - assert_eq!(data_row.get::<_, i64>("mz_diff"), 1); - assert_eq!(data_row.get::<_, String>("data"), "data"); - let progress_row = client_reads.query_one("FETCH 1 c2", &[])?; - assert_eq!(progress_row.get::<_, bool>("mz_progressed"), true); - assert_eq!(progress_row.get::<_, Option>("mz_diff"), None); - assert_eq!(progress_row.get::<_, Option>("data"), None); + )?; + let data_row = client_reads.query_one("FETCH 1 c2", &[])?; + assert_eq!(data_row.get::<_, bool>("mz_progressed"), false); + assert_eq!(data_row.get::<_, i64>("mz_diff"), 1); + assert_eq!(data_row.get::<_, String>("data"), "data"); + let progress_row = client_reads.query_one("FETCH 1 c2", &[])?; + assert_eq!(progress_row.get::<_, bool>("mz_progressed"), true); + assert_eq!(progress_row.get::<_, Option>("mz_diff"), None); + assert_eq!(progress_row.get::<_, Option>("data"), None); + + Ok(()) +} + +/// Verifies that we get continuous progress messages as soon as at least one +/// data message was produced. 
+#[test] +fn test_tail_continuous_progress() -> Result<(), Box> { + ore::test::init_logging(); + + let config = util::Config::default().workers(2); + let server = util::start_server(config)?; + let mut client_writes = server.connect(postgres::NoTls)?; + let mut client_reads = server.connect(postgres::NoTls)?; + + client_writes.batch_execute("CREATE TABLE t1 (data text)")?; + client_reads.batch_execute( + "COMMIT; BEGIN; + DECLARE c1 CURSOR FOR TAIL t1 WITH (PROGRESS);", + )?; + + client_writes.execute("INSERT INTO t1 VALUES ($1)", &[&"hello".to_owned()])?; + + let mut last_ts = MzTimestamp(u64::MIN); + let mut verify_rows = move |rows: Vec| -> (usize, usize) { + let mut num_data_rows = 0; + let mut num_progress_rows = 0; + + for row in rows { + let diff = row.get::<_, Option>("mz_diff"); + match diff { + Some(diff) => { + num_data_rows += 1; + + assert_eq!(diff, 1); + assert_eq!(row.get::<_, bool>("mz_progressed"), false); + let data = row.get::<_, Option>("data"); + assert!(data.is_some()); + } + None => { + num_progress_rows += 1; + + assert_eq!(row.get::<_, bool>("mz_progressed"), true); + assert_eq!(row.get::<_, Option>("data"), None); + } + } + + let ts: MzTimestamp = row.get("mz_timestamp"); + assert!(last_ts < ts); + last_ts = ts; + } + + (num_data_rows, num_progress_rows) + }; + + // this will fetch away the initial batch that contains our data message + // plus some progress messages + let rows = client_reads.query("FETCH ALL c1", &[])?; + let (num_data_rows, num_progress_rows) = verify_rows(rows); + assert_eq!(num_data_rows, 1); + assert!(num_progress_rows > 0); + + // Try and read some progress messages. The normal update interval is + // 1s, so only wait for two updates. Otherwise this would run for too long. 
+ for _i in 1..=2 { + let rows = client_reads.query("FETCH ALL c1", &[])?; + + let (num_data_rows, num_progress_rows) = verify_rows(rows); + assert_eq!(num_data_rows, 0); + assert!(num_progress_rows > 0); } Ok(()) diff --git a/test/testdrive/fetch-tail-continuous-progress.td b/test/testdrive/fetch-tail-continuous-progress.td new file mode 100644 index 0000000000000..e3d7b6e9cfafa --- /dev/null +++ b/test/testdrive/fetch-tail-continuous-progress.td @@ -0,0 +1,34 @@ +# Copyright Materialize, Inc. and contributors. All rights reserved. +# +# Use of this software is governed by the Business Source License +# included in the LICENSE file at the root of this repository. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0. + +# +# Make sure that TAIL WITH (progress=true) emits periodic progress +# messages even if there's no new data. +# + +$ set-regex match=\d{13} replacement= + +> CREATE TABLE t1 (f1 INTEGER); + +> INSERT INTO t1 VALUES (123); + +> BEGIN + +> DECLARE c CURSOR FOR TAIL t1 WITH (progress=true); + +# Verify there is a progress row available in the first batch. +> FETCH 2 c + false 1 123 + true + +# Now ask (and possibly wait) for another progress row. +> FETCH 1 c + true + +> COMMIT