Skip to content

persist: relax allow_compaction() requirements#9938

Merged
aljoscha merged 1 commit into
MaterializeInc:mainfrom
aljoscha:persist-relax-allow-compaction
Jan 13, 2022
Merged

persist: relax allow_compaction() requirements#9938
aljoscha merged 1 commit into
MaterializeInc:mainfrom
aljoscha:persist-relax-allow-compaction

Conversation

@aljoscha
Copy link
Copy Markdown
Contributor

@aljoscha aljoscha commented Jan 7, 2022

Before, we would not allow allowing compaction beyond the seal frontier
(aka upper). Differential dataflow allows this, and it will make
enabling compaction for persistent sources easier if we do as well.

This requires changing the nemesis tests, because requests that would
previously fail now pass.

This also requires updating the golden test. The reason being that the
(deterministic) generator that the golden test uses generates some
allow_compaction requests that fail and some that pass, and the code
that applies those ignores requests that fail. After this change, those
requests all succeed, meaning the state is different.

Tips for reviewer

Checklist

  • This PR has adequate test coverage / QA involvement has been duly considered.
  • This PR adds a release note for any user-facing behavior changes.

This change is Reviewable

@aljoscha aljoscha requested a review from danhhz January 7, 2022 17:53
Copy link
Copy Markdown
Contributor

@danhhz danhhz left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this looks exactly like I expect, but I'd like a few more things added to the PR please

  • since the new semantics are a bit of a mindbend at first (at least they were to me), docs on StreamWriteHandle::allow_compaction to call out that it's legal to forward the compaction frontier even past since and some attempt at an intuition of what this means
  • unit test coverage at both the ArrangementSnapshot level and the Replay operator level of exercising a read with since in advance of upper
  • the big one: I'm reasonably sure that this will make the nemesis tests flaky under stress (I was also working on this PR when I saw yours and mine made nemesis flaky :D). I looked into it a bit and my intuition was that we'd have to do the TODO in the nemesis validator code about being more disciplined with as_of to fix it in any reasonable way. feel free to poke at it a bit for an hour or two if you like, it'll probably be instructive, but don't spend too much time on it.. i'll think about it more/take a swing at fixing it myself early next week

assert_eq!(t.validate_allow_compaction(&Antichain::from_elem(5)),
Err("invalid compaction less than trace since Antichain { elements: [6] }: Antichain { elements: [5] }".into()));

// Advance since frontier to seal
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's keep these tests but now verify that they succeed

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

};
assert_eq!(b.validate(), Ok(()));

// Trace since at nonzero trace seal
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's keep these tests but now verify that they succeed

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

@aljoscha
Copy link
Copy Markdown
Contributor Author

aljoscha commented Jan 10, 2022

