Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions ci/test/pipeline.template.yml
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,17 @@ steps:
agents:
queue: hetzner-aarch64-4cpu-8gb

- id: pg-cdc-standby
label: Postgres CDC against a physical standby
depends_on: build-aarch64
timeout_in_minutes: 30
inputs: [test/pg-cdc-standby]
plugins:
- ./ci/plugins/mzcompose:
composition: pg-cdc-standby
agents:
queue: hetzner-aarch64-8cpu-16gb

- group: Connection
key: connection-tests
steps:
Expand Down
12 changes: 11 additions & 1 deletion src/postgres-util/src/replication.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,17 @@ pub async fn get_timeline_id(client: &Client) -> Result<u64, PostgresError> {
}

pub async fn get_current_wal_lsn(client: &Client) -> Result<PgLsn, PostgresError> {
let row = client.query_one("SELECT pg_current_wal_lsn()", &[]).await?;
// `pg_current_wal_lsn()` errors with "recovery is in progress" on a physical
// standby. When reading from a standby (logical decoding on standby, PG 16+)
// the relevant frontier is how far WAL has been replayed locally, which is
// exactly what `pg_last_wal_replay_lsn()` reports.
let row = client
.query_one(
"SELECT CASE WHEN pg_is_in_recovery() \
THEN pg_last_wal_replay_lsn() ELSE pg_current_wal_lsn() END",
&[],
)
.await?;
let lsn: PgLsn = row.get(0);

Ok(lsn)
Expand Down
15 changes: 8 additions & 7 deletions src/storage/src/source/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -447,22 +447,23 @@ async fn fetch_slot_metadata(
}
}

