diff --git a/ci/test/pipeline.template.yml b/ci/test/pipeline.template.yml index a42124fce4e41..98c428b7c4881 100644 --- a/ci/test/pipeline.template.yml +++ b/ci/test/pipeline.template.yml @@ -393,6 +393,17 @@ steps: agents: queue: hetzner-aarch64-4cpu-8gb + - id: pg-cdc-standby + label: Postgres CDC against a physical standby + depends_on: build-aarch64 + timeout_in_minutes: 30 + inputs: [test/pg-cdc-standby] + plugins: + - ./ci/plugins/mzcompose: + composition: pg-cdc-standby + agents: + queue: hetzner-aarch64-8cpu-16gb + - group: Connection key: connection-tests steps: diff --git a/src/postgres-util/src/replication.rs b/src/postgres-util/src/replication.rs index eb09f6c7723a0..3207210624a90 100644 --- a/src/postgres-util/src/replication.rs +++ b/src/postgres-util/src/replication.rs @@ -232,7 +232,17 @@ pub async fn get_timeline_id(client: &Client) -> Result { } pub async fn get_current_wal_lsn(client: &Client) -> Result { - let row = client.query_one("SELECT pg_current_wal_lsn()", &[]).await?; + // `pg_current_wal_lsn()` errors with "recovery is in progress" on a physical + // standby. When reading from a standby (logical decoding on standby, PG 16+) + // the relevant frontier is how far WAL has been replayed locally, which is + // exactly what `pg_last_wal_replay_lsn()` reports. + let row = client + .query_one( + "SELECT CASE WHEN pg_is_in_recovery() \ + THEN pg_last_wal_replay_lsn() ELSE pg_current_wal_lsn() END", + &[], + ) + .await?; let lsn: PgLsn = row.get(0); Ok(lsn) diff --git a/src/storage/src/source/postgres.rs b/src/storage/src/source/postgres.rs index a8986980a8ec0..cffa09c6cd4bc 100644 --- a/src/storage/src/source/postgres.rs +++ b/src/storage/src/source/postgres.rs @@ -447,22 +447,23 @@ async fn fetch_slot_metadata( } } -/// Fetch the `pg_current_wal_lsn`, used to report metrics. +/// Fetch the upstream WAL frontier, used to report metrics. 
async fn fetch_max_lsn(client: &Client) -> Result<MzOffset, TransientError> { - let query = "SELECT pg_current_wal_lsn()"; + // `pg_current_wal_lsn()` errors with "recovery is in progress" on a physical + // standby; fall back to the local replay frontier there. See + // `mz_postgres_util::get_current_wal_lsn` for the rationale. + let query = "SELECT CASE WHEN pg_is_in_recovery() \ + THEN pg_last_wal_replay_lsn() ELSE pg_current_wal_lsn() END AS wal_lsn"; let row = simple_query_opt(client, query).await?; - match row.and_then(|row| { - row.get("pg_current_wal_lsn") - .map(|lsn| lsn.parse::<PgLsn>().unwrap()) - }) { + match row.and_then(|row| row.get("wal_lsn").map(|lsn| lsn.parse::<PgLsn>().unwrap())) { // Based on the documentation, it appears that `pg_current_wal_lsn` has // the same "upper" semantics of `confirmed_flush_lsn`: // <https://www.postgresql.org/docs/current/functions-admin.html#FUNCTIONS-ADMIN-BACKUP> // We may need to revisit this and use `pg_current_wal_flush_lsn`. Some(lsn) => Ok(MzOffset::from(lsn)), None => Err(TransientError::Generic(anyhow::anyhow!( - "pg_current_wal_lsn() mysteriously has no value" + "WAL LSN mysteriously has no value" ))), } } diff --git a/test/pg-cdc-standby/configure-materialize.td b/test/pg-cdc-standby/configure-materialize.td new file mode 100644 index 0000000000000..500ce16b44e3a --- /dev/null +++ b/test/pg-cdc-standby/configure-materialize.td @@ -0,0 +1,31 @@ +# Copyright Materialize, Inc. and contributors. All rights reserved. +# +# Use of this software is governed by the Business Source License +# included in the LICENSE file at the root of this repository. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0. + +# Point Materialize at the *standby*, not the primary. Slot creation and the +# initial snapshot both create logical slots on the standby, which block until +# the primary emits a standby-snapshot record -- the mzcompose workflow nudges +# the primary in the background while this runs.
+ +> DROP SECRET IF EXISTS pgpass CASCADE; +> DROP CONNECTION IF EXISTS pg CASCADE; + +> CREATE SECRET pgpass AS 'postgres' + +> CREATE CONNECTION pg TO POSTGRES ( + HOST 'pg-standby', + DATABASE postgres, + USER postgres, + PASSWORD SECRET pgpass + ) + +> CREATE SOURCE mz_source + FROM POSTGRES CONNECTION pg (PUBLICATION 'mz_source') + +> CREATE TABLE t FROM SOURCE mz_source (REFERENCE t) +> CREATE TABLE small FROM SOURCE mz_source (REFERENCE small) diff --git a/test/pg-cdc-standby/configure-primary.td b/test/pg-cdc-standby/configure-primary.td new file mode 100644 index 0000000000000..7818acc51c8b5 --- /dev/null +++ b/test/pg-cdc-standby/configure-primary.td @@ -0,0 +1,27 @@ +# Copyright Materialize, Inc. and contributors. All rights reserved. +# +# Use of this software is governed by the Business Source License +# included in the LICENSE file at the root of this repository. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0. + +# Seed the primary. The standby is cloned from it afterwards, so these objects +# (and the postgres role's REPLICATION attribute, which it has by default) +# arrive on the standby via the physical base backup. 
+ +$ postgres-execute connection=postgres://postgres:postgres@pg-primary +DROP TABLE IF EXISTS t; +CREATE TABLE t (id INTEGER PRIMARY KEY, v TEXT); +ALTER TABLE t REPLICA IDENTITY FULL; +INSERT INTO t SELECT g, 'v' || g FROM generate_series(1, 1000) AS g; + +DROP TABLE IF EXISTS small; +CREATE TABLE small (id INTEGER PRIMARY KEY, v TEXT); +ALTER TABLE small REPLICA IDENTITY FULL; +INSERT INTO small VALUES (1, 'a'), (2, 'b'), (3, 'c'); + +DROP PUBLICATION IF EXISTS mz_source; +CREATE PUBLICATION mz_source FOR ALL TABLES; +ANALYZE; diff --git a/test/pg-cdc-standby/insert-update-delete.td b/test/pg-cdc-standby/insert-update-delete.td new file mode 100644 index 0000000000000..bbd0d734d0de7 --- /dev/null +++ b/test/pg-cdc-standby/insert-update-delete.td @@ -0,0 +1,20 @@ +# Copyright Materialize, Inc. and contributors. All rights reserved. +# +# Use of this software is governed by the Business Source License +# included in the LICENSE file at the root of this repository. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0. + +# All DML happens on the PRIMARY. It must reach Materialize via: +# primary --(physical streaming replication)--> standby --(logical decoding)--> mz + +$ postgres-execute connection=postgres://postgres:postgres@pg-primary +INSERT INTO t SELECT g, 'v' || g FROM generate_series(1001, 1500) AS g; +DELETE FROM t WHERE id <= 250; +UPDATE t SET v = 'updated' WHERE id = 1000; + +INSERT INTO small VALUES (4, 'd'); +DELETE FROM small WHERE id = 1; +UPDATE small SET v = 'B' WHERE id = 2; diff --git a/test/pg-cdc-standby/mzcompose b/test/pg-cdc-standby/mzcompose new file mode 100755 index 0000000000000..1f866645dabc8 --- /dev/null +++ b/test/pg-cdc-standby/mzcompose @@ -0,0 +1,14 @@ +#!/usr/bin/env bash + +# Copyright Materialize, Inc. and contributors. All rights reserved. 
+# +# Use of this software is governed by the Business Source License +# included in the LICENSE file at the root of this repository. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0. +# +# mzcompose — runs Docker Compose with Materialize customizations. + +exec "$(dirname "$0")"/../../bin/pyactivate -m materialize.cli.mzcompose "$@" diff --git a/test/pg-cdc-standby/mzcompose.py b/test/pg-cdc-standby/mzcompose.py new file mode 100644 index 0000000000000..bfe052af2d949 --- /dev/null +++ b/test/pg-cdc-standby/mzcompose.py @@ -0,0 +1,211 @@ +# Copyright Materialize, Inc. and contributors. All rights reserved. +# +# Use of this software is governed by the Business Source License +# included in the LICENSE file at the root of this repository. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0. + +""" +Postgres source against a physical *standby*. + +Exercises "logical decoding on standby" (PostgreSQL 16+): a primary streams its +WAL to a physical replica via streaming replication, and Materialize creates a +logical replication slot on the *replica* and decodes changes there. Data +originates on the primary, flows to the standby physically, and is decoded +logically for Materialize -- the primary is never touched by Materialize. + +The notable wrinkle is that creating a logical slot on a standby blocks until +the primary emits a standby-snapshot (RUNNING_XACTS) WAL record. Materialize +creates its slots synchronously, so without help the source would hang. We run +a background thread that calls pg_log_standby_snapshot() on the primary to +unblock slot creation; see nudge_standby_snapshots(). 
+""" + +import threading +from contextlib import contextmanager +from textwrap import dedent +from typing import Iterator + +import psycopg + +from materialize.mzcompose.composition import Composition +from materialize.mzcompose.services.materialized import Materialized +from materialize.mzcompose.services.mz import Mz +from materialize.mzcompose.services.postgres import Postgres +from materialize.mzcompose.services.testdrive import Testdrive + +# Standby's data directory is seeded by pg_basebackup before postgres starts, so +# it needs to survive between the `run` (basebackup) and `up` (serve) steps. +VOLUMES = {"standbydata": None} + +SERVICES = [ + Mz(app_password=""), + Materialized( + additional_system_parameter_defaults={ + "log_filter": "mz_storage::source::postgres=trace,info" + }, + default_replication_factor=1, + ), + # no_reset so the source created in one testdrive invocation survives into + # the next; c.down at the start of the workflow gives us a clean slate. + Testdrive(no_reset=True, default_timeout="120s"), + Postgres(name="pg-primary"), + Postgres( + name="pg-standby", + extra_command=["-c", "hot_standby_feedback=on"], + volumes=["standbydata:/var/lib/postgresql/data"], + ), +] + + +def _pg_connect(c: Composition, service: str) -> psycopg.Connection: + return psycopg.connect( + host="localhost", + user="postgres", + password="postgres", + port=c.default_port(service), + autocommit=True, + ) + + +def _allow_replication(c: Composition, service: str) -> None: + """Permit replication connections (basebackup, streaming, logical decoding). + + The baked pg_hba.conf has no `replication` entry, and `host all` does not + match replication connections in PostgreSQL. 
+ """ + c.exec( + service, + "bash", + "-c", + "echo 'host replication all all trust' >> /share/conf/pg_hba.conf", + ) + c.exec(service, "psql", "-U", "postgres", "-c", "SELECT pg_reload_conf();") + + +def _wait_for_standby(c: Composition) -> None: + """Block until the standby is accepting connections and is in recovery.""" + last_err: Exception | None = None + for _ in range(60): + try: + conn = _pg_connect(c, "pg-standby") + in_recovery = conn.execute("SELECT pg_is_in_recovery()").fetchone()[0] + conn.close() + if in_recovery: + return + except Exception as e: + last_err = e + threading.Event().wait(1) + raise RuntimeError(f"standby did not reach recovery state: {last_err}") + + +@contextmanager +def nudge_standby_snapshots(c: Composition) -> Iterator[None]: + """Periodically emit a standby-snapshot record on the primary. + + Creating a logical slot on a standby blocks until a RUNNING_XACTS record is + replayed. Materialize creates slots synchronously (one persistent slot plus + a temporary per-worker snapshot slot), so we keep nudging for the whole + duration that the source is being set up and verified. + """ + stop = threading.Event() + + def run() -> None: + conn = _pg_connect(c, "pg-primary") + try: + while not stop.is_set(): + try: + conn.execute("SELECT pg_log_standby_snapshot()") + except Exception: + pass + stop.wait(1) + finally: + conn.close() + + thread = threading.Thread(target=run, daemon=True) + thread.start() + try: + yield + finally: + stop.set() + thread.join() + + +def setup_standby(c: Composition) -> None: + """Bring up the primary, seed it, then clone it into a physical standby.""" + c.down(destroy_volumes=True) + + # testdrive connects to materialized at startup, so it must be up before we + # run any testdrive files -- even ones that only touch Postgres. 
+ c.up("materialized", "pg-primary") + _allow_replication(c, "pg-primary") + c.run_testdrive_files("configure-primary.td") + + # Clone the primary into the standby's data volume *before* postgres starts. + # `-R` writes standby.signal + primary_conninfo so it comes up in recovery. + # Runs as root to chown the fresh volume, then steps down to postgres since + # the server refuses a data dir it does not own. + c.run( + "pg-standby", + "-c", + dedent( + """ + set -e + mkdir -p "$PGDATA" + chown postgres:postgres "$PGDATA" + chmod 700 "$PGDATA" + rm -rf "$PGDATA"/* || true + gosu postgres pg_basebackup -h pg-primary -U postgres -D "$PGDATA" -Fp -Xs -R -P + echo basebackup-done + """ + ), + entrypoint="bash", + ) + + c.up("pg-standby") + _wait_for_standby(c) + # The standby reads its own pg_hba.conf; allow Materialize's replication + # connection (slot creation + START_REPLICATION) to it. + _allow_replication(c, "pg-standby") + + +def workflow_default(c: Composition) -> None: + setup_standby(c) + + with nudge_standby_snapshots(c): + # Source creation + initial snapshot both create slots on the standby, + # which block until a snapshot record arrives -- hence the nudger. + c.run_testdrive_files("configure-materialize.td", "verify-snapshot.td") + + # Changes made on the primary must flow primary -> standby (physical) -> + # Materialize (logical decoding on the standby). 
+ c.run_testdrive_files("insert-update-delete.td", "verify-cdc.td") + + _verify_reading_from_standby(c) + + +def _verify_reading_from_standby(c: Composition) -> None: + """Prove Materialize really decoded from the standby, not the primary.""" + conn = _pg_connect(c, "pg-standby") + try: + in_recovery = conn.execute("SELECT pg_is_in_recovery()").fetchone()[0] + assert in_recovery, "expected pg-standby to be a replica in recovery" + + logical_slots = conn.execute( + "SELECT count(*) FROM pg_replication_slots WHERE slot_type = 'logical'" + ).fetchone()[0] + assert ( + logical_slots >= 1 + ), f"expected a logical slot on the standby, found {logical_slots}" + finally: + conn.close() + + # Sanity check: pg-primary must be a real primary (not in recovery); its replication slots are not inspected here. + conn = _pg_connect(c, "pg-primary") + try: + in_recovery = conn.execute("SELECT pg_is_in_recovery()").fetchone()[0] + assert not in_recovery, "expected pg-primary to be the primary" + finally: + conn.close() diff --git a/test/pg-cdc-standby/probe.td b/test/pg-cdc-standby/probe.td new file mode 100644 index 0000000000000..bb58f2989a3fd --- /dev/null +++ b/test/pg-cdc-standby/probe.td @@ -0,0 +1,2 @@ +$ postgres-execute connection=postgres://postgres:postgres@pg-primary +SELECT 1; diff --git a/test/pg-cdc-standby/verify-cdc.td b/test/pg-cdc-standby/verify-cdc.td new file mode 100644 index 0000000000000..9c2e5461d0dd9 --- /dev/null +++ b/test/pg-cdc-standby/verify-cdc.td @@ -0,0 +1,23 @@ +# Copyright Materialize, Inc. and contributors. All rights reserved. +# +# Use of this software is governed by the Business Source License +# included in the LICENSE file at the root of this repository. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0.
+ +# 1000 initial + 500 inserted - 250 deleted = 1250 +> SELECT count(*) FROM t +1250 + +> SELECT v FROM t WHERE id = 1000 +updated + +> SELECT min(id), max(id) FROM t +251 1500 + +> SELECT * FROM small ORDER BY id +2 B +3 c +4 d diff --git a/test/pg-cdc-standby/verify-snapshot.td b/test/pg-cdc-standby/verify-snapshot.td new file mode 100644 index 0000000000000..8ea4c29097231 --- /dev/null +++ b/test/pg-cdc-standby/verify-snapshot.td @@ -0,0 +1,18 @@ +# Copyright Materialize, Inc. and contributors. All rights reserved. +# +# Use of this software is governed by the Business Source License +# included in the LICENSE file at the root of this repository. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0. + +# The initial snapshot, taken via logical decoding on the standby. + +> SELECT count(*) FROM t +1000 + +> SELECT * FROM small ORDER BY id +1 a +2 b +3 c