My PR for verifying the as_of/since (#9659) fixes the flakiness in the nemesis tests. But I don't think it fixes it the "right" way. With that PR on top of this, the persisted_source() call in nemesis/direct.rs now has an Antichain::from_elem(0) argument and will emit an Err when it cannot satisfy that. I have a feeling this is not correct and we should instead calculate the correct as_of to pass in. I'm a bit surprised that the tests pass, though, with this, and become un-flaky again. Working on it...

I don't know enough about the nemesis tests, but my working assumption is that someone notices the Err from the persisted source and we treat that as an okay thing to happen, and don't fail on it. And when we have that error, we also don't expect any records to come out from it (which is correct, because none will come).

@aljoscha
Copy link
Copy Markdown
Contributor Author

Another update: when I change nemesis/direct.rs to determine the correct compaction frontier and pass it to persistent_source(), we're back to being flaky.

@ruchirK ruchirK self-requested a review January 10, 2022 14:11
@aljoscha
Copy link
Copy Markdown
Contributor Author

I pushed what I have but the nemesis tests are flaky with this.

@danhhz
Copy link
Copy Markdown
Contributor

danhhz commented Jan 10, 2022

Innnnteresting.... just looked at that seems like a totally reasonable fix. I'll check out this branch locally and poke at it

@aljoscha
Copy link
Copy Markdown
Contributor Author

The problem is concurrency (of course! ... 😅). When I change DirectCore::stream() to this (see NEW STUFF):

fn stream(&mut self, name: &str) -> Result<Ingest, Error> {
    match self.streams.entry(name.to_owned()) {
        Entry::Occupied(x) => {
            let (ingest, _) = x.get();
            Ok(ingest.clone())
        }
        Entry::Vacant(x) => {
            // NEW STUFF
            let description = self.runtime.get_description(name);
            let compaction_since = match description {
                Ok(description) => description.since().clone(),
                Err(Error::UnknownRegistration(_)) => {
                    // Stream has not yet been created.
                    Antichain::from_elem(0)
                }
                Err(e) => return Err(e),
            };
            // NEW STUFF END

            let (write, read) = self.runtime.create_or_load(name);

            let stream_id = write.stream_id()?;
            let dataflow_read = read.clone();
            let (output_tx, output_rx) = mpsc::channel();
            let output_tx = Arc::new(Mutex::new(output_tx));

            let (progress_tx, progress_rx) = DataflowProgress::new();
            let progress_handle = progress_tx.clone();
            let workers = timely::execute(
                timely::Config::process(NUM_DATAFLOW_WORKER_THREADS),
                move |worker| {
                    let dataflow_read = dataflow_read.clone();
                    let mut probe = ProbeHandle::new();
                    worker.dataflow(|scope| {
                        let output_tx = output_tx
                            .lock()
                            .expect("clone doesn't panic and poison lock")
                            .clone();
                        let data =
                            scope.persisted_source(dataflow_read.clone(), &compaction_since);
                        data.probe_with(&mut probe).capture_into(output_tx);
                        
                        // NEW STUFF
                        match dataflow_read.snapshot() {
                            Ok(snapshot) => {
                                let snapshot_since = snapshot.since();
                                if compaction_since != snapshot_since {
                                    panic!("Mismatch, snapshot_since {:?} != compaction_since {:?}",snapshot_since, compaction_since);
                                }
                            }
                            _ => (),
                        }
                        // NEW STUFF END
                    });
                    while worker.step_or_park(None) {
                        probe.with_frontier(|frontier| {
                            progress_tx.maybe_progress(frontier);
                        })
                    }
                    progress_tx.close();
                },
            )?;
            let input = Ingest {
                write,
                read,
                stream_id,
                progress_rx,
            };
            let output = Dataflow {
                workers: TimelyWorkers(workers),
                output: output_rx,
                progress_tx: progress_handle,
            };
            x.insert((input.clone(), output));
            Ok(input)
        }
    }
}

The check I've added will fail sometimes. The solution seems to be, that DirectCore has to keep track of what the latest successful allow_compaction() call was (per stream) and use that when creating the persistent_source(). I don't know if that fits the current design of the tests, though.

@aljoscha
Copy link
Copy Markdown
Contributor Author

Scratch that. Even this wouldn't work because we can have concurrent calls to allow_compaction() that would still mess with us.

@danhhz
Copy link
Copy Markdown
Contributor

danhhz commented Jan 10, 2022

There's another problem too, and I think this one may be worse :-O. Even if we make that work, I'm not entirely sure how to write the verifier piece.

So, assume for a sec that we somehow plumb the as_of that gets selected for dataflow construction to the verifier and then, other than that, totally ignore allow_compactions. The verifier is given an ordered history of write and seal commands and also some prefix of the listener output. The writes in the listener output are all in the same order as the writes in the command history and the seals in the listener output are all in the same order as the seals in the command history, but, some of the writes may have been reordered in front of the seals wrt the original command history order (because of the way command batching and our listener impls work). (side note: I think this is fine/desired, we could change the impl of listeners to guarantee the order of writes and seals wrt each other, but I think holding that strict of an invariant is a non-goal since the only thing we really care about is differential dataflow semantics and we currently do that: i.e. never reorder a write vs a seal that would change its validity).

Note that because of the above, there isn't always a specific SeqNo that corresponds to the given point we're validating in listener output. The way the validator code gets around this now is to identify the latest seal in the listener output and throw away all data from both our command history and the listener output that's >= the seal. Because we respect differential dataflow semantics, these are guaranteed to match. The problem now is that if we have an as_of that's > the latest seal (even if as I mention above it's something we've plumbed concretely from dataflow construction to the validator somehow), then we can't do this filtering since we've lost info in the listener output on what was originally before vs after the seal.

So the real problem here is that we have two ordering systems (SeqNos and differential timestamps) and neither of them can be used to perfectly identify a subset of actual vs expected that should match.

One "nice" thing here is that I was also hitting this issue as I added more and more concurrency coverage to the validator test and stopped at some point because it was breaking my brain. It's a thorny problem but it's now important to fix for two reasons.

Concretely, I think what you should do here to land your PR is just skip validation (early return in step_read_output) when the latest allow_compaction for the stream in the command history is >= the latest seal for the stream in the command history and toss in a TODO to fix this all up. There's already another place or two in nemesis that we do the same thing. We just need to not lost track of eventually fixing this up since nemesis is such an important test.

(So far, the only idea I've had for the real fix is to (a) find which write SeqNo the listener output corresponds to by running this check one by one and erroring if it never finds anything that matches and (b) asserting that this SeqNo never regresses. Naively, this is quadratic, which we definitely don't want for nemesis, but atm it's the only idea I have. Maybe we can be clever with the impl to avoid that? Also, now that I type this up, I think maybe even this doesn't work since there will be traffic patterns that make things ambiguous e.g. repeatedly adding a row and then retracting it.)

@aljoscha aljoscha force-pushed the persist-relax-allow-compaction branch from d86152b to 2e65894 Compare January 10, 2022 20:27
@aljoscha
Copy link
Copy Markdown
Contributor Author

I updated it to something much less flaky, by excluding cases where since >= seal, and by locking on DirectCore while timely workers have not yet created their persistent_source(). I'll stop for now, but I'll pick it up again tomorrow. Maybe you can spot something I didn't, if you have time.

@danhhz
Copy link
Copy Markdown
Contributor

danhhz commented Jan 11, 2022

Okay, so I think what was happening was that your "skip validation" check was based off of the latest seal and allow_compactions that were sent, but we need to instead base it on the latest seal received. Looks like I said this wrong in my comment above, sorry! I've reverted all of your nemesis changes (just to make sure I understood what was going on) and applied the following diff and it seems to be working.

diff --git a/src/persist/src/nemesis/validator.rs b/src/persist/src/nemesis/validator.rs
index 3f0e7e692..2a071e72a 100644
--- a/src/persist/src/nemesis/validator.rs
+++ b/src/persist/src/nemesis/validator.rs
@@ -266,6 +266,15 @@ impl Validator {
                     .unwrap_or_default(),
             );
 
+            if !as_of.less_than(&latest_seal) {
+                // TODO: We cannot currently verify cases where the compaction
+                // frontier is beyond the since, because we cannot determine
+                // anymore (based on the latest seal timestamp) which records
+                // (both from the expected writes and the writes we get from
+                // timely) are eligible for verification.
+                return;
+            }
+
             // Verify that the output contains all sent writes less than the
             // latest seal it contains.
             //

(So far, the only idea I've had for the real fix is to (a) find which write SeqNo the listener output corresponds to by running this check one by one and erroring if it never finds anything that matches and (b) asserting that this SeqNo never regresses. Naively, this is quadratic, which we definitely don't want for nemesis, but atm it's the only idea I have. Maybe we can be clever with the impl to avoid that? Also, now that I type this up, I think maybe even this doesn't work since there will be traffic patterns that make things ambiguous e.g. repeatedly adding a row and then retracting it.)

I've been thinking more about this and I'm starting to believe that the only way we'll be able to make this in any way coherent (and performant) is if we plumb SeqNos into the output of the PersistedSource operator. We don't want them for non-nemesis uses of it, so I'm hoping there's some nice way to do it optionally, but I haven't tried anything here yet.

@aljoscha
Copy link
Copy Markdown
Contributor Author

What does this pass as an as_of to persisted_source() in nemesis/direct.rs? If it is Antichain::from_elem(0), then that's excluding cases where we allow compaction from validation. The reason is that passing an as_of of [0] will make persisted_source() emit an error. And when the validator sees any errors in the output it exits early.

@danhhz
Copy link
Copy Markdown
Contributor

danhhz commented Jan 11, 2022

But we don't have to solve that to unblock merging this PR right?

@aljoscha
Copy link
Copy Markdown
Contributor Author

We don't, but I'm afraid we're silently reducing what we cover with the test. I'll timebox it to end of today and see were I get, okay? I have other PRs open anyways.

@danhhz
Copy link
Copy Markdown
Contributor

danhhz commented Jan 11, 2022

Works for me! It'll be good to increase the bus factor here. I just didn't want to get personally sucked into nemesis today :)