/// Fetch the `pg_current_wal_lsn`, used to report metrics.
/// Fetch the upstream WAL frontier, used to report metrics.
async fn fetch_max_lsn(client: &Client) -> Result<MzOffset, TransientError> {
let query = "SELECT pg_current_wal_lsn()";
// `pg_current_wal_lsn()` errors with "recovery is in progress" on a physical
// standby; fall back to the local replay frontier there. See
// `mz_postgres_util::get_current_wal_lsn` for the rationale.
let query = "SELECT CASE WHEN pg_is_in_recovery() \
THEN pg_last_wal_replay_lsn() ELSE pg_current_wal_lsn() END AS wal_lsn";
let row = simple_query_opt(client, query).await?;

match row.and_then(|row| {
row.get("pg_current_wal_lsn")
.map(|lsn| lsn.parse::<PgLsn>().unwrap())
}) {
match row.and_then(|row| row.get("wal_lsn").map(|lsn| lsn.parse::<PgLsn>().unwrap())) {
// Based on the documentation, it appears that `pg_current_wal_lsn` has
// the same "upper" semantics of `confirmed_flush_lsn`:
// <https://www.postgresql.org/docs/current/functions-admin.html#FUNCTIONS-ADMIN-BACKUP>
// We may need to revisit this and use `pg_current_wal_flush_lsn`.
Some(lsn) => Ok(MzOffset::from(lsn)),
None => Err(TransientError::Generic(anyhow::anyhow!(
"pg_current_wal_lsn() mysteriously has no value"
"WAL LSN mysteriously has no value"
))),
}
}
Expand Down
31 changes: 31 additions & 0 deletions test/pg-cdc-standby/configure-materialize.td
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

# Point Materialize at the *standby*, not the primary. Slot creation and the
# initial snapshot both create logical slots on the standby, which block until
# the primary emits a standby-snapshot record -- the mzcompose workflow nudges
# the primary in the background while this runs.

> DROP SECRET IF EXISTS pgpass CASCADE;
> DROP CONNECTION IF EXISTS pg CASCADE;

> CREATE SECRET pgpass AS 'postgres'

> CREATE CONNECTION pg TO POSTGRES (
HOST 'pg-standby',
DATABASE postgres,
USER postgres,
PASSWORD SECRET pgpass
)

> CREATE SOURCE mz_source
FROM POSTGRES CONNECTION pg (PUBLICATION 'mz_source')

> CREATE TABLE t FROM SOURCE mz_source (REFERENCE t)
> CREATE TABLE small FROM SOURCE mz_source (REFERENCE small)
27 changes: 27 additions & 0 deletions test/pg-cdc-standby/configure-primary.td
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

# Seed the primary. The standby is cloned from it afterwards, so these objects
# (and the postgres role's REPLICATION attribute, which it has by default)
# arrive on the standby via the physical base backup.

$ postgres-execute connection=postgres://postgres:postgres@pg-primary
DROP TABLE IF EXISTS t;
CREATE TABLE t (id INTEGER PRIMARY KEY, v TEXT);
ALTER TABLE t REPLICA IDENTITY FULL;
INSERT INTO t SELECT g, 'v' || g FROM generate_series(1, 1000) AS g;

DROP TABLE IF EXISTS small;
CREATE TABLE small (id INTEGER PRIMARY KEY, v TEXT);
ALTER TABLE small REPLICA IDENTITY FULL;
INSERT INTO small VALUES (1, 'a'), (2, 'b'), (3, 'c');

DROP PUBLICATION IF EXISTS mz_source;
CREATE PUBLICATION mz_source FOR ALL TABLES;
ANALYZE;
20 changes: 20 additions & 0 deletions test/pg-cdc-standby/insert-update-delete.td
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

# All DML happens on the PRIMARY. It must reach Materialize via:
# primary --(physical streaming replication)--> standby --(logical decoding)--> mz

$ postgres-execute connection=postgres://postgres:postgres@pg-primary
INSERT INTO t SELECT g, 'v' || g FROM generate_series(1001, 1500) AS g;
DELETE FROM t WHERE id <= 250;
UPDATE t SET v = 'updated' WHERE id = 1000;

INSERT INTO small VALUES (4, 'd');
DELETE FROM small WHERE id = 1;
UPDATE small SET v = 'B' WHERE id = 2;
14 changes: 14 additions & 0 deletions test/pg-cdc-standby/mzcompose
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#!/usr/bin/env bash

# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
#
# mzcompose — runs Docker Compose with Materialize customizations.

exec "$(dirname "$0")"/../../bin/pyactivate -m materialize.cli.mzcompose "$@"
211 changes: 211 additions & 0 deletions test/pg-cdc-standby/mzcompose.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,211 @@
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

"""
Postgres source against a physical *standby*.

Exercises "logical decoding on standby" (PostgreSQL 16+): a primary streams its
WAL to a physical replica via streaming replication, and Materialize creates a
logical replication slot on the *replica* and decodes changes there. Data
originates on the primary, flows to the standby physically, and is decoded
logically for Materialize -- the primary is never touched by Materialize.

The notable wrinkle is that creating a logical slot on a standby blocks until
the primary emits a standby-snapshot (RUNNING_XACTS) WAL record. Materialize
creates its slots synchronously, so without help the source would hang. We run
a background thread that calls pg_log_standby_snapshot() on the primary to
unblock slot creation; see nudge_standby_snapshots().
"""

import threading
from contextlib import contextmanager
from textwrap import dedent
from typing import Iterator

import psycopg

from materialize.mzcompose.composition import Composition
from materialize.mzcompose.services.materialized import Materialized
from materialize.mzcompose.services.mz import Mz
from materialize.mzcompose.services.postgres import Postgres
from materialize.mzcompose.services.testdrive import Testdrive

# Standby's data directory is seeded by pg_basebackup before postgres starts, so
# it needs to survive between the `run` (basebackup) and `up` (serve) steps.
VOLUMES = {"standbydata": None}

SERVICES = [
Mz(app_password=""),
Materialized(
additional_system_parameter_defaults={
"log_filter": "mz_storage::source::postgres=trace,info"
},
default_replication_factor=1,
),
# no_reset so the source created in one testdrive invocation survives into
# the next; c.down at the start of the workflow gives us a clean slate.
Testdrive(no_reset=True, default_timeout="120s"),
Postgres(name="pg-primary"),
Postgres(
name="pg-standby",
extra_command=["-c", "hot_standby_feedback=on"],
volumes=["standbydata:/var/lib/postgresql/data"],
),
]


def _pg_connect(c: Composition, service: str) -> psycopg.Connection:
return psycopg.connect(
host="localhost",
user="postgres",
password="postgres",
port=c.default_port(service),
autocommit=True,
)


def _allow_replication(c: Composition, service: str) -> None:
"""Permit replication connections (basebackup, streaming, logical decoding).

The baked pg_hba.conf has no `replication` entry, and `host all` does not
match replication connections in PostgreSQL.
"""
c.exec(
service,
"bash",
"-c",
"echo 'host replication all all trust' >> /share/conf/pg_hba.conf",
)
c.exec(service, "psql", "-U", "postgres", "-c", "SELECT pg_reload_conf();")


def _wait_for_standby(c: Composition) -> None:
"""Block until the standby is accepting connections and is in recovery."""
last_err: Exception | None = None
for _ in range(60):
try:
conn = _pg_connect(c, "pg-standby")
in_recovery = conn.execute("SELECT pg_is_in_recovery()").fetchone()[0]
conn.close()
if in_recovery:
return
except Exception as e:
last_err = e
threading.Event().wait(1)
raise RuntimeError(f"standby did not reach recovery state: {last_err}")


@contextmanager
def nudge_standby_snapshots(c: Composition) -> Iterator[None]:
"""Periodically emit a standby-snapshot record on the primary.

Creating a logical slot on a standby blocks until a RUNNING_XACTS record is
replayed. Materialize creates slots synchronously (one persistent slot plus
a temporary per-worker snapshot slot), so we keep nudging for the whole
duration that the source is being set up and verified.
"""
stop = threading.Event()

def run() -> None:
conn = _pg_connect(c, "pg-primary")
try:
while not stop.is_set():
try:
conn.execute("SELECT pg_log_standby_snapshot()")
except Exception:
pass
stop.wait(1)
finally:
conn.close()

thread = threading.Thread(target=run, daemon=True)
thread.start()
try:
yield
finally:
stop.set()
thread.join()


def setup_standby(c: Composition) -> None:
"""Bring up the primary, seed it, then clone it into a physical standby."""
c.down(destroy_volumes=True)

# testdrive connects to materialized at startup, so it must be up before we
# run any testdrive files -- even ones that only touch Postgres.
c.up("materialized", "pg-primary")
_allow_replication(c, "pg-primary")
c.run_testdrive_files("configure-primary.td")

# Clone the primary into the standby's data volume *before* postgres starts.
# `-R` writes standby.signal + primary_conninfo so it comes up in recovery.
# Runs as root to chown the fresh volume, then steps down to postgres since
# the server refuses a data dir it does not own.
c.run(
"pg-standby",
"-c",
dedent(
"""
set -e
mkdir -p "$PGDATA"
chown postgres:postgres "$PGDATA"
chmod 700 "$PGDATA"
rm -rf "$PGDATA"/* || true
gosu postgres pg_basebackup -h pg-primary -U postgres -D "$PGDATA" -Fp -Xs -R -P
echo basebackup-done
"""
),
entrypoint="bash",
)

c.up("pg-standby")
_wait_for_standby(c)
# The standby reads its own pg_hba.conf; allow Materialize's replication
# connection (slot creation + START_REPLICATION) to it.
_allow_replication(c, "pg-standby")


def workflow_default(c: Composition) -> None:
setup_standby(c)

with nudge_standby_snapshots(c):
# Source creation + initial snapshot both create slots on the standby,
# which block until a snapshot record arrives -- hence the nudger.
c.run_testdrive_files("configure-materialize.td", "verify-snapshot.td")

# Changes made on the primary must flow primary -> standby (physical) ->
# Materialize (logical decoding on the standby).
c.run_testdrive_files("insert-update-delete.td", "verify-cdc.td")

_verify_reading_from_standby(c)


def _verify_reading_from_standby(c: Composition) -> None:
"""Prove Materialize really decoded from the standby, not the primary."""
conn = _pg_connect(c, "pg-standby")
try:
in_recovery = conn.execute("SELECT pg_is_in_recovery()").fetchone()[0]
assert in_recovery, "expected pg-standby to be a replica in recovery"

logical_slots = conn.execute(
"SELECT count(*) FROM pg_replication_slots WHERE slot_type = 'logical'"
).fetchone()[0]
assert (
logical_slots >= 1
), f"expected a logical slot on the standby, found {logical_slots}"
finally:
conn.close()

# The primary must NOT carry Materialize's logical slot.
conn = _pg_connect(c, "pg-primary")
try:
in_recovery = conn.execute("SELECT pg_is_in_recovery()").fetchone()[0]
assert not in_recovery, "expected pg-primary to be the primary"
finally:
conn.close()
2 changes: 2 additions & 0 deletions test/pg-cdc-standby/probe.td
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
$ postgres-execute connection=postgres://postgres:postgres@pg-primary
SELECT 1;
Loading
Loading