
persist,dataflow: verify expected as_of when replaying streams #9659

Merged
aljoscha merged 3 commits into MaterializeInc:main from aljoscha:is-8608-verify-as-of-when-rendering-persist-sources
Jan 11, 2022

Conversation

@aljoscha
Contributor

@aljoscha aljoscha commented Dec 17, 2021

Before, it could happen that a dataflow was being rendered at an
expected as_of but the persistent streams that were replayed as part
of the dataflow had compacted beyond that. This would manifest in subtle
ways, see for example:
https://github.com/MaterializeInc/database-issues/issues/2634.

Now, we thread through the expected as_of_frontier and give a
meaningful error message when the expected as_of cannot be served.
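
A minimal sketch of the check described above, assuming totally ordered `u64` timestamps. `Frontier`, `verify_as_of`, and the error text are hypothetical stand-ins for illustration; the real code compares timely `Antichain<u64>` values with `PartialOrder::less_than`.

```rust
/// A frontier over totally ordered `u64` timestamps (hypothetical stand-in
/// for `Antichain<u64>`). `None` models the empty frontier.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct Frontier(Option<u64>);

impl Frontier {
    /// Strict frontier order: `self` still admits times that `other` does not.
    fn less_than(&self, other: &Frontier) -> bool {
        match (self.0, other.0) {
            (Some(a), Some(b)) => a < b,
            (Some(_), None) => true,
            (None, _) => false,
        }
    }
}

/// Fails if the persisted stream has compacted beyond the requested `as_of`.
/// The message wording is an assumption, not the one used in the PR.
fn verify_as_of(as_of: Frontier, since: Frontier) -> Result<(), String> {
    if as_of.less_than(&since) {
        Err(format!(
            "cannot serve as_of {:?}: stream already compacted to since {:?}",
            as_of, since
        ))
    } else {
        Ok(())
    }
}
```

An `as_of` equal to the since is accepted; only an `as_of` strictly before the since is rejected, because those times can no longer be distinguished in the compacted data.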

Note: it's not possible to provoke this new error message from tests. It
will only surface if/when we have a bug in the logic that determines the
as_of frontier (on the coordinator) and forwards it to dataflow
rendering.

Fixes MaterializeInc/database-issues#2635

@danhhz This will have conflicts with #9615.

Checklist

  • This PR has adequate test coverage / QA involvement has been duly considered. NO: See note above.
  • [N/A] This PR adds a release note for any user-facing behavior changes.


@aljoscha aljoscha requested a review from danhhz December 17, 2021 14:25
@aljoscha aljoscha force-pushed the is-8608-verify-as-of-when-rendering-persist-sources branch from 2172325 to 0d0ae6f Compare December 17, 2021 14:47
Contributor

@danhhz danhhz left a comment

I'm happy to deal with the merge conflicts. I was holding that PR until our dec benchmarking stuff is done, anyway, so I'll also make sure to hold it for this.

Ok(snapshot) => {
let snapshot_since = snapshot.since();

if PartialOrder::less_than(&as_of_frontier, &snapshot_since) {
Contributor

oh interesting, i was imagining that we'd have to thread this into the snapshot call. i don't see why this wouldn't work though

fn replay(
&self,
snapshot: Result<DecodedSnapshot<K, V>, Error>,
as_of_frontier: &Antichain<u64>,
Contributor

doesn't this mean the contract of the operator needs to be that it forwards all data to this frontier before emitting it? I don't see where that happens. we should also add a test for this

same q for the listen_source operator.

the persisted_source operator shouldn't need to do any forwarding itself, but probably wants a debug assert (and maybe also a test?)

Contributor Author

it's not 100% clear to me whether it needs to do that. We have code in the source rendering code that does that:

// Apply `as_of` to each timestamp.

Maybe @frankmcsherry has an opinion on this: should the replay source advance times on the data it emits to the as_of, or should it just verify that its since/compaction frontier is not beyond the as_of and rely on the rest of the source pipeline to advance the timestamp?

I'll add the tests for sure, though!
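
To make "advance times to the as_of" concrete, here is a rough sketch, assuming `u64` timestamps and `(data, time, diff)` update triples. `advance_to_as_of` is a hypothetical helper, not the operator in this PR: it moves every timestamp up to at least `as_of` and consolidates updates that end up at the same time.

```rust
use std::collections::BTreeMap;

/// An update triple: (data, timestamp, diff).
type Update = (String, u64, i64);

/// Advances each timestamp to at least `as_of`, then sums the diffs of
/// updates that now share the same (data, timestamp) and drops zero diffs.
fn advance_to_as_of(updates: Vec<Update>, as_of: u64) -> Vec<Update> {
    let mut acc: BTreeMap<(String, u64), i64> = BTreeMap::new();
    for (data, ts, diff) in updates {
        let ts = std::cmp::max(ts, as_of);
        *acc.entry((data, ts)).or_insert(0) += diff;
    }
    acc.into_iter()
        .filter(|(_, diff)| *diff != 0)
        .map(|((data, ts), diff)| (data, ts, diff))
        .collect()
}
```

An insert and a retraction of the same record that both happened before the `as_of` cancel out after advancing, which is why pushing this step down could reduce the data that has to be moved.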

@aljoscha
Contributor Author

@frankmcsherry
Contributor

should the replay source advance times on the data it emits to the as_of, or should it just verify that its since/compaction frontier is not beyond the as_of and rely on the rest of the source pipeline to advance the timestamp?

I think either is fine. I anticipate the dataflow layer not assuming that things have been correctly advanced, just for sanity, but it is an opportunity to push more work down to storage which could benefit it (e.g. in some world, it could reduce the data transited).

Ideally platform will soon start us on the path of discussing the interface, and what things COMPUTE knows that it can tell STORAGE to allow it to push more stuff down. E.g. MFPs, and snapshot/no, tail/no. What worked great for MFPs so far was an API that allowed the callee to extract any of the constraints to indicate that they will accept responsibility for them (e.g. peel off parts of a MFP, leaving the remaining work in the MFP). If we could do the same with the other arguments, it might make for a fine negotiation protocol, in which the default implementation does not remove anything, until it reaches a point where it is confident that it can guarantee the work is done.
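
One possible shape for such a negotiation, as a sketch only. Every name here (`Constraints`, `Storage`, `absorb`, the two implementations) is hypothetical and does not exist in the codebase: the caller passes the work it wants done, the callee removes the parts it takes responsibility for, and the default implementation removes nothing.

```rust
/// Work that COMPUTE would like applied to a collection (hypothetical).
#[derive(Debug, Clone, PartialEq, Eq, Default)]
struct Constraints {
    /// Advance all timestamps to at least this time.
    as_of: Option<u64>,
    /// Keep only values at or above this bound (a stand-in for an MFP predicate).
    min_value: Option<i64>,
}

trait Storage {
    /// Removes from `wanted` whatever this implementation promises to do itself.
    /// The default accepts nothing, so the caller keeps all remaining work.
    fn absorb(&self, _wanted: &mut Constraints) {}
}

/// Uses the default: takes on no responsibility.
struct PlainStorage;
impl Storage for PlainStorage {}

/// Promises to advance timestamps, but leaves filtering to the caller.
struct AdvancingStorage;
impl Storage for AdvancingStorage {
    fn absorb(&self, wanted: &mut Constraints) {
        wanted.as_of = None;
    }
}
```

Whatever is left in `Constraints` after the call is what the dataflow layer still has to apply itself, so an implementation can take over more work incrementally without changing the caller.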

@aljoscha aljoscha force-pushed the is-8608-verify-as-of-when-rendering-persist-sources branch from 4bc531f to 714b09e Compare December 20, 2021 15:12
@aljoscha
Contributor Author

@dan i added:

@aljoscha aljoscha force-pushed the is-8608-verify-as-of-when-rendering-persist-sources branch from 714b09e to 10b3a4d Compare December 20, 2021 15:16
@aljoscha aljoscha requested a review from ruchirK December 20, 2021 15:51
Contributor

@danhhz danhhz left a comment

LGTM! though I didn't look at the coord changes closely

@aljoscha aljoscha force-pushed the is-8608-verify-as-of-when-rendering-persist-sources branch from 10b3a4d to 53ed22a Compare December 20, 2021 16:06
@ruchirK
Contributor

ruchirK commented Dec 20, 2021

I'm just seeing this now - will look at this after lunch

Comment thread src/coord/src/coord.rs

// NOTE: Tables are not sources, but to a large part of the system they look
// like they are, e.g. they are rendered as a SourceConnector::Local.
self.sources.insert(entry.id(), frontiers);
Contributor

Does this mean that update_upper has been getting messages for both a table and its index, but has been ignoring the non-index message? Or does the since setting happen only once here? Can you explain a bit more (or add a test) that illustrates the problem this is solving?

Contributor

@matt: My understanding is that:

  • Right now only the table's primary index sends back upper information. We use that information to derive a since for both that index, and the raw persisted data (which we set in persisted_table_allow_compaction to allow the persisted data to also compact up to that since)

  • The persisted data is compacted by a separate thread, and on restart is only valid up to some since.

  • On restart, we reload the table's primary index with the persisted data, but we don't set its since to reflect that the data is only valid after some times.

@aljoscha I'm not sure that line 499 is doing anything, because unlike a regular source I don't think you can actually use the table the way you can use an unmaterialized source (for now at least). I believe this instead needs to set the since of the table's primary index. I don't have high confidence in this analysis yet, though.

Contributor

Ok, that's useful context. I agree with your recommendation.

Contributor Author

@mjibson There are existing tests for this, basically all tests under test/persistence/user-tables (see https://github.com/MaterializeInc/materialize/tree/main/test/persistence/user-tables). The idea of the tests is that they do a thing, then materialized is stopped and restarted and then the after tests are run to verify.

Persistent user tables are like sources in a way: a source that reads from persistence is created to read from the underlying persistent stream. What I did is change the API to force dataflow to hand the expected as_of_frontier to the replay() operator (this is the thing that replays persisted data), and to verify that the since (aka compaction frontier) of the persisted stream is compatible with the requested as_of_frontier. This failed all the tests, because we were not correctly calculating an as_of_frontier: we were not taking the since of the persisted stream into account. The first commit in this PR fixes that, to make the tests pass again.

@ruchirK It is a valid place to put it: with that line, we correctly determine a since and tests pass, without it they don't.

Here is where the since that we put in when bootstrapping is picked up:

for (source_id, _description) in dataflow.source_imports.iter() {

And this is where the frontier for the index that is rendered for the table is registered:

self.indexes.insert(*global_id, frontiers);

The workflow (when restarting) is roughly this:

  1. coordinator bootstraps its metadata, which now includes reading the persistence since for tables and registering it with the source frontiers
  2. coordinator notices that it needs to render a dataflow for the table
  3. the code I linked to above is consulted to determine a since
  4. the index frontier of the arrangement that is about to be rendered is registered
  5. the "source" is being rendered.
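
The steps above can be sketched roughly as follows, assuming `u64` timestamps instead of antichains. `Coordinator`, `bootstrap_table`, and `render_index` are simplified, hypothetical stand-ins for the coordinator code linked above.

```rust
use std::collections::HashMap;

/// Hypothetical simplification of the coordinator's `GlobalId`.
type GlobalId = u64;

#[derive(Default)]
struct Coordinator {
    /// Since per source; tables are registered here too.
    sources: HashMap<GlobalId, u64>,
    /// Since per index.
    indexes: HashMap<GlobalId, u64>,
}

impl Coordinator {
    /// Step 1: bootstrapping registers the persisted since of a table
    /// with the source frontiers.
    fn bootstrap_table(&mut self, table_id: GlobalId, persisted_since: u64) {
        self.sources.insert(table_id, persisted_since);
    }

    /// Steps 2 to 4: pick an as_of that is not before the since of any
    /// imported source, then register the frontier of the new index.
    fn render_index(&mut self, index_id: GlobalId, source_imports: &[GlobalId]) -> u64 {
        let as_of = source_imports
            .iter()
            .filter_map(|id| self.sources.get(id).copied())
            .max()
            .unwrap_or(0);
        self.indexes.insert(index_id, as_of);
        as_of
    }
}
```

With the table's since registered in step 1, the as_of chosen for the table's dataflow can no longer fall before the compaction frontier of the persisted stream.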

So, as I mentioned above, to a lot of the system the table looks like a source. Not sure that's good, though... 😅

Contributor

@ruchirK ruchirK Dec 21, 2021

That makes sense. This construction is a little bit nonintuitive to me though, because we use the table's since on startup to initialize the primary index's since and then never update it again. That happens to work implicitly because we know that once we've rendered the primary index, we'll never use the table's since again, but it feels very action-at-a-distance.

It seems like it is more work to directly initialize the index's since frontier, so I'm fine with this, but would you consider also updating the table source's since frontier when we allow compaction on the index/persisted data, and setting the table source's since frontier to zero when we first create it? The way the code is structured now, we only keep an entry in self.sources for a table id after a restart, which I suspect is correct right now but might lead to surprising effects in the future.
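
A small sketch of that suggestion, again with `u64` timestamps. `Frontiers`, `create_table`, and `allow_compaction` are hypothetical names for illustration and not the coordinator's API.

```rust
use std::collections::HashMap;

#[derive(Default)]
struct Frontiers {
    /// Since per table source, keyed by table id.
    sources: HashMap<u64, u64>,
    /// Since per index, keyed by index id.
    indexes: HashMap<u64, u64>,
}

impl Frontiers {
    /// A newly created table starts with a since of zero for both the
    /// table source entry and its primary index.
    fn create_table(&mut self, table_id: u64, index_id: u64) {
        self.sources.insert(table_id, 0);
        self.indexes.insert(index_id, 0);
    }

    /// Allowing compaction advances both entries together.
    /// A since never moves backwards.
    fn allow_compaction(&mut self, table_id: u64, index_id: u64, new_since: u64) {
        let source_since = self.sources.entry(table_id).or_insert(0);
        *source_since = (*source_since).max(new_since);
        let index_since = self.indexes.entry(index_id).or_insert(0);
        *index_since = (*index_since).max(new_since);
    }
}
```

Keeping the two entries in lockstep means the table's entry in `sources` exists from creation onward, not only after a restart.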

Contributor Author

I didn't do that initially because of the weird nature of Table "sources", but I will do that! I think it might even fix a bug with disabling/enabling user-indexes. I will have to pick that up again in the new year, though.

@aljoscha aljoscha force-pushed the is-8608-verify-as-of-when-rendering-persist-sources branch from 53ed22a to 57c7285 Compare December 21, 2021 16:38
@aljoscha
Contributor Author

aljoscha commented Dec 21, 2021

@danhhz I had to revert the change that forwards the timestamps of replayed records to the as_of_frontier. There are cases where the as_of_frontier is beyond both the since (aka compaction frontier) and upper (seal ts) of the persisted stream. This is fine, but there is logic in the source (the graph of operators that make up the source) that filters out records based on their timestamp, and this expects records to have the timestamps as they are in the stream. Forwarding to the as_of_frontier in the replay() operator then means that the rest of the source thinks they should be filtered out, based on the upper seal timestamp.

One case where this happens is when restarting with --disabled-user-indexes. In that case, we create entries in the catalog for the source, but don't render it. However, there is logic that will let the "internal" (to the coordinator) since advance and when we then do enable the user index again the source will be rendered with an as_of_frontier that is beyond the upper seal timestamp. I found this out because there is a test for this scenario! 🎉 (thanks @philip-stoev!).

To me, this indicates that we need to forward the timestamp only at the boundary of the source, that is internally we work with the timestamps we get from persistence and then forward at the boundary to the rest of the dataflow. This is what we're currently doing (before this change). So this change would only check that the expected as_of_frontier is valid given the since of the persisted stream(s).
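
A toy illustration of the ordering problem, under the assumption that the source-internal filter keeps only records with a timestamp below the upper seal timestamp. `keep_sealed` and `advance` are hypothetical helpers, not the actual operators.

```rust
/// A record with its timestamp as stored in the persisted stream.
type Record = (&'static str, u64);

/// Keeps only sealed records, i.e. those with a timestamp below `upper`.
/// (The exact predicate is an assumption made for this sketch.)
fn keep_sealed(records: Vec<Record>, upper: u64) -> Vec<Record> {
    records.into_iter().filter(|(_, ts)| *ts < upper).collect()
}

/// Forwards every timestamp to at least `as_of`.
fn advance(records: Vec<Record>, as_of: u64) -> Vec<Record> {
    records
        .into_iter()
        .map(|(data, ts)| (data, ts.max(as_of)))
        .collect()
}
```

With records at times 3 and 5, an upper of 6, and an as_of of 10, `keep_sealed(advance(records, 10), 6)` drops everything, while `advance(keep_sealed(records, 6), 10)` keeps both records and emits them at time 10. The second order corresponds to forwarding only at the boundary of the source.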

((I can elaborate in a call if needed.))

@danhhz
Contributor

danhhz commented Dec 21, 2021

From persist's correctness perspective, I think the since forwarding that we do in ArrangementSnapshotIter is sufficient. I can't say I understand all that context, so I'll definitely take you up on that call, but it'll probably be after the holidays and I trust your decision here so don't block merging this on that call. :)

@frankmcsherry
Contributor

This is fine, but there is logic in the sink (the graph of operators that make up the sink) that filters out records based on their timestamp, and this expects records to have the timestamps as they are in the stream. Forwarding to the as_of_frontier in the replay() operator then means that the rest of the sink thinks they should be filtered out, based on the upper seal timestamp.

It sounds like there may be a bug to shake out here about what sinks assume about the data shot at them? It doesn't seem like this is a persistence glitch, even if undoing it prevents naughty behavior elsewhere. I'm interested .. either the call or reading about your conclusions. If we need to file something about sinks, or about getting a better as_of for dataflows that build sinks, I .. 1. wouldn't be surprised and 2. am happy to provide opinions!

@aljoscha
Contributor Author

Oh boy 🙈 , I had a very important typo in there: when I wrote sink, that should have been source. I'm editing the original for anyone who wants to re-read.

@aljoscha aljoscha force-pushed the is-8608-verify-as-of-when-rendering-persist-sources branch 2 times, most recently from a8b3c50 to f4cb6e7 Compare January 5, 2022 14:30
@aljoscha
Contributor Author

aljoscha commented Jan 5, 2022

@ruchirK I updated this with what we talked about last year. Could you please take another look?

We're now maintaining a since frontier for the persistence source, even though no-one will currently look at that. It feels a bit more correct to do it, though.

@aljoscha
Contributor Author

@ruchirK Gentlest of pings. 😅 (I know you've been OoO, just making sure you're aware, because it blocks some things.)

@ruchirK
Contributor

ruchirK commented Jan 10, 2022

reviewing this one now!

Contributor

@ruchirK ruchirK left a comment

I looked through the coordinator bits and this all seemed reasonable to me. I left a few small comments.

I'm trying to reason about "are there any other use cases that might be affected by this change" (e.g. exactly once sinks?) and it seems to me that the answer is no, because ... the table always has an undeletable primary index, and so any query / view definition / sink definition done on the table uses the upper and since of its primary index. Is that rationale correct?

Comment thread src/coord/src/coord.rs
if let Some(df) = df {
let since_ts = {
match &table.persist {
Some(persist) => Some(persist.since_ts),
Contributor

have not yet read where persist.since_ts gets set, but why would this be nonzero when creating a new table?

Contributor

read it, and it still seems like new tables should get a since of 0 (because new tables will be newly persisted collections?)

Contributor Author

This is for the case where coordinator bootstraps from scratch but we still have the persist data directory. Probably not very likely today, but maybe more likely when persistence stores to S3? I'm happy to simplify this to just always initialize to [0], though.

Comment thread src/coord/src/coord.rs
/// TODO: In the future the coordinator should perhaps track a table's upper and
/// since frontiers directly as it currently does for sources.
fn persisted_table_allow_compaction(&self, since_updates: &[(GlobalId, Antichain<Timestamp>)]) {
fn persisted_table_allow_compaction(
Contributor

is the todo above this line now stale?

Contributor Author

I think maybe not? But I also don't know exactly what the comment was aiming at. As it is, we still need this specialized code for tracking the sinces of the backing source, since they are not covered yet by the regular tracking logic. We still have the situation that table sources always have an index stuck to them, and there can never be a bare "table source".

Before, it could happen that a dataflow was being rendered at an
expected `as_of` but the persistent streams that were replayed as part
of the dataflow had compacted beyond that. This would manifest in subtle
ways, see for example:
https://github.com/MaterializeInc/materialize/issues/8606.

Now, we thread through the expected `as_of_frontier` and give a
meaningful error message when the expected `as_of` cannot be served.

Note: it's not possible to provoke this new error message from tests. It
will only surface if/when we have a bug in the logic that determines the
as_of frontier (on the coordinator) and forwards it to dataflow
rendering.

Fixes #8608
@aljoscha aljoscha force-pushed the is-8608-verify-as-of-when-rendering-persist-sources branch from f4cb6e7 to ad3524f Compare January 11, 2022 09:42
@aljoscha aljoscha merged commit df96e2d into MaterializeInc:main Jan 11, 2022
@aljoscha aljoscha deleted the is-8608-verify-as-of-when-rendering-persist-sources branch January 11, 2022 10:13

5 participants