persist: periodically yield in Replay operator#9615
aacda5b to
c94e2eb
Compare
aljoscha
left a comment
The changes look good to me! Do we have any idea if there's a negative (or positive) impact from this? Any benchmarks where this change moves anything?
| debug_assert!(snapshot_since.less_equal(ts)); | ||
| } | ||
| session.give(x); | ||
| if idx + 1 >= outputs_per_yield { |
it was not obvious to me that enumerate will restart counting from zero every time we re-enter the operator - maybe worth a comment?
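To illustrate the point about `enumerate`, here is a minimal sketch in plain Rust (not the actual operator code). `run_once` is a hypothetical stand-in for one scheduling of the operator, and `pending` stands in for whatever iterator the operator keeps between schedulings. The underlying iterator keeps its position across calls, but each call builds a fresh `enumerate()`, so `idx` starts again at 0.

```rust
/// Hypothetical stand-in for one scheduling of the operator: emits up to
/// `outputs_per_yield` items from a long-lived iterator, then returns.
fn run_once<I: Iterator<Item = u32>>(
    pending: &mut I,
    outputs_per_yield: usize,
) -> Vec<(usize, u32)> {
    let mut emitted = Vec::new();
    // `by_ref()` only borrows `pending`, so its position survives this call.
    // `enumerate()` is constructed here on every call, so `idx` restarts at 0.
    for (idx, x) in pending.by_ref().enumerate() {
        emitted.push((idx, x));
        if idx + 1 >= outputs_per_yield {
            break; // yield: stop emitting until the next scheduling
        }
    }
    emitted
}

fn main() {
    let mut pending = 10..15u32;
    let first = run_once(&mut pending, 2);
    let second = run_once(&mut pending, 2);
    // The values continue where the first call stopped; the indices do not.
    println!("first:  {:?}", first);
    println!("second: {:?}", second);
}
```

Because the index is per-call, the `idx + 1 >= outputs_per_yield` check bounds the output of each scheduling rather than the total output.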
|
Good call on the benchmarks. I'll wait until we have the dec milestone benchmarks and use those (along with our microbenchmarks and the WIP "feature benchmarks" one that I think Philip has) to eval the perf diff. Will add ruchir's requested comment and then mark as draft in the meantime |
|
I'm going to test this out locally actually. I'm seeing my pr with compaction enabled for kafka upsert sources be really fast on initial load (comparable to the non-persisted case, query times and mem usage are all as expected), but on restart memory usage spikes up a lot, roughly double the expected (where expected already factors in the doubling due to BlobCache), before settling down to something closer to the steady state. |
|
The benchmark I'm currently working on no longer restarts Materialize so it won't be relevant in testing this. Have you tried testing with the |
No, but I don't expect there to be much difference. KafkaRecovery is over a pretty small amount of data and this has exactly the same behavior as before up to 1m records. One of the things I said in my big comment on your PR is that we need something similar to KafkaRecovery but with a much larger amount of data. |
|
|
@philip-stoev The context here is that one of our (now overdue) dec goals was a repeatable benchmark for this. As I was mentioning in the other comment, KafkaRecovery is basically perfect, but we need it to be quite a bit bigger for this goal. Editing scenarios.py works as a workaround, but the milestone goal isn't so much about having a workaround as about having a declared thing we can easily run. One idea is to fork KafkaRecovery into a new KafkaBigRecovery with more records (1B?) and different timeouts. (Ideally it would also be something more like a chbench schema + key distribution, but maybe we can skip that in the short term.) I worry though about how different this KafkaBigRecovery would be from the other feature benchmarks; would it be abusing the system? |
|
@danhhz understood, let me poke a bit and I will get back to you. I think doing 1B within the existing test infrastructure with a separately-named |
|
To be clear, this is a goal for the persist team, so don't feel like you need to be randomized by it. We're also happy to do the work with your design direction. I'm not familiar with kgen yet, so I'll have to get back to you on that. 1B was just a number I pulled out of the air because it's 100x the current test size, and 100x the current running time of the test is around the size I'm looking for. As for the memory usage, I think instead of a materialized source, we'd want a non-materialized source with some very simple derived view that reduces down the data a bunch. |
|
@danhhz understood. We had talked yesterday about how this PR was blocked on the missing large benchmark so I was wondering if there were ways to unblock you quickly more than anything else. I'll push out a more self contained restarts benchmark in a separate pr ASAP and we can go from there. |
|
This PR being blocked isn't an issue; there's no immediate pressure to get it merged (beyond that it should be merged by end of Jan). I'm intentionally holding it in reserve as a test case for our new benchmark infra. |
c400cb3 to
71b9b40
Compare
danhhz
left a comment
we have KafkaRecoveryBig now, so rebased this and will benchmark it using that. most of the conflicts were straightforward, with the exception of the as_of error path's early return not playing well with the done bool. I used an inline closure, but I'm not entirely happy with that. will continue to think on this while the benchmark runs
71b9b40 to
327024d
Compare
|
Hmm didn't see any change, which is a little surprising. At the very least, I should be able to see memory differences, I'll mull over how to verify that. Raw data |
|
I'm thinking we maybe shouldn't expect differences? With |
|
you can use so hypothesis: we don't see a mem difference because data doesn't make it into the holding pen for reduce because it gets filtered out by this extremely restrictive filter? I haven't actually verified but imo you could create a materialized view that tracks e.g. the max mz_offset and asserts that the sum of the max mz_offset by partition = number of records ingested? that view will use all the data and is maybe more likely to see a memory reduction? |
Instead of emitting it all the first time the operator is scheduled. This allows downstream operators a chance to process the data it has emitted, possibly reducing it down and evening out memory usage.
Until the persist Replay operator has finished, it can't downgrade its output capability. However, Frank has said "I would say that operators are mostly either 1. stateless and benefit from streaming through, or 2. arrange."
Even arrange has some opportunity for data reduction. An arrange downstream of replay would just be buffering the replayed data until the frontier advances, but it does consolidate within this buffer. Persist keeps things mostly consolidated, but unsealed data is unconsolidated and even trace data is not across batches (which could be relevant depending on the as_of).
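The memory argument above can be sketched with a toy model in plain Rust. This is an illustration under stated assumptions, not the persist or timely implementation: `ReplaySketch`, `schedule`, `consolidate`, and `replay_peak` are hypothetical names, updates are simplified to `(key, diff)` pairs, and the downstream buffer is assumed to consolidate after every scheduling of the source.

```rust
use std::collections::BTreeMap;

/// Hypothetical model of a replay source that yields after a bounded
/// number of outputs instead of emitting everything in one scheduling.
struct ReplaySketch<I> {
    pending: I,
}

impl<I: Iterator<Item = (String, i64)>> ReplaySketch<I> {
    /// Emits at most `outputs_per_yield` updates into `out`.
    /// Returns `true` once it observes that the data is exhausted.
    fn schedule(&mut self, outputs_per_yield: usize, out: &mut Vec<(String, i64)>) -> bool {
        for _ in 0..outputs_per_yield {
            match self.pending.next() {
                Some(update) => out.push(update),
                None => return true,
            }
        }
        false
    }
}

/// Toy stand-in for downstream consolidation: sums diffs per key and
/// drops keys whose diffs cancel to zero.
fn consolidate(buffer: &mut Vec<(String, i64)>) {
    let mut sums: BTreeMap<String, i64> = BTreeMap::new();
    for (key, diff) in buffer.drain(..) {
        *sums.entry(key).or_insert(0) += diff;
    }
    buffer.extend(sums.into_iter().filter(|(_, diff)| *diff != 0));
}

/// Replays `updates`, letting the downstream buffer consolidate between
/// schedulings. Returns the peak buffer length and the final contents.
fn replay_peak(
    updates: Vec<(String, i64)>,
    outputs_per_yield: usize,
) -> (usize, Vec<(String, i64)>) {
    let mut source = ReplaySketch { pending: updates.into_iter() };
    let mut buffer = Vec::new();
    let mut peak = 0usize;
    loop {
        let done = source.schedule(outputs_per_yield, &mut buffer);
        peak = peak.max(buffer.len());
        consolidate(&mut buffer);
        if done {
            break;
        }
    }
    (peak, buffer)
}

fn main() {
    // Unconsolidated input: most updates cancel out.
    let updates: Vec<(String, i64)> =
        [("a", 1), ("a", -1), ("b", 1), ("b", -1), ("c", 1), ("c", 1)]
            .iter()
            .map(|(k, d)| (k.to_string(), *d))
            .collect();
    let (peak_all, out_all) = replay_peak(updates.clone(), 100);
    let (peak_chunked, out_chunked) = replay_peak(updates, 2);
    assert_eq!(out_all, out_chunked); // same result either way
    assert!(peak_chunked < peak_all); // but a smaller peak buffer when yielding
    println!("peak without yielding: {peak_all}, with yielding: {peak_chunked}");
}
```

In this toy model the final output is identical either way; only the peak size of the downstream buffer changes. Whether a real dataflow sees a comparable reduction depends on how much the replayed data actually consolidates.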
327024d to
eb4f8a6
Compare
|
Ran KafkaRecoveryBig hooked up to prometheus to try to investigate and uhhhh.... I only ran it the once and the first results looked pretty noisy before so who knows what happened. I'll run it again a few times. Meanwhile, I couldn't particularly see any differences in the memory usage, but the metrics make it pretty obvious that compaction is happening while we're replaying, so that will certainly contribute to the noise. I think the real answer here is running the KafkaRecoveryBig restarted mz with compaction disabled, but in the short term, maybe it will even out after a few runs? |
|
Okay, running KafkaRecoveryBig with --max-runs 5 resulted in something that looks similar to my previous results (modulo both sides have since been sped up). Memory usage still seems too noisy to draw any strong conclusions, but we can definitely see the effect when looking at timely thread cpu. @ruchirK This seems like it might be a small throughput improvement (1%) and it seems to be doing what it's supposed to. I'm inclined to merge it at this point, but curious what your thoughts are. We could also take a swing at a cli flag to disable unsealed_drain and trace_compaction to clean up the benchmark results. |
|
That all seems fine, and I agree it's fine to merge. Re: cli flags to disable compaction/drain unsealed -- materialize already has a builtin flag for compaction |
|
Oh that's clever. I'm not sure whether it's too "action at a distance" to check it in to KafkaRecoveryBig like that, but it certainly should be good enough to run a quick experiment on this branch |
|
Unsealed is less of an issue because in practice it seems to only poison the first run of each side |
|
Huh, something isn't working with |