@aljoscha aljoscha force-pushed the persist-relax-allow-compaction branch from 2e65894 to c72e2c5 Compare January 11, 2022 18:39
@aljoscha
Copy link
Copy Markdown
Contributor Author

I believe I figured it out, part of it was just me not understanding completely how validation works. I was bailing from step_read_output() to early, which meant that the stored prefix was not being updated.

The commit that hardens seal forwarding is not strictly necessary, but I thought it would be more correct to do it like this, just from looking at the code.

Also, locking on DirectCore while creating the persisted_source() workers also isn't strictly necessary. It's just that not doing it will make it way more likely that the source is not constructed correctly, and we therefore bypass the validation step because the source emits an error. This might even be desirable, idk. 🤷‍♂️

Please don't feel like you need to review right away, I'm not blocked hard by this: my PR that enables compaction for sources can work without this, it just means that there will be some superfluous error messages in the log.

@aljoscha
Copy link
Copy Markdown
Contributor Author

Turns out this change is not necessary for #9981. For the same reason why we cannot (currently) validate nemesis steps where compaction is beyond upper (beyond meaning >=) we cannot restore from persisted data and timestamp bindings where the compaction is beyond upper. As in the nemesis test, we wouldn't be able to determine which updates/bindings are valid, i.e. below the common seal timestamp.

