persist: mzworkflows based load test#9812
Conversation
|
@danhhz the second commit automates the restart testing workflow - seed materialize with a view that counts the number of rows in an upsert source, repeatedly shut down mz, place more data in the topic, and restart that I've been doing manually / we want for the december milestone. One nice thing here is that its trivial to convert this from records with almost no duplicate keys (sampling uniformly over all u64s) to one where compaction helps. Still a fairly rough cut - just wanted to give you a heads up. |
a303f9e to
4ca7b72
Compare
10d77ed to
3168a59
Compare
|
This is RFAL! @benesch / @philip-stoev this is what I had in mind for a kafka upsert benchmark to replace perf-upsert / avro_upsert / avro_ingest (I know this doesn't support avro yet but it could), and to use as a long-running load test. My biggest open question here is "should this be done with the feature-benchmark framework instead" - I'm not currently sure. My read currently from skimming is no, but I don't have a more substantive way to back that up just yet. I'll spend the rest of today getting more familiar with the feature-benchmark framework to get a better answer to this. |
3168a59 to
d56fceb
Compare
|
|
||
| def workflow_kafka_upsert_benchmark(w: Workflow, args: List[str]): | ||
| parser = WorkflowArgumentParser(w) | ||
| parser.add_argument("--num-steps", type=int, default=10) |
There was a problem hiding this comment.
With those defaults, the benchmark is measuring the time the SELECT took to run, as the data is fully ingested by the time the statement executes. So the numbers are into the low milliseconds range:
C> waiting for materialize to handle 'SELECT * FROM num_records_ingested'C> query result: 200000 after 0.004557367414236069
You may wish to consider much higher defaults, ones that will push the results far beyond the 1-second granularity of the measurements.
There was a problem hiding this comment.
changed the approach to be totally different here, but I agree with that analysis. had picked the defaults to be low to not surprise first time users - not a very good reason.
| query = "SELECT * FROM num_records_ingested" | ||
| ui.progress(f"waiting for materialize to handle {query!r}", "C") | ||
| error = None | ||
| start_time = time.monotonic() |
There was a problem hiding this comment.
Since the topic and the source are being reused across runs, Mz can start ingesting stuff while the kgen is still in progress. Obtaining start_time here will fail to take this part of the effort into account.
There was a problem hiding this comment.
removed the restarts bit.
| # made definite, and try do so with as little overhead as possible. | ||
| > CREATE MATERIALIZED VIEW max_offsets AS SELECT | ||
| kafka_partition, | ||
| MAX(mz_offset) as max_offset FROM load_test |
There was a problem hiding this comment.
I think MAX should be considered as a "big overhead" operation, given that it will keep a list of all the mz_offsets ever seen in memory.
It may be best to simply use COUNT(*) instead.
There was a problem hiding this comment.
Right. I considered this in depth because count(*) is more memory efficient but doesn't offer a good way to track when we've caught up when there are duplicates (because a duplicate key won't show up in the counts twice)
There was a problem hiding this comment.
I came up with a solution that I like (basically, we track an approximate max with mz_offset / 1000 + 1 aka the mz_offset to the next thousandth offset, which lets us use a max with 0.1% of the memory overhead. The math around tracking "have we caught up" gets more complicated so I'm going to do that in a followup
There was a problem hiding this comment.
Right. I considered this in depth because count(*) is more memory efficient but doesn't offer a good way to track when we've caught up when there are duplicates (because a duplicate key won't show up in the counts twice)
The trick I used with the feature benchmark framework is to emit a unique marker record that has not been generated before. Once I see this record in the output, I know that we have caught up to that point, regardless of any duplicates and such that may have been present in the stream prior to that.
There was a problem hiding this comment.
sorry could you say a bit more about what the feature benchmarks are computing? (do they just CREATE MATERIALIZED SOURCE ... or do they do something else?)
There was a problem hiding this comment.
I was talking about this scenario:
https://github.com/MaterializeInc/materialize/blob/main/test/feature-benchmark/scenarios.py#L716
Basically we ingest a bunch of key=1's followed by one key=2 . Then we measure the time it takes from CREATE MATERIALIZED SOURCE to the time it takes for the 2 to show up in the output of the SELECT.
There was a problem hiding this comment.
I actually came up with another approach that I think works even better.
Basically, you can use mz_source_info but keep a wholly uncompacted version around, and then you just need to:
- look up the current time the dataflow you are testing is materialized up to
- look up which how much data was recorded in mz_source_info AS OF that time,
this cuts the memory requirement down by a lot because now the dataflow for the source can be a simple count(*) (according to docker stats on my laptop a test sending 10k records per second for ~120 seconds goes from ~850 MB to ~350 MB)
| ), | ||
| ) | ||
|
|
||
| if restart_mz: |
There was a problem hiding this comment.
The first measurement reported is always 1s , which I believe is due to the way events are ordered:
- kgen
- start Mz
- do the SELECT
By the time step 3 is performed, Mz has been up and running and ingesting, which causes that measurement to be incorrect. Here is the log:
> CREATE MATERIALIZED VIEW num_records_ingested AS SELECT SUM(max_offset) FROM max_offsets;
rows match; continuing at ts 1641369900.6116853
testdrive completed successfully.
Killing kafka-upsert-benchmark_materialized_1 ... done
Creating kafka-upsert-benchmark_kgen_run ... done
Using 8 threads...
Starting kafka-upsert-benchmark_materialized_1 ... done
C> waiting for dbname=materialize host=localhost port=56467 user=materialize password=postgres to handle 'SELECT 1'success!
C> waiting for materialize to handle 'SELECT * FROM num_records_ingested'C> materialize has not ingested 200000 records, got: 149700
C> query result: 200000 after 1.0029516108334064
success!
So Mz has ingested 149700 records already by the time the SELECT is issued.
|
With respect to this vs. the feature benchmark, the feature benchmark currently does not integrate kgen and the attendant flexibility in generating data -- That said, the feature benchmark has benefited from a period of intense traumatic head-banging on my part that has eliminated some sources of incorrect measurements:
|
I'm not entirely sure that I understand whether you're advocating for using the feature benchmark to measure this or not. :D On a quick glance it seems like we're only using the most basic of kgen features here, so probably could use kafka-ingest instead without too much trouble? |
|
I think there's been some confusion around the context of this PR. Part of persist's dec milestone goals is the following two measurements:
I think one takeaway here is that, while at some point the interim, temporary solutions we've discussed look very similar, the ideal end goals are actually different enough that maybe we don't want to make one piece of code do both. Perhaps the first benchmark should be something that's in spirit closer to demo/chbench (though I think it's fine to have the load generator talk directly to kafka instead of mysql). (I haven't looked closely at this PR yet, but it's possible that if we pull out some of the complexity, it's basically this?) Then the second (maybe?) works well as a big big feature benchmark. |
Yes true that, if you would like to measure CPU utilization and memory and such, have Graphana, etc. the feature benchmark would not be the right vehicle for the time being.
Yes, this will be happening shortly. The ducks are all aligning to make it possible
I will work on getting the schema and the key distribution more realistic. |
I think I can have kgen do the data generation within the context of the feature benchmark. |
Apologies for any duplication of effort, I should have kept myself more informed about the developments within the persistence team. |
d56fceb to
2d31ee8
Compare
|
I've reworked this benchmark to be more in line with the "we want to observe the behavior of ingesting stuff over time" and be a lot simpler PTAL! It's basically a full rewrite of the non-boilerplate parts of mzcompose.py which is why I squashed the changes. Now, it takes in a |
2d1b369 to
107f95b
Compare
|
I ran this ~30 times just now using configurations like before learning that all of my results are probably invalid because docker for mac isn't the best. I'll redo everything on a linux box but clear learnings here:
|
|
Ok. Running on a running: (100k QPS for 100 seconds) on a single worker MZ instance produces: I'm eliding the other output to make this more clear. In contrast, running with single worker + During this run MZ took at peak In contrast, single worker Materialize without persistence can handle 200kqps just fine: Running: gives us: (note that this lag corresponds to being ~2.25 seconds behind), using |
|
Other notes:
|
|
This is really cool! I like how it turned out. I haven't looked at the code yet, but I'll do that today You've mentioned docker stats in a couple places in slack, IMO let's hold off on doing anything with that for V1 of this. I'd much rather just hook it up to prometheus+grafana like the rest of the release benchmarks to start. We can always get that in async (and I'll want grafana even if we have the docker stats stuff). |
|
As for how we tune this in the release benchmark, there's sort of 2 major things we could be investigating wrt the diff between persist on and off. One is a workload they both can handle relatively easily and looking at the diff in cpu and memory usage. The other is the max sustained rate they each can handle on the same hardware. We should discuss in the sync which we want, but my inclination is to start with the former. In my experience, behavior under near failure conditions is its own set of issues and the steady state differences tends to be an easier place to start. |
|
Ack on Re: what to run in release tests -- yeah that makes sense to me. I was thinking something like 10k qps running over the order of hours? I think we should test at max capacity manually maybe once a week or something like that and track results in a spreadsheet |
|
I very much like the docker stats idea in that it's something we can stick in the benchmark output and copy paste for easy comparison. I just wanted to be clear that even if we have docker stats, I'll still want grafana and that if we have grafana, we don't need docker stats to start :)
These sound good to me modulo bikeshedding the cadence of the latter. My gut says weekly will be too frequently for something manual. Let's punt on that bikeshed for now though |
danhhz
left a comment
There was a problem hiding this comment.
LGTM! you might want someone familiar with python to stamp this too. i skimmed over the pg8000 usage, for example
| <null> | ||
|
|
||
| > CREATE SOURCE load_test | ||
| FROM KAFKA BROKER '${testdrive.kafka-addr}' TOPIC 'testdrive-load-test-${testdrive.seed}' |
There was a problem hiding this comment.
should we hardcode -1 here to match the hardcoded value in send_records?
There was a problem hiding this comment.
No - i think its better to keep the knowledge that the seed is hardcoded to 1 contained within mzcompose.py so that the testdrive file doesn't have to change if its user changes their behavior
There was a problem hiding this comment.
sorry, I don't follow. isn't this the only user of this script?
There was a problem hiding this comment.
yes thats true but sometimes I'll use the testdrive file within a workflow to manually do what the workflow is doing. I've definitely done that in the past with the exactly-once-sinks test, and I could see doing something like this in order to have measure mz with instruments on my laptop. When running in that kind of environment its nice to be able to change the seed so you don't have to tear down the kafka topic as you iterate on stuff
There was a problem hiding this comment.
definitely a niche usecase. My second thought here is that using the seed is idiomatic testdrive usage, and its actually a hack that we hardcode the seed in the mzcompose.py, and perhaps instead that code should invoke testdrive with consistent_seed=True and have a method that lets someone get the seed. I think the second thought is more convincing
There was a problem hiding this comment.
Tangentially related: I'd like to very soon move this sort of stuff out of testdrive entirely. It should be possible to write this all in the Python workflow file. E.g.:
kafka = c.kafka_client("kafka")
kafka.create_topic(topic="foo", partitions=4)
c.sql("CREATE SOURCE...")
c.sleep(5)
c.sql("...")Then you wouldn't need to worry about the seed at all.
| if row is None or len(row) != 1 or row[0] is None: | ||
| return 0 | ||
| return int(row[0]) | ||
| except Exception as e: |
There was a problem hiding this comment.
except Exception is bad practice because it'll catch all sorts of things besides database errors. Can you figure out what more specific exception pg8000 is raising and catch that instead?
There was a problem hiding this comment.
done. it was an InterfaceError.
I had to import it like this:
from pg8000 import InterfaceError # type: ignore
to make pycheck happy.
| conn.autocommit = True | ||
|
|
||
| cursor = conn.cursor() | ||
| cursor.execute("SELECT * FROM load_test_materialization_frontier") |
There was a problem hiding this comment.
I think you should be able to replace most of this with Composition.sql if you change that method to return the cursor. (Or add a new method like sql_cursor that returns a ready-to-use cursor, and use it in Composition.sql and here.)
There was a problem hiding this comment.
done. added sql_cursor() in a separate commit.
f2c648d to
1367da5
Compare
1367da5 to
0396836
Compare
This commit adds a dedicated kafka upsert benchmark using the new mzworkflows framework but not within the feature-benchmarks framework. This benchmark is an open loop benchmark that takes as its main arguments - a desired QPS rate for messages to send to Kafka - the number of seconds to write for. It's open loop in the sense that the rate of message generation / insertion to Kafka is decoupled as much as possible from the rate at which messages are ingested into Materialize. We intentionally don't wait for Materialize to catch up to the last set of messages before sending the next set of messages. The benchmark allows for flexibility in the following other dimensions: - key cardinality - value size - whether or not persistence is enabled Given the following inputs the benchmark sends approximately `records_per_second` messages to Kafka per second and corrects for delays caused by slow input generation or slow querying. It also queries Materialize and tracks how many records Materialize has ingested, and reports the lag relative to the number of messages already inserted. The benchmark currently doesn't support: - data formats other than bytes - specifying the number of partitions (currently hardcoded to 4) - specifying the number of materialize workers
0396836 to
9a0fbd5
Compare
|
TFTR! merging on green |
Motivation
test: add dedicated kafka upsert benchmark
This commit adds a dedicated kafka upsert benchmark using the new mzworkflows
framework but not within the feature-benchmarks framework.
This benchmark allows users to set up a kafka topic and a corresponding upsert
source with byte keys and values and send records to it and observe the time
it takes to ingest the specified number of records.
The benchmark allows for flexibility in the following dimensions:
send many large batches, or send one initial large batch and then many small
batches of records)
The benchmark currently doesn't support:
Tips for reviewer
Checklist
This change is