diff --git a/misc/python/materialize/feature_benchmark/scenarios/benchmark_main.py b/misc/python/materialize/feature_benchmark/scenarios/benchmark_main.py index 84b70745e0dc2..ff7aedf7a817b 100644 --- a/misc/python/materialize/feature_benchmark/scenarios/benchmark_main.py +++ b/misc/python/materialize/feature_benchmark/scenarios/benchmark_main.py @@ -2052,6 +2052,77 @@ def benchmark(self) -> MeasurementSource: """) +class MySqlInitialLoadMultiWorker(MySqlCdc): + """Measure the time it takes to read 1M existing records from MySQL + when creating a materialized source with 8 workers. + + With multiple workers and PK-range partitioning, each worker reads + a disjoint range of the primary key in parallel. This benchmark + measures the effectiveness of that parallelism.""" + + # 8 workers naturally use more memory than 1 (more concurrent data in-flight) + RELATIVE_THRESHOLD: dict[MeasurementType, float] = { + MeasurementType.WALLCLOCK: 0.10, + MeasurementType.MEMORY_MZ: 0.60, + MeasurementType.MEMORY_CLUSTERD: 0.60, + } + + def shared(self) -> Action: + # Use batch inserts to support scales beyond what a single + # cross-join can produce (mysql.time_zone^2 ≈ 3M rows max). + batch = 1_000_000 + inserts = [] + remaining = self.n() + while remaining > 0: + chunk = min(remaining, batch) + inserts.append( + f"SET @i:=COALESCE((SELECT MAX(pk) FROM pk_table), 0);\n" + f"INSERT INTO pk_table SELECT @i:=@i+1, @i*@i " + f"FROM mysql.time_zone t1, mysql.time_zone t2 LIMIT {chunk};" + ) + remaining -= chunk + insert_stmts = "\n".join(inserts) + return TdAction(f""" +$ mysql-connect name=mysql url=mysql://root@mysql password=${{arg.mysql-root-password}} + +$ mysql-execute name=mysql +DROP DATABASE IF EXISTS public; +CREATE DATABASE public; +USE public; + +CREATE TABLE pk_table (pk BIGINT PRIMARY KEY, f2 BIGINT); +{insert_stmts} +""") + + def before(self) -> Action: + return TdAction(""" +> DROP SOURCE IF EXISTS mz_source_mysqlcdc CASCADE; +> DROP CLUSTER IF EXISTS source_cluster CASCADE + """) + + def benchmark(self) -> MeasurementSource: + return Td(f""" +> CREATE SECRET IF NOT EXISTS mysqlpass AS '${{arg.mysql-root-password}}' +> CREATE CONNECTION IF NOT EXISTS mysql_conn TO MYSQL ( + HOST mysql, + USER root, + PASSWORD SECRET mysqlpass + ) + +> CREATE CLUSTER source_cluster SIZE 'scale=1,workers=8', REPLICATION FACTOR 1; + +> CREATE SOURCE mz_source_mysqlcdc + IN CLUSTER source_cluster + FROM MYSQL CONNECTION mysql_conn; +> CREATE TABLE pk_table FROM SOURCE mz_source_mysqlcdc (REFERENCE public.pk_table); + /* A */ + +> SELECT count(*) FROM pk_table + /* B */ +{self.n()} + """) + + class MySqlStreaming(MySqlCdc): """Measure the time it takes to ingest records from MySQL post-snapshot""" diff --git a/misc/python/materialize/mzcompose/services/mysql.py b/misc/python/materialize/mzcompose/services/mysql.py index f0536a759bc84..66d904c2bc0f6 100644 --- a/misc/python/materialize/mzcompose/services/mysql.py +++ b/misc/python/materialize/mzcompose/services/mysql.py @@ -15,7 +15,10 @@ def create_mysql_server_args( - server_id: str, is_master: bool, binlog_row_metadata: str = "full" + server_id: str, + is_master: bool, + binlog_row_metadata: str = "full", + max_connections: int = 1000, ) -> list[str]: args = [ "--log-bin=mysql-bin", @@ -25,7 +28,7 @@ def create_mysql_server_args( "--binlog-row-image=full", f"--binlog-row-metadata={binlog_row_metadata}", f"--server-id={server_id}", - "--max-connections=500", + f"--max-connections={max_connections}", ] if not is_master: diff --git a/src/storage/src/source/mysql/snapshot.rs b/src/storage/src/source/mysql/snapshot.rs index 35ab4ac071c92..4c8de21b42e78 100644 --- a/src/storage/src/source/mysql/snapshot.rs +++ b/src/storage/src/source/mysql/snapshot.rs @@ -29,34 +29,34 @@ //! //! ## Producing a snapshot at a known point in time. //! -//! Ideally we would like to start a transaction and ask MySQL to tell us the point in time this -//! transaction is running at. As far as we know there isn't such API so we achieve this using -//! table locks instead. +//! A designated leader worker acquires table locks on ALL tables that need snapshotting, reads the +//! current GTID frontier, discovers PK bounds for parallel range splitting, and broadcasts a +//! `SnapshotInfo` to all workers via a timely feedback loop. All workers then start CONSISTENT +//! SNAPSHOT transactions while the leader holds the locks. Workers signal completion by dropping +//! their snapshot capability. The leader drains the feedback input to confirm all workers have +//! started, then unlocks the tables. //! -//! The full set of tables that are meant to be snapshotted are partitioned among the workers. Each -//! worker initiates a connection to the server and acquires a table lock on all the tables that -//! have been assigned to it. By doing so we establish a moment in time where we know no writes are -//! happening to the tables we are interested in. After the locks are taken each worker reads the -//! current upper frontier (`snapshot_upper`) using the `@@gtid_executed` system variable. This -//! frontier establishes an upper bound on any possible write to the tables of interest until the -//! lock is released. +//! ## Parallel PK-range snapshots //! -//! Each worker now starts a transaction via a new connection with 'REPEATABLE READ' and -//! 'CONSISTENT SNAPSHOT' semantics. Due to linearizability we know that this transaction's view of -//! the database must some time `t_snapshot` such that `snapshot_upper <= t_snapshot`. We don't -//! actually know the exact value of `t_snapshot` and it might be strictly greater than -//! `snapshot_upper`. However, because this transaction will only be used to read the locked tables -//! and we know that `snapshot_upper` is an upper bound on all the writes that have happened to -//! them we can safely pretend that the transaction's `t_snapshot` is *equal* to `snapshot_upper`. -//! We have therefore succeeded in starting a transaction at a known point in time! +//! For tables with a single-column integer primary key, the leader queries `MIN(pk)` and `MAX(pk)` +//! and broadcasts the bounds. Each worker computes its assigned PK range and reads only that +//! portion of the table. Tables without a suitable PK fall back to single-worker-per-table mode. //! -//! At this point it is safe for each worker to unlock the tables, since the transaction has -//! established a point in time, and close the initial connection. Each worker can then read the -//! snapshot of the tables it is responsible for and publish it downstream. +//! ## Resource considerations //! -//! TODO: Other software products hold the table lock for the duration of the snapshot, and some do -//! not. We should figure out why and if we need to hold the lock longer. This may be because of a -//! difference in how REPEATABLE READ works in some MySQL-compatible systems (e.g. Aurora MySQL). +//! Parallel range reads open one MySQL connection per worker that has a range to read, plus the +//! leader's lock connection, so a single source can hold up to `worker_count + 1` concurrent +//! upstream connections while snapshotting. On clusters with many workers, or when many sources +//! snapshot concurrently, this can approach the server's `max_connections` limit (which defaults +//! to 151 on stock MySQL, far below typical PostgreSQL defaults). A failure to connect surfaces as +//! a transient error that restarts the snapshot, so connection pressure causes retries rather than +//! incorrect data. Workers with no range to read skip connecting to avoid amplifying this. +//! +//! The leader holds the global `LOCK TABLES ... READ` only until every worker has *started* its +//! `CONSISTENT SNAPSHOT` transaction (not until reads finish), since each transaction pins its read +//! view at start. The lock window therefore scales with the time for the slowest worker to connect +//! and begin its transaction, during which upstream writes to the locked tables are blocked +//! (bounded by `snapshot_lock_wait_timeout`). //! //! ## Rewinding the snapshot to a specific point in time. //! @@ -89,7 +89,7 @@ use std::rc::Rc; use std::sync::Arc; use differential_dataflow::AsCollection; -use futures::TryStreamExt; +use futures::{StreamExt as _, TryStreamExt}; use itertools::Itertools; use mysql_async::prelude::Queryable; use mysql_async::{IsolationLevel, Row as MySqlRow, TxOpts}; @@ -100,19 +100,23 @@ use mz_ore::cast::CastFrom; use mz_ore::future::InTask; use mz_ore::iter::IteratorExt; use mz_ore::metrics::MetricsFutureExt; -use mz_repr::{Diff, Row}; +use mz_repr::{Diff, Row, SqlScalarType}; use mz_storage_types::errors::DataflowError; use mz_storage_types::sources::MySqlSourceConnection; use mz_storage_types::sources::mysql::{GtidPartition, gtid_set_frontier}; use mz_timely_util::antichain::AntichainExt; -use mz_timely_util::builder_async::{OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton}; +use mz_timely_util::builder_async::{ + Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton, +}; use mz_timely_util::containers::stack::FueledBuilder; use timely::container::CapacityContainerBuilder; +use timely::dataflow::channels::pact::Pipeline; use timely::dataflow::operators::core::Map; -use timely::dataflow::operators::{CapabilitySet, Concat}; +use timely::dataflow::operators::vec::Broadcast; +use timely::dataflow::operators::{CapabilitySet, Concat, ConnectLoop, Feedback}; use timely::dataflow::{Scope, StreamVec}; use timely::progress::Timestamp; -use tracing::{error, trace}; +use tracing::trace; use crate::metrics::source::mysql::MySqlSnapshotMetrics; use crate::source::RawSourceCreationConfig; @@ -125,6 +129,114 @@ use super::{ TransientError, return_definite_error, validate_mysql_repl_settings, }; +/// If `desc` has a single-column integer PK, return the column name. +fn integer_pk_col(desc: &mz_mysql_util::MySqlTableDesc) -> Option<&str> { + let pk = desc.keys.iter().find(|k| k.is_primary)?; + if pk.columns.len() != 1 { + return None; + } + let pk_col_name = &pk.columns[0]; + let col = desc.columns.iter().find(|c| &c.name == pk_col_name)?; + let scalar_type = &col.column_type.as_ref()?.scalar_type; + match scalar_type { + SqlScalarType::Int16 + | SqlScalarType::Int32 + | SqlScalarType::Int64 + | SqlScalarType::UInt16 + | SqlScalarType::UInt32 => Some(pk_col_name.as_str()), + // Exclude UInt64 since MIN/MAX are queried as i64 + _ => None, + } +} + +/// PK bounds for a table, discovered by the leader. +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +struct PkBounds { + pk_col: String, + min_val: i64, + max_val: i64, +} + +/// Snapshot info broadcast from leader to all workers. +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +struct SnapshotInfo { + gtid_set: String, + /// PK bounds per table. None = no integer PK, use single-worker fallback. + pk_bounds: BTreeMap>, +} + +/// A worker's assigned PK range for a table. +struct PkRange { + pk_col: String, + lower: i64, + upper: Option, // None = open-ended (last worker) +} + +/// Compute this worker's PK range. Returns None if this worker has no work. +/// +/// Uses `i128` intermediate arithmetic to avoid overflow when the PK range +/// spans a large portion of the `i64` domain (e.g. `min_val = 0, max_val = 2^62`). +fn worker_pk_range(bounds: &PkBounds, worker_id: usize, worker_count: usize) -> Option { + // Use i128 throughout to avoid overflow on large Int64 PK ranges. + let min = i128::from(bounds.min_val); + let max = i128::from(bounds.max_val); + let range_size = max - min + 1; + if range_size <= 0 { + return None; + } + let effective = std::cmp::min(i128::cast_from(worker_count), range_size); + if i128::cast_from(worker_id) >= effective { + return None; + } + let wid = i128::cast_from(worker_id); + let start_128 = min + wid * range_size / effective; + let start = i64::try_from(start_128).expect("PK range start fits in i64"); + let is_last = wid == effective - 1; + if is_last { + Some(PkRange { + pk_col: bounds.pk_col.clone(), + lower: start, + upper: None, + }) + } else { + let end_128 = min + (wid + 1) * range_size / effective; + let end = i64::try_from(end_128).expect("PK range end fits in i64"); + Some(PkRange { + pk_col: bounds.pk_col.clone(), + lower: start, + upper: Some(end), + }) + } +} + +/// Error from the snapshot leader's lock-acquisition / GTID-reading phase. +/// Allows the leader to broadcast a failure sentinel before returning, so +/// that non-leader workers do not deadlock waiting for `SnapshotInfo`. +enum LeaderError { + /// A definite error (e.g. table dropped) — emitted for every output. + Definite(DefiniteError), + /// A transient error — causes the source to restart. + Transient(TransientError), +} + +impl From for LeaderError { + fn from(e: mysql_async::Error) -> Self { + LeaderError::Transient(e.into()) + } +} + +impl From for LeaderError { + fn from(e: MySqlError) -> Self { + LeaderError::Transient(e.into()) + } +} + +impl From for LeaderError { + fn from(e: anyhow::Error) -> Self { + LeaderError::Transient(e.into()) + } +} + /// Renders the snapshot dataflow. See the module documentation for more information. pub(crate) fn render<'scope>( scope: Scope<'scope, GtidPartition>, @@ -141,19 +253,32 @@ pub(crate) fn render<'scope>( let mut builder = AsyncOperatorBuilder::new(format!("MySqlSnapshotReader({})", config.id), scope.clone()); + let (feedback_handle, feedback_data) = scope.feedback(Default::default()); + let (raw_handle, raw_data) = builder.new_output::>(); let (rewinds_handle, rewinds) = builder.new_output::>>(); // Captures DefiniteErrors that affect the entire source, including all outputs let (definite_error_handle, definite_errors) = builder.new_output::>>(); + let (snapshot_handle, snapshot) = builder.new_output::>>(); + + // This operator needs to broadcast data to itself in order to synchronize the transaction + // snapshot. However, none of the feedback capabilities result in output messages and for the + // feedback edge specifically having a default connection would result in a loop. + let mut snapshot_input = builder.new_disconnected_input(feedback_data, Pipeline); + + // The snapshot info must be sent to all workers, so we broadcast the feedback connection + snapshot.broadcast().connect_loop(feedback_handle); + + let is_snapshot_leader = config.responsible_for("mysql_snapshot_leader"); // A global view of all outputs that will be snapshot by all workers. let mut all_outputs = vec![]; - // A map containing only the table infos that this worker should snapshot. - let mut reader_snapshot_table_info = BTreeMap::new(); - // Maps MySQL table name to export `SourceStatistics`. Same info exists in reader_snapshot_table_info, - // but this avoids having to iterate + map each time the statistics are needed. - let mut export_statistics = BTreeMap::new(); + // ALL workers get ALL tables that need snapshotting (for parallel PK-range reads). + let mut tables_to_snapshot: BTreeMap> = BTreeMap::new(); + // Maps MySQL table name to export `SourceStatistics`. + // Every worker keeps handles so `snapshot_records_staged` can sum local contributions. + let mut export_statistics: BTreeMap> = BTreeMap::new(); for output in source_outputs.into_iter() { // Determine which outputs need to be snapshot and which already have been. if *output.resume_upper != [GtidPartition::minimum()] { @@ -161,55 +286,52 @@ pub(crate) fn render<'scope>( continue; } all_outputs.push(output.output_index); - if config.responsible_for(&output.table_name) { - let export_stats = config - .statistics - .get(&output.export_id) - .expect("statistics have been intialized") - .clone(); - export_statistics - .entry(output.table_name.clone()) - .or_insert_with(Vec::new) - .push(export_stats); - - reader_snapshot_table_info - .entry(output.table_name.clone()) - .or_insert_with(Vec::new) - .push(output); - } + let export_stats = config + .statistics + .get(&output.export_id) + .expect("statistics have been intialized") + .clone(); + export_statistics + .entry(output.table_name.clone()) + .or_insert_with(Vec::new) + .push(export_stats); + tables_to_snapshot + .entry(output.table_name.clone()) + .or_insert_with(Vec::new) + .push(output); } let (button, transient_errors): (_, StreamVec<'scope, GtidPartition, Rc>) = builder.build_fallible(move |caps| { let busy_signal = Arc::clone(&config.busy_signal); Box::pin(SignaledFuture::new(busy_signal, async move { - let [data_cap_set, rewind_cap_set, definite_error_cap_set]: &mut [_; 3] = - caps.try_into().unwrap(); + let [ + data_cap_set, + rewind_cap_set, + definite_error_cap_set, + snapshot_cap_set, + ]: &mut [_; 4] = caps.try_into().unwrap(); let id = config.id; let worker_id = config.worker_id; + // Initialize statistics for all exports (required even + // if this worker won't snapshot anything). if !all_outputs.is_empty() { - // A worker *must* emit a count even if not responsible for snapshotting a table - // as statistic summarization will return null if any worker hasn't set a value. - // This will also reset snapshot stats for any exports not snapshotting. for statistics in config.statistics.values() { statistics.set_snapshot_records_known(0); statistics.set_snapshot_records_staged(0); } } - // If this worker has no tables to snapshot then there is nothing to do. - if reader_snapshot_table_info.is_empty() { - trace!(%id, "timely-{worker_id} initializing table reader \ - with no tables to snapshot, exiting"); + if tables_to_snapshot.is_empty() { + trace!(%id, "timely-{worker_id} no tables to snapshot, exiting"); return Ok(()); - } else { - trace!(%id, "timely-{worker_id} initializing table reader \ - with {} tables to snapshot", - reader_snapshot_table_info.len()); } + trace!(%id, "timely-{worker_id} snapshotting {} tables", + tables_to_snapshot.len()); + let connection_config = connection .connection .config( @@ -220,73 +342,189 @@ pub(crate) fn render<'scope>( .await?; let task_name = format!("timely-{worker_id} MySQL snapshotter"); - let lock_clauses = reader_snapshot_table_info - .keys() - .map(|t| format!("{} READ", t)) - .collect::>() - .join(", "); - let mut lock_conn = connection_config - .connect( - &task_name, - &config.config.connection_context.ssh_tunnel_manager, - ) - .await?; - if let Some(timeout) = config - .config - .parameters - .mysql_source_timeouts - .snapshot_lock_wait_timeout - { - // Interpolating a `Duration` integer; not parameterizable in MySQL `SET`. - #[allow(clippy::disallowed_methods)] - lock_conn - .query_drop(format!( - "SET @@session.lock_wait_timeout = {}", - timeout.as_secs() - )) - .await?; - } + // Phase B: Leader acquires locks, reads GTID, queries PK bounds, broadcasts. + // + // All fallible leader work is wrapped in an inner async block so that + // we ALWAYS broadcast a result (success or failure) before returning. + // Without this, a leader error would drop `snapshot_cap_set` without + // broadcasting, causing non-leader workers to deadlock waiting for + // `SnapshotInfo` on the feedback loop. + let mut lock_conn = if is_snapshot_leader { + let leader_result: Result<_, LeaderError> = async { + let lock_clauses = tables_to_snapshot + .keys() + .map(|t| format!("{} READ", t)) + .collect::>() + .join(", "); + let mut lock_conn = connection_config + .connect( + &task_name, + &config.config.connection_context.ssh_tunnel_manager, + ) + .await?; + if let Some(timeout) = config + .config + .parameters + .mysql_source_timeouts + .snapshot_lock_wait_timeout + { + // Interpolating a `Duration` integer; not parameterizable in MySQL `SET`. + #[allow(clippy::disallowed_methods)] + lock_conn + .query_drop(format!( + "SET @@session.lock_wait_timeout = {}", + timeout.as_secs() + )) + .await?; + } + + trace!(%id, "timely-{worker_id} acquiring table locks: {lock_clauses}"); + // `lock_clauses` is built from `MySqlTableName::Display`, which + // escapes both schema and table via `quote_identifier`. + #[allow(clippy::disallowed_methods)] + let lock_result = lock_conn + .query_drop(format!("LOCK TABLES {lock_clauses}")) + .await; + match lock_result { + Err(mysql_async::Error::Server(mysql_async::ServerError { + code, + message, + .. + })) if code == ER_NO_SUCH_TABLE => { + trace!(%id, "timely-{worker_id} received unknown table error from \ + lock query"); + return Err(LeaderError::Definite(DefiniteError::TableDropped( + message, + ))); + } + e => e?, + }; + + // Record the frontier of future GTIDs based on the executed GTID set + // at the start of the snapshot + let snapshot_gtid_set = + query_sys_var(&mut lock_conn, "global.gtid_executed").await?; + + trace!(%id, "timely-{worker_id} acquired table locks"); + + // Query PK bounds for each table + let mut pk_bounds_map = BTreeMap::new(); + for (table, outputs) in &tables_to_snapshot { + let desc = &outputs[0].desc; + if let Some(pk_col_name) = integer_pk_col(desc) { + let quoted = quote_identifier(pk_col_name); + let query = format!( + "SELECT MIN({}) AS pk_min, MAX({}) AS pk_max FROM {}", + quoted, quoted, table + ); + // `quoted` and `table` are escaped via `quote_identifier` / + // `MySqlTableName::Display`, so this interpolation is safe; + // not parameterizable. + #[allow(clippy::disallowed_methods)] + let row: Option<( + Option, + Option, + )> = lock_conn.query_first(query).await?; + match row { + Some((Some(min_val), Some(max_val))) + if i128::from(max_val) - i128::from(min_val) + 1 + >= i128::cast_from(config.worker_count) => + { + pk_bounds_map.insert( + table.clone(), + Some(PkBounds { + pk_col: quoted, + min_val, + max_val, + }), + ); + } + _ => { + // Table is empty, too small to + // parallelize, or has NULL PKs. + pk_bounds_map.insert(table.clone(), None); + } + } + } else { + pk_bounds_map.insert(table.clone(), None); + } + } - trace!(%id, "timely-{worker_id} acquiring table locks: {lock_clauses}"); - // `lock_clauses` is built from `MySqlTableName::Display`, which - // escapes both schema and table via `quote_identifier`. - #[allow(clippy::disallowed_methods)] - let lock_result = lock_conn - .query_drop(format!("LOCK TABLES {lock_clauses}")) + let snapshot_info = SnapshotInfo { + gtid_set: snapshot_gtid_set, + pk_bounds: pk_bounds_map, + }; + Ok((snapshot_info, lock_conn)) + } .await; - match lock_result { - // Handle the case where a table we are snapshotting has been dropped or renamed. - Err(mysql_async::Error::Server(mysql_async::ServerError { - code, - message, - .. - })) if code == ER_NO_SUCH_TABLE => { - trace!(%id, "timely-{worker_id} received unknown table error from \ - lock query"); - let err = DefiniteError::TableDropped(message); - return Ok(return_definite_error( - err, - &all_outputs, - &raw_handle, - data_cap_set, - &definite_error_handle, - definite_error_cap_set, - ) - .await); + + match leader_result { + Ok((info, conn)) => { + trace!(%id, "timely-{worker_id} broadcasting snapshot info: {info:?}"); + snapshot_handle.give(&snapshot_cap_set[0], Some(info)); + Some(conn) + } + Err(err) => { + // CRITICAL: broadcast None so non-leaders exit cleanly + // instead of deadlocking on the feedback loop. + trace!(%id, "timely-{worker_id} leader failed, broadcasting \ + error sentinel"); + snapshot_handle.give(&snapshot_cap_set[0], None); + match err { + LeaderError::Definite(e) => { + return Ok(return_definite_error( + e, + &all_outputs, + &raw_handle, + data_cap_set, + &definite_error_handle, + definite_error_cap_set, + ) + .await); + } + LeaderError::Transient(e) => { + return Err(e); + } + } + } + } + } else { + None + }; + + // Phase C: All workers receive broadcast. + // The payload is `Option`: `Some` on success, + // `None` if the leader encountered an error. + let snapshot_info: Option = 'recv: loop { + match snapshot_input.next().await { + Some(AsyncEvent::Data(_, mut data)) => { + if let Some(msg) = data.pop() { + break 'recv msg; + } + } + Some(AsyncEvent::Progress(_)) => continue, + None => { + // Feedback stream closed without data — the leader + // must have failed. Return cleanly; the leader's + // operator instance handles error propagation. + break 'recv None; + } + } + }; + let snapshot_info = match snapshot_info { + Some(info) => info, + None => { + // Leader signaled failure. Bail out — errors are + // already propagated by the leader's worker. + return Ok(()); } - e => e?, }; - // Record the frontier of future GTIDs based on the executed GTID set at the start - // of the snapshot - let snapshot_gtid_set = - query_sys_var(&mut lock_conn, "global.gtid_executed").await?; - let snapshot_gtid_frontier = match gtid_set_frontier(&snapshot_gtid_set) { + // Parse GTID frontier from snapshot_info.gtid_set + let snapshot_gtid_frontier = match gtid_set_frontier(&snapshot_info.gtid_set) { Ok(frontier) => frontier, Err(err) => { let err = DefiniteError::UnsupportedGtidState(err.to_string()); - // If we received a GTID Set with non-consecutive intervals this breaks all - // our assumptions, so there is nothing else we can do. return Ok(return_definite_error( err, &all_outputs, @@ -299,127 +537,188 @@ pub(crate) fn render<'scope>( } }; - // TODO(roshan): Insert metric for how long it took to acquire the locks - trace!(%id, "timely-{worker_id} acquired table locks at: {}", + trace!(%id, "timely-{worker_id} received snapshot info at: {}", snapshot_gtid_frontier.pretty()); - let mut conn = connection_config - .connect( - &task_name, - &config.config.connection_context.ssh_tunnel_manager, - ) - .await?; + // Precompute each table's read plan for this worker. + // Tables with PK bounds get a range; others use + // single-worker fallback via responsible_for. + let table_ranges: BTreeMap<_, _> = tables_to_snapshot + .keys() + .filter_map(|table| { + let range = match snapshot_info.pk_bounds.get(table) { + Some(Some(bounds)) => { + worker_pk_range(bounds, config.worker_id, config.worker_count) + } + _ => None, + }; + let should_read = range.is_some() || config.responsible_for(table); + should_read.then(|| (table.clone(), range)) + }) + .collect(); + let has_work = !table_ranges.is_empty(); - // Verify the MySQL system settings are correct for consistent row-based replication using GTIDs - match validate_mysql_repl_settings(&mut conn).await { - Err(err @ MySqlError::InvalidSystemSetting { .. }) => { - return Ok(return_definite_error( - DefiniteError::ServerConfigurationError(err.to_string()), - &all_outputs, - &raw_handle, - data_cap_set, - &definite_error_handle, - definite_error_cap_set, + // Non-leader workers that have nothing to read skip + // connecting — avoids exhausting MySQL's connection + // pool in many-sources scenarios (limits test). + let mut conn = if is_snapshot_leader || has_work { + let mut c = connection_config + .connect( + &task_name, + &config.config.connection_context.ssh_tunnel_manager, ) - .await); + .await?; + match validate_mysql_repl_settings(&mut c).await { + Err(err @ MySqlError::InvalidSystemSetting { .. }) => { + return Ok(return_definite_error( + DefiniteError::ServerConfigurationError(err.to_string()), + &all_outputs, + &raw_handle, + data_cap_set, + &definite_error_handle, + definite_error_cap_set, + ) + .await); + } + Err(err) => Err(err)?, + Ok(()) => (), + }; + Some(c) + } else { + trace!(%id, "timely-{worker_id} has no tables to read, \ + skipping MySQL connection"); + None + }; + + let mut tx = if let Some(ref mut conn) = conn { + trace!(%id, "timely-{worker_id} starting transaction with \ + consistent snapshot at: {}", snapshot_gtid_frontier.pretty()); + let mut tx_opts = TxOpts::default(); + tx_opts + .with_isolation_level(IsolationLevel::RepeatableRead) + .with_consistent_snapshot(true) + .with_readonly(true); + let mut tx = conn.start_transaction(tx_opts).await?; + // Set the session time zone to UTC so we read TIMESTAMP columns as UTC. + #[allow(clippy::disallowed_methods)] // static SQL string + tx.query_drop("set @@session.time_zone = '+00:00'").await?; + if let Some(timeout) = config + .config + .parameters + .mysql_source_timeouts + .snapshot_max_execution_time + { + // Interpolating an integer millis value; not parameterizable in MySQL `SET`. + #[allow(clippy::disallowed_methods)] + tx.query_drop(format!( + "SET @@session.max_execution_time = {}", + timeout.as_millis() + )) + .await?; } - Err(err) => Err(err)?, - Ok(()) => (), + Some(tx) + } else { + None }; - trace!(%id, "timely-{worker_id} starting transaction with \ - consistent snapshot at: {}", snapshot_gtid_frontier.pretty()); - - // Start a transaction with REPEATABLE READ and 'CONSISTENT SNAPSHOT' semantics - // so we can read a consistent snapshot of the table at the specific GTID we read. - let mut tx_opts = TxOpts::default(); - tx_opts - .with_isolation_level(IsolationLevel::RepeatableRead) - .with_consistent_snapshot(true) - .with_readonly(true); - let mut tx = conn.start_transaction(tx_opts).await?; - // Set the session time zone to UTC so that we can read TIMESTAMP columns as UTC - // From https://dev.mysql.com/doc/refman/8.0/en/datetime.html: "MySQL converts TIMESTAMP values - // from the current time zone to UTC for storage, and back from UTC to the current time zone - // for retrieval. (This does not occur for other types such as DATETIME.)" - #[allow(clippy::disallowed_methods)] // static SQL string - tx.query_drop("set @@session.time_zone = '+00:00'").await?; - - // Configure query execution time based on param. We want to be able to - // override the server value here in case it's set too low, - // respective to the size of the data we need to copy. - if let Some(timeout) = config - .config - .parameters - .mysql_source_timeouts - .snapshot_max_execution_time - { - // Interpolating an integer millis value; not parameterizable in MySQL `SET`. - #[allow(clippy::disallowed_methods)] - tx.query_drop(format!( - "SET @@session.max_execution_time = {}", - timeout.as_millis() - )) - .await?; + // Phase E: All workers signal, leader unlocks + *snapshot_cap_set = CapabilitySet::new(); + if is_snapshot_leader { + while snapshot_input.next().await.is_some() {} + if let Some(mut lc) = lock_conn.take() { + #[allow(clippy::disallowed_methods)] // static SQL string + lc.query_drop("UNLOCK TABLES").await?; + lc.disconnect().await?; + } } + drop(lock_conn); - // We have started our transaction so we can unlock the tables. - #[allow(clippy::disallowed_methods)] // static SQL string - lock_conn.query_drop("UNLOCK TABLES").await?; - lock_conn.disconnect().await?; + trace!(%id, "timely-{worker_id} started transaction (has_work={has_work})"); - trace!(%id, "timely-{worker_id} started transaction"); + // Workers without a transaction have nothing to read. + let Some(ref mut tx) = tx else { + return Ok(()); + }; - // Verify the schemas of the tables we are snapshotting - let errored_outputs = - verify_schemas(&mut tx, reader_snapshot_table_info.iter().collect()).await?; + // Phase F: Verify schemas for the tables this worker reads. In + // PK-range mode several workers read the same table, so each + // verifies independently to learn which outputs to skip; only + // the responsible worker publishes any resulting error (below). + let errored_outputs = verify_schemas( + tx, + tables_to_snapshot + .iter() + .filter(|(t, _)| table_ranges.contains_key(t)) + .map(|(k, v)| (k, v.as_slice())) + .collect(), + ) + .await?; let mut removed_outputs = BTreeSet::new(); for (output, err) in errored_outputs { - // Publish the error for this table and stop ingesting it - let update = ( - (output.output_index, Err(err.clone().into())), - GtidPartition::minimum(), - Diff::ONE, - ); - let size = update.fuel_size(); - raw_handle.give_fueled(&data_cap_set[0], update, size).await; - tracing::warn!(%id, "timely-{worker_id} stopping snapshot of output {output:?} \ - due to schema mismatch"); + // Every worker reading this table must stop ingesting the output, but + // only the responsible worker publishes the error so it lands with + // multiplicity one — other workers read disjoint PK ranges of the same + // table and would otherwise each emit the same error. + if config.responsible_for(&output.table_name) { + let update = ( + (output.output_index, Err(err.clone().into())), + GtidPartition::minimum(), + Diff::ONE, + ); + let size = update.fuel_size(); + raw_handle.give_fueled(&data_cap_set[0], update, size).await; + tracing::warn!(%id, "timely-{worker_id} stopping snapshot of output \ + {output:?} due to schema mismatch"); + } removed_outputs.insert(output.output_index); } - for (_, outputs) in reader_snapshot_table_info.iter_mut() { + for (_, outputs) in tables_to_snapshot.iter_mut() { outputs.retain(|output| !removed_outputs.contains(&output.output_index)); } - reader_snapshot_table_info.retain(|_, outputs| !outputs.is_empty()); + tables_to_snapshot.retain(|_, outputs| !outputs.is_empty()); - let snapshot_total = fetch_snapshot_size( - &mut tx, - reader_snapshot_table_info - .iter() - .map(|(name, outputs)| { - ( - name.clone(), - outputs.len(), - export_statistics.get(name).unwrap(), - ) - }) - .collect(), - metrics, - ) - .await?; + // Phase G: The leader publishes the full snapshot size so the summed + // worker-local gauges reflect the upstream total without double-counting. + if is_snapshot_leader { + fetch_snapshot_size( + tx, + tables_to_snapshot + .iter() + .map(|(name, outputs)| { + let stats = export_statistics + .get(name) + .expect("statistics are initialized for each output"); + (name.clone(), outputs.len(), stats) + }) + .collect(), + metrics, + ) + .await?; + } - // This worker has nothing else to do - if reader_snapshot_table_info.is_empty() { + if tables_to_snapshot.is_empty() { return Ok(()); } - // Read the snapshot data from the tables + // Phase H: Read snapshot data let mut final_row = Row::default(); + // Yield more frequently when multiple workers are + // active to keep total in-flight memory bounded. + // ~130 bytes/row × 10K rows ≈ 1.3 MiB per yield. + // The trailing `.max(1)` keeps the interval positive even for + // pathological worker counts (> 10K), avoiding a `% 0` panic. + let yield_interval = (10_000 / u64::cast_from(config.worker_count).max(1)).max(1); + let mut snapshot_staged_total = 0; - for (table, outputs) in &reader_snapshot_table_info { + for (table, outputs) in &tables_to_snapshot { + let pk_range = match table_ranges.get(table) { + Some(range) => range.as_ref(), + None => continue, // This worker has no work for this table + }; + let mut snapshot_staged = 0; - let query = build_snapshot_query(outputs); + let query = build_snapshot_query(outputs, pk_range); trace!(%id, "timely-{worker_id} reading snapshot query='{}'", query); let mut results = tx.exec_stream(query, ()).await?; while let Some(row) = results.try_next().await? { @@ -457,28 +756,39 @@ pub(crate) fn render<'scope>( let size = update.fuel_size(); raw_handle.give_fueled(&data_cap_set[0], update, size).await; } - // This overcounting maintains existing behavior but will be removed one readers no longer rely on the value. snapshot_staged_total += u64::cast_from(outputs.len()); + if snapshot_staged_total % yield_interval == 0 { + tokio::task::yield_now().await; + } if snapshot_staged_total % 1000 == 0 { - for statistics in export_statistics.get(table).unwrap() { - statistics.set_snapshot_records_staged(snapshot_staged); + if let Some(stats_list) = export_statistics.get(table) { + for statistics in stats_list { + statistics.set_snapshot_records_staged(snapshot_staged); + } } } } - for statistics in export_statistics.get(table).unwrap() { - statistics.set_snapshot_records_staged(snapshot_staged); + if let Some(stats_list) = export_statistics.get(table) { + for statistics in stats_list { + statistics.set_snapshot_records_staged(snapshot_staged); + } } trace!(%id, "timely-{worker_id} snapshotted {} records from \ table '{table}'", snapshot_staged * u64::cast_from(outputs.len())); } + // Phase I: Emit rewind requests // We are done with the snapshot so now we will emit rewind requests. It is // important that this happens after the snapshot has finished because this is what - // unblocks the replication operator and we want this to happen serially. It might - // seem like a good idea to read the replication stream concurrently with the - // snapshot but it actually leads to a lot of data being staged for the future, - // which needlesly consumed memory in the cluster. - for (table, outputs) in reader_snapshot_table_info { + // unblocks the replication operator and we want this to happen serially. + // + // Only the responsible worker emits rewind requests + // for each table. This worker is guaranteed to have + // a transaction (responsible_for → has_work = true). + for (table, outputs) in &tables_to_snapshot { + if !config.responsible_for(table) { + continue; + } for output in outputs { trace!(%id, "timely-{worker_id} producing rewind request for {table}\ output {}", output.output_index); @@ -491,12 +801,6 @@ pub(crate) fn render<'scope>( } *rewind_cap_set = CapabilitySet::new(); - // TODO (maz): Should we remove this to match Postgres? - if snapshot_staged_total < snapshot_total { - error!(%id, "timely-{worker_id} snapshot size {snapshot_total} is somehow \ - bigger than records staged {snapshot_staged_total}"); - } - Ok(()) })) }); @@ -540,8 +844,10 @@ where /// /// Expect `outputs` to contain entries for a single table, and to have at least 1 entry. /// Expect that each MySqlTableDesc entry contains all columns described in information_schema.columns. +/// +/// When `pk_range` is provided, a WHERE clause is appended to filter rows by PK range. #[must_use] -fn build_snapshot_query(outputs: &[SourceOutputInfo]) -> String { +fn build_snapshot_query(outputs: &[SourceOutputInfo], pk_range: Option<&PkRange>) -> String { let info = outputs.first().expect("MySQL table info"); for output in &outputs[1..] { // the columns may be decoded based on position, and different outputs may replicate @@ -558,7 +864,14 @@ fn build_snapshot_query(outputs: &[SourceOutputInfo]) -> String { .iter() .map(|col| quote_identifier(&col.name)) .join(", "); - format!("SELECT {} FROM {}", columns, info.table_name) + let mut query = format!("SELECT {} FROM {}", columns, info.table_name); + if let Some(range) = pk_range { + query.push_str(&format!(" WHERE {} >= {}", range.pk_col, range.lower)); + if let Some(upper) = range.upper { + query.push_str(&format!(" AND {} < {}", range.pk_col, upper)); + } + } + query } #[derive(Default)] @@ -625,7 +938,7 @@ mod tests { export_id: mz_repr::GlobalId::User(1), binlog_full_metadata: false, }; - let query = build_snapshot_query(&[info.clone(), info]); + let query = build_snapshot_query(&[info.clone(), info], None); assert_eq!( format!( "SELECT `c1`, `c2`, `c3` FROM `{}`.`{}`", @@ -634,4 +947,210 @@ mod tests { query ); } + + #[mz_ore::test] + fn snapshot_query_with_pk_range() { + let schema_name = "myschema".to_string(); + let table_name = "mytable".to_string(); + let table = MySqlTableName(schema_name.clone(), table_name.clone()); + let columns = ["id", "name"] + .iter() + .map(|col| MySqlColumnDesc { + name: col.to_string(), + column_type: None, + meta: None, + }) + .collect::>(); + let desc = MySqlTableDesc { + schema_name: schema_name.clone(), + name: table_name.clone(), + columns, + keys: BTreeSet::default(), + }; + let info = SourceOutputInfo { + output_index: 1, + table_name: table.clone(), + desc, + text_columns: vec![], + exclude_columns: vec![], + initial_gtid_set: Antichain::default(), + resume_upper: Antichain::default(), + export_id: mz_repr::GlobalId::User(1), + binlog_full_metadata: false, + }; + + // Bounded range + let range = PkRange { + pk_col: "`id`".to_string(), + lower: 100, + upper: Some(200), + }; + let query = build_snapshot_query(std::slice::from_ref(&info), Some(&range)); + assert_eq!( + format!( + "SELECT `id`, `name` FROM `{}`.`{}` WHERE `id` >= 100 AND `id` < 200", + &schema_name, &table_name + ), + query + ); + + // Open-ended range (last worker) + let range = PkRange { + pk_col: "`id`".to_string(), + lower: 200, + upper: None, + }; + let query = build_snapshot_query(std::slice::from_ref(&info), Some(&range)); + assert_eq!( + format!( + "SELECT `id`, `name` FROM `{}`.`{}` WHERE `id` >= 200", + &schema_name, &table_name + ), + query + ); + } + + #[mz_ore::test] + fn test_worker_pk_range() { + let bounds = PkBounds { + pk_col: "`id`".to_string(), + min_val: 1, + max_val: 100, + }; + + // 2 workers, range_size = 100 + let r0 = worker_pk_range(&bounds, 0, 2).expect("worker 0 should have range"); + assert_eq!(r0.lower, 1); + assert_eq!(r0.upper, Some(51)); + + let r1 = worker_pk_range(&bounds, 1, 2).expect("worker 1 should have range"); + assert_eq!(r1.lower, 51); + assert!(r1.upper.is_none()); // last worker + + // More workers than range + let small_bounds = PkBounds { + pk_col: "`id`".to_string(), + min_val: 1, + max_val: 2, + }; + // range_size = 2, effective = 2 + let r0 = worker_pk_range(&small_bounds, 0, 10).expect("worker 0 should have range"); + assert_eq!(r0.lower, 1); + assert_eq!(r0.upper, Some(2)); + let r1 = worker_pk_range(&small_bounds, 1, 10).expect("worker 1 should have range"); + assert_eq!(r1.lower, 2); + assert!(r1.upper.is_none()); + // Workers beyond effective count get nothing + assert!(worker_pk_range(&small_bounds, 2, 10).is_none()); + + // Large Int64 range — would overflow with i64 arithmetic + let large_bounds = PkBounds { + pk_col: "`id`".to_string(), + min_val: 0, + max_val: i64::MAX, + }; + // With 4 workers this should not panic or wrap + let r0 = worker_pk_range(&large_bounds, 0, 4).expect("worker 0"); + assert_eq!(r0.lower, 0); + let r3 = worker_pk_range(&large_bounds, 3, 4).expect("worker 3"); + assert!(r3.upper.is_none()); // last worker, open-ended + // Ranges should be contiguous: each worker's start == previous worker's end + let r1 = worker_pk_range(&large_bounds, 1, 4).expect("worker 1"); + let r2 = worker_pk_range(&large_bounds, 2, 4).expect("worker 2"); + assert_eq!(r0.upper, Some(r1.lower)); + assert_eq!(r1.upper, Some(r2.lower)); + assert_eq!(r2.upper, Some(r3.lower)); + } + + #[mz_ore::test] + fn test_integer_pk_col() { + use mz_mysql_util::MySqlKeyDesc; + use mz_repr::SqlColumnType; + + // Single-column INT32 PK + let desc = MySqlTableDesc { + schema_name: "s".to_string(), + name: "t".to_string(), + columns: vec![MySqlColumnDesc { + name: "id".to_string(), + column_type: Some(SqlColumnType { + scalar_type: SqlScalarType::Int32, + nullable: false, + }), + meta: None, + }], + keys: BTreeSet::from([MySqlKeyDesc { + name: "PRIMARY".to_string(), + is_primary: true, + columns: vec!["id".to_string()], + }]), + }; + assert_eq!(integer_pk_col(&desc), Some("id")); + + // UInt64 PK should be excluded + let desc_u64 = MySqlTableDesc { + schema_name: "s".to_string(), + name: "t".to_string(), + columns: vec![MySqlColumnDesc { + name: "id".to_string(), + column_type: Some(SqlColumnType { + scalar_type: SqlScalarType::UInt64, + nullable: false, + }), + meta: None, + }], + keys: BTreeSet::from([MySqlKeyDesc { + name: "PRIMARY".to_string(), + is_primary: true, + columns: vec!["id".to_string()], + }]), + }; + assert_eq!(integer_pk_col(&desc_u64), None); + + // Multi-column PK should be excluded + let desc_multi = MySqlTableDesc { + schema_name: "s".to_string(), + name: "t".to_string(), + columns: vec![ + MySqlColumnDesc { + name: "a".to_string(), + column_type: Some(SqlColumnType { + scalar_type: SqlScalarType::Int32, + nullable: false, + }), + meta: None, + }, + MySqlColumnDesc { + name: "b".to_string(), + column_type: Some(SqlColumnType { + scalar_type: SqlScalarType::Int32, + nullable: false, + }), + meta: None, + }, + ], + keys: BTreeSet::from([MySqlKeyDesc { + name: "PRIMARY".to_string(), + is_primary: true, + columns: vec!["a".to_string(), "b".to_string()], + }]), + }; + assert_eq!(integer_pk_col(&desc_multi), None); + + // No PK + let desc_no_pk = MySqlTableDesc { + schema_name: "s".to_string(), + name: "t".to_string(), + columns: vec![MySqlColumnDesc { + name: "id".to_string(), + column_type: Some(SqlColumnType { + scalar_type: SqlScalarType::Int32, + nullable: false, + }), + meta: None, + }], + keys: BTreeSet::default(), + }; + assert_eq!(integer_pk_col(&desc_no_pk), None); + } } diff --git a/test/mysql-cdc/mysql-cdc.td b/test/mysql-cdc/mysql-cdc.td index 1768d73a1df88..3f70d0710cf45 100644 --- a/test/mysql-cdc/mysql-cdc.td +++ b/test/mysql-cdc/mysql-cdc.td @@ -556,6 +556,49 @@ true > DROP SOURCE large_cluster_source CASCADE; +# +# Verify parallel PK-range snapshot correctness with multiple workers. +# With a single-column integer PK and enough rows, each worker reads a +# disjoint PK range. We verify no rows are lost or duplicated. +# + +$ mysql-execute name=mysql +DROP TABLE IF EXISTS pk_range_test; +CREATE TABLE pk_range_test (id BIGINT PRIMARY KEY, val BIGINT); +SET @i := 0; +INSERT INTO pk_range_test SELECT @i := @i + 1, @i * 7 FROM mysql.time_zone t1, mysql.time_zone t2 LIMIT 1000; + +> CREATE CLUSTER pk_range_cluster SIZE 'scale=1,workers=4' + +> BEGIN +> CREATE SOURCE pk_range_source + IN CLUSTER pk_range_cluster + FROM MYSQL CONNECTION mysql_conn; +> CREATE TABLE pk_range_table FROM SOURCE pk_range_source (REFERENCE public.pk_range_test); +> COMMIT + +# Exact row count +> SELECT COUNT(*) FROM pk_range_table; +1000 + +# No duplicates: distinct count must equal total count +> SELECT COUNT(DISTINCT id) = COUNT(*) FROM pk_range_table; +true + +# Verify PK boundaries — all rows from 1..1000 are present +> SELECT MIN(id), MAX(id) FROM pk_range_table; +1 1000 + +# Checksum: val = id * 7, so SUM(val) = 7 * SUM(1..1000) = 7 * 500500 +> SELECT SUM(val) FROM pk_range_table; +3503500 + +> DROP SOURCE pk_range_source CASCADE; +> DROP CLUSTER pk_range_cluster CASCADE; + +$ mysql-execute name=mysql +DROP TABLE pk_range_test; + # # Remove all data on the Postgres side #