We can still merge this PR, because it might make sense to not have this restriction in the future (and DD doesn't have it). But it might be less work to just drop it for now. WDYT?

@danhhz
Copy link
Copy Markdown
Contributor

danhhz commented Jan 12, 2022

I'll leave the final decision up to you, but I lean toward merging this. All else being equal, persist follows differential's lead on semantics and this is basically ready to go, mod I think just the rustdoc request I made. I would, however, take most of the nemesis changes out of this PR, leaving just the allow_compaction in generator and the one step_read_output change in validator to just skip validation when since is >= seal. I might be missing something, but that doesn't reduce any nemesis coverage we already had and I'm comfortable in the short term with not having nemesis coverage of the since >= seal case given all the other unit tests in this PR.

@aljoscha
Copy link
Copy Markdown
Contributor Author

You mean to also exclude ad62b3c? I realized that this is a fix for a reduction in nemesis coverage that #9659 introduced. It's the problem I mentioned above, that always passing in a [0] frontier effectively disables testing for cases where compaction is beyond 0.

But I agree, I should remove the other nemesis changes. (except maybe the one I mentioned right now)

@danhhz
Copy link
Copy Markdown
Contributor

danhhz commented Jan 12, 2022

Yes, exclude that commit here, it should def be its own PR

@aljoscha aljoscha force-pushed the persist-relax-allow-compaction branch from c72e2c5 to 8688fff Compare January 13, 2022 09:53
@aljoscha aljoscha requested a review from danhhz January 13, 2022 09:53
@aljoscha
Copy link
Copy Markdown
Contributor Author

Updated, removed superfluous changes and added rustdoc, PTAL. 🖐️

Before, we would not allow allowing compaction beyond the seal frontier
(aka upper). Differential dataflow allows this, and it will make
enabling compaction for persistent sources easier if we do as well.

This requires changing the nemesis tests, because requests that would
previously fail now pass.

This also requires updating the golden test. The reason being that the
(deterministic) generator that the golden test uses generates some
allow_compaction requests that fail and some that pass, and the code
that applies those ignores requests that fail. After this change, those
requests all succeed, meaning the state is different.
@aljoscha aljoscha force-pushed the persist-relax-allow-compaction branch from 8688fff to 828d692 Compare January 13, 2022 09:56
@ruchirK
Copy link
Copy Markdown
Contributor

ruchirK commented Jan 13, 2022

I'm taking myself off of this review because I probably won't have time to get to this today and it looks like you and Dan have already covered things to an almost final conclusion anyway

@ruchirK ruchirK removed their request for review January 13, 2022 14:06
Copy link
Copy Markdown
Contributor

@danhhz danhhz left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM!

Please cargo stress nemesis's direct_mem for a few minutes again before you merge just to make sure (unless you've already done that)

@aljoscha aljoscha merged commit 062fa9f into MaterializeInc:main Jan 13, 2022
@aljoscha aljoscha deleted the persist-relax-allow-compaction branch January 13, 2022 16:17
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants