Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2052,6 +2052,77 @@ def benchmark(self) -> MeasurementSource:
""")


class MySqlInitialLoadMultiWorker(MySqlCdc):
"""Measure the time it takes to read 1M existing records from MySQL
when creating a materialized source with 8 workers.

With multiple workers and PK-range partitioning, each worker reads
a disjoint range of the primary key in parallel. This benchmark
measures the effectiveness of that parallelism."""

# 8 workers naturally use more memory than 1 (more concurrent data in-flight)
RELATIVE_THRESHOLD: dict[MeasurementType, float] = {
MeasurementType.WALLCLOCK: 0.10,
MeasurementType.MEMORY_MZ: 0.60,
MeasurementType.MEMORY_CLUSTERD: 0.60,
}

def shared(self) -> Action:
# Use batch inserts to support scales beyond what a single
# cross-join can produce (mysql.time_zone^2 ≈ 3M rows max).
batch = 1_000_000
inserts = []
remaining = self.n()
while remaining > 0:
chunk = min(remaining, batch)
inserts.append(
f"SET @i:=COALESCE((SELECT MAX(pk) FROM pk_table), 0);\n"
f"INSERT INTO pk_table SELECT @i:=@i+1, @i*@i "
f"FROM mysql.time_zone t1, mysql.time_zone t2 LIMIT {chunk};"
)
remaining -= chunk
insert_stmts = "\n".join(inserts)
return TdAction(f"""
$ mysql-connect name=mysql url=mysql://root@mysql password=${{arg.mysql-root-password}}

$ mysql-execute name=mysql
DROP DATABASE IF EXISTS public;
CREATE DATABASE public;
USE public;

CREATE TABLE pk_table (pk BIGINT PRIMARY KEY, f2 BIGINT);
{insert_stmts}
""")

def before(self) -> Action:
return TdAction("""
> DROP SOURCE IF EXISTS mz_source_mysqlcdc CASCADE;
> DROP CLUSTER IF EXISTS source_cluster CASCADE
""")

def benchmark(self) -> MeasurementSource:
return Td(f"""
> CREATE SECRET IF NOT EXISTS mysqlpass AS '${{arg.mysql-root-password}}'
> CREATE CONNECTION IF NOT EXISTS mysql_conn TO MYSQL (
HOST mysql,
USER root,
PASSWORD SECRET mysqlpass
)

> CREATE CLUSTER source_cluster SIZE 'scale=1,workers=8', REPLICATION FACTOR 1;

> CREATE SOURCE mz_source_mysqlcdc
IN CLUSTER source_cluster
FROM MYSQL CONNECTION mysql_conn;
> CREATE TABLE pk_table FROM SOURCE mz_source_mysqlcdc (REFERENCE public.pk_table);
/* A */

> SELECT count(*) FROM pk_table
/* B */
{self.n()}
""")


class MySqlStreaming(MySqlCdc):
"""Measure the time it takes to ingest records from MySQL post-snapshot"""

Expand Down
7 changes: 5 additions & 2 deletions misc/python/materialize/mzcompose/services/mysql.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@


def create_mysql_server_args(
server_id: str, is_master: bool, binlog_row_metadata: str = "full"
server_id: str,
is_master: bool,
binlog_row_metadata: str = "full",
max_connections: int = 1000,
) -> list[str]:
args = [
"--log-bin=mysql-bin",
Expand All @@ -25,7 +28,7 @@ def create_mysql_server_args(
"--binlog-row-image=full",
f"--binlog-row-metadata={binlog_row_metadata}",
f"--server-id={server_id}",
"--max-connections=500",
f"--max-connections={max_connections}",
]

if not is_master:
Expand Down
Loading
Loading