Skip to content
Merged
2 changes: 1 addition & 1 deletion .github/workflows/spockbench.yml
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ jobs:
echo "TAP_CT_NAME=$TAP_CT_NAME" >> "$GITHUB_ENV"
docker run --name "$TAP_CT_NAME" -e PGVER=${{ matrix.pgver }} --workdir=/home/pgedge/spock/tests/tap \
spock /home/pgedge/spock/tests/tap/run_tests.sh
timeout-minutes: 20
timeout-minutes: 30

- name: Collect TAP artifacts (from container)
if: ${{ always() }}
Expand Down
1 change: 1 addition & 0 deletions include/spock.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ extern bool allow_ddl_from_functions;
extern int restart_delay_default;
extern int restart_delay_on_exception;
extern int spock_replay_queue_size;
extern int spock_pause_timeout;
extern bool check_all_uc_indexes;
extern bool spock_enable_quiet_mode;
extern int log_origin_change;
Expand Down
1 change: 1 addition & 0 deletions include/spock_group.h
Original file line number Diff line number Diff line change
Expand Up @@ -161,5 +161,6 @@ extern TimestampTz apply_worker_get_prev_remote_ts(void);
extern void spock_group_resource_dump(void);
extern void spock_checkpoint_hook(XLogRecPtr checkPointRedo, int flags);
extern void spock_group_progress_update_list(List *lst);
extern void spock_group_progress_force_set_list(List *lst);

#endif /* SPOCK_GROUP_H */
8 changes: 8 additions & 0 deletions include/spock_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ typedef struct SpockApplyWorker
XLogRecPtr replay_stop_lsn; /* Replay should stop here if defined. */
bool sync_pending; /* Is there new synchronization info pending?. */
bool use_try_block; /* Should use try block for apply? */
bool paused; /* Worker is paused for slot creation. */
SpockGroupEntry *apply_group; /* Apply group to be used with parallel
* slots. */
} SpockApplyWorker;
Expand Down Expand Up @@ -106,6 +107,13 @@ typedef struct SpockContext
/* Manages access to SpockGroupHash */
LWLock *apply_group_master_lock;

/*
* Pause mechanism for apply workers during slot creation.
* Non-zero signals workers to sleep on pause_cv until cleared.
*/
pg_atomic_uint32 pause_apply;
ConditionVariable pause_cv;

/* Background workers. */
int total_workers;
SpockWorker workers[FLEXIBLE_ARRAY_MEMBER];
Expand Down
387 changes: 333 additions & 54 deletions samples/Z0DAN/zodan.sql

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions samples/Z0DAN/zodan_cleanup.sql
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ DROP PROCEDURE IF EXISTS spock.create_source_to_new_subscription(text, text, tex
DROP PROCEDURE IF EXISTS spock.trigger_sync_on_other_nodes_and_wait_on_source(text, text, text, text, boolean);
DROP PROCEDURE IF EXISTS spock.check_commit_timestamp_and_advance_slot(text, text, text, text, boolean);
DROP PROCEDURE IF EXISTS spock.present_final_cluster_state(text, integer, boolean);
DROP PROCEDURE IF EXISTS spock.add_node(text, text, text, text, boolean, text, text, jsonb, integer);
DROP PROCEDURE IF EXISTS spock.add_node(text, text, text, text, boolean, text, text, jsonb);

-- Drop temporary table if it exists
Expand Down
105 changes: 105 additions & 0 deletions sql/spock--5.0.6--6.0.0-devel.sql
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,111 @@ CREATE VIEW spock.progress AS
SELECT oid FROM pg_database WHERE datname = current_database()
);

-- SQL entry point for the C function spock_pause_apply_workers, used to pause
-- apply workers while a replication slot is being created (add_node).
--
-- EXECUTE is revoked from PUBLIC below: PostgreSQL grants EXECUTE on new
-- functions to PUBLIC by default, and an unprivileged caller could otherwise
-- pause the apply workers and never resume them, blocking replication
-- indefinitely. Superusers (and the function owner) can still call it.
CREATE FUNCTION spock.pause_apply_workers()
RETURNS void
AS 'MODULE_PATHNAME', 'spock_pause_apply_workers'
LANGUAGE C VOLATILE;

REVOKE ALL ON FUNCTION spock.pause_apply_workers() FROM PUBLIC;

-- SQL entry point for the C function spock_resume_apply_workers, the
-- counterpart of spock.pause_apply_workers().
--
-- EXECUTE is revoked from PUBLIC below: PostgreSQL grants EXECUTE on new
-- functions to PUBLIC by default, and pausing/resuming apply workers must not
-- be controllable by unprivileged users. Superusers (and the function owner)
-- can still call it.
CREATE FUNCTION spock.resume_apply_workers()
RETURNS void
AS 'MODULE_PATHNAME', 'spock_resume_apply_workers'
LANGUAGE C VOLATILE;

REVOKE ALL ON FUNCTION spock.resume_apply_workers() FROM PUBLIC;

-- Read peer progress (ros.remote_lsn) for all peer subscriptions.
--
-- Meant to be called while apply workers are paused and the slot's snapshot
-- is imported into the calling transaction; both are arranged by the C caller.
--
-- Parameters:
--   p_slot_name           replication slot whose restart_lsn is reported
--   p_provider_node_id    only subscriptions targeting this node are read
--   p_subscriber_node_id  subscriptions whose origin is this node are skipped
--
-- Result set:
--   Row 0    header: lsn and snapshot only, all other columns NULL.
--            lsn is NULL when no slot named p_slot_name exists; snapshot is
--            always '' (placeholder, the snapshot is managed by the C caller).
--   Rows 1+  one progress entry per peer. remote_commit_lsn carries
--            pg_replication_origin_status.remote_lsn, or '0/0' when the
--            origin has no status row.
--
-- STRICT: a NULL argument produces an empty result set.
CREATE FUNCTION spock.read_peer_progress(
    p_slot_name text,
    p_provider_node_id oid,
    p_subscriber_node_id oid
) RETURNS TABLE(
    lsn pg_lsn,
    snapshot text,
    dbid oid,
    node_id oid,
    remote_node_id oid,
    remote_commit_ts timestamptz,
    prev_remote_ts timestamptz,
    remote_commit_lsn pg_lsn,
    remote_insert_lsn pg_lsn,
    received_lsn pg_lsn,
    last_updated_ts timestamptz,
    updated_by_decode boolean
) VOLATILE STRICT LANGUAGE plpgsql AS $$
DECLARE
    v_lsn pg_lsn;
    v_snap text;
    rec record;
    v_n_peers int := 0;
BEGIN
    /*
     * The slot and snapshot are created by the C caller via the replication
     * protocol. The slot's snapshot is imported into this transaction.
     * This function just reads peer progress (ros.remote_lsn) while apply
     * workers are paused.
     */

    -- Look up the slot's restart_lsn for the header row. A missing slot
    -- leaves v_lsn NULL rather than raising an error.
    SELECT s.restart_lsn INTO v_lsn
    FROM pg_catalog.pg_replication_slots AS s
    WHERE s.slot_name = p_slot_name;

    -- Placeholder only: the snapshot itself is managed by the C caller.
    v_snap := '';

    RAISE NOTICE 'SPOCK cswp slot=% v_lsn=%', p_slot_name, v_lsn;

    -- Header row: lsn + snapshot placeholder; remaining columns stay NULL.
    lsn := v_lsn;
    snapshot := v_snap;
    RETURN NEXT;

    /*
     * Emit one progress row per peer. With apply workers paused,
     * ros.remote_lsn is exact: it reflects only committed transactions
     * whose effects are visible in the slot snapshot.
     *
     * The inner join on pg_replication_origin drops subscriptions that have
     * no replication origin; the left join keeps origins without a status
     * row (their remote_lsn is reported as '0/0' below).
     */
    FOR rec IN
        SELECT
            p.dbid,
            p.node_id,
            p.remote_node_id,
            p.remote_commit_ts,
            p.prev_remote_ts,
            p.remote_insert_lsn,
            p.received_lsn,
            p.last_updated_ts,
            p.updated_by_decode,
            ros.remote_lsn AS ros_remote_lsn
        FROM spock.subscription AS sub
        INNER JOIN spock.progress AS p
            ON p.remote_node_id = sub.sub_origin
            AND p.node_id = sub.sub_target
        INNER JOIN pg_catalog.pg_replication_origin AS o
            ON o.roname = sub.sub_slot_name
        LEFT JOIN pg_catalog.pg_replication_origin_status AS ros
            ON ros.local_id = o.roident
        WHERE sub.sub_target = p_provider_node_id
            AND sub.sub_origin <> p_subscriber_node_id
    LOOP
        v_n_peers := v_n_peers + 1;

        lsn := v_lsn;
        snapshot := v_snap;
        dbid := rec.dbid;
        node_id := rec.node_id;
        remote_node_id := rec.remote_node_id;
        remote_commit_ts := rec.remote_commit_ts;
        prev_remote_ts := rec.prev_remote_ts;
        -- Report the replication origin's position, not spock.progress's own
        -- remote_commit_lsn.
        remote_commit_lsn := COALESCE(rec.ros_remote_lsn, '0/0'::pg_lsn);
        remote_insert_lsn := rec.remote_insert_lsn;
        received_lsn := rec.received_lsn;
        last_updated_ts := rec.last_updated_ts;
        updated_by_decode := rec.updated_by_decode;

        RAISE NOTICE 'SPOCK cswp peer=% resume_lsn=%',
            rec.remote_node_id, remote_commit_lsn;

        RETURN NEXT;
    END LOOP;

    RAISE NOTICE 'SPOCK cswp slot=% done peers=%', p_slot_name, v_n_peers;
END;
$$;

CREATE VIEW spock.lag_tracker AS
SELECT
origin.node_name AS origin_name,
Expand Down
105 changes: 105 additions & 0 deletions sql/spock--6.0.0-devel.sql
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,101 @@ CREATE VIEW spock.progress AS
SELECT oid FROM pg_database WHERE datname = current_database()
);

-- Read peer progress (ros.remote_lsn) for all peer subscriptions.
--
-- Meant to be called while apply workers are paused and the slot's snapshot
-- is imported into the calling transaction; both are arranged by the C caller.
--
-- Parameters:
--   p_slot_name           replication slot whose restart_lsn is reported
--   p_provider_node_id    only subscriptions targeting this node are read
--   p_subscriber_node_id  subscriptions whose origin is this node are skipped
--
-- Result set:
--   Row 0    header: lsn and snapshot only, all other columns NULL.
--            lsn is NULL when no slot named p_slot_name exists; snapshot is
--            always '' (placeholder, the snapshot is managed by the C caller).
--   Rows 1+  one progress entry per peer. remote_commit_lsn carries
--            pg_replication_origin_status.remote_lsn, or '0/0' when the
--            origin has no status row.
--
-- STRICT: a NULL argument produces an empty result set.
CREATE FUNCTION spock.read_peer_progress(
    p_slot_name text,
    p_provider_node_id oid,
    p_subscriber_node_id oid
) RETURNS TABLE(
    lsn pg_lsn,
    snapshot text,
    dbid oid,
    node_id oid,
    remote_node_id oid,
    remote_commit_ts timestamptz,
    prev_remote_ts timestamptz,
    remote_commit_lsn pg_lsn,
    remote_insert_lsn pg_lsn,
    received_lsn pg_lsn,
    last_updated_ts timestamptz,
    updated_by_decode boolean
) VOLATILE STRICT LANGUAGE plpgsql AS $$
DECLARE
    v_lsn pg_lsn;
    v_snap text;
    rec record;
    v_n_peers int := 0;
BEGIN
    /*
     * The slot and snapshot are created by the C caller via the replication
     * protocol. The slot's snapshot is imported into this transaction.
     * This function just reads peer progress (ros.remote_lsn) while apply
     * workers are paused.
     */

    -- Look up the slot's restart_lsn for the header row. A missing slot
    -- leaves v_lsn NULL rather than raising an error.
    SELECT s.restart_lsn INTO v_lsn
    FROM pg_catalog.pg_replication_slots AS s
    WHERE s.slot_name = p_slot_name;

    -- Placeholder only: the snapshot itself is managed by the C caller.
    v_snap := '';

    RAISE NOTICE 'SPOCK cswp slot=% v_lsn=%', p_slot_name, v_lsn;

    -- Header row: lsn + snapshot placeholder; remaining columns stay NULL.
    lsn := v_lsn;
    snapshot := v_snap;
    RETURN NEXT;

    /*
     * Emit one progress row per peer. With apply workers paused,
     * ros.remote_lsn is exact: it reflects only committed transactions
     * whose effects are visible in the slot snapshot.
     *
     * The inner join on pg_replication_origin drops subscriptions that have
     * no replication origin; the left join keeps origins without a status
     * row (their remote_lsn is reported as '0/0' below).
     */
    FOR rec IN
        SELECT
            p.dbid,
            p.node_id,
            p.remote_node_id,
            p.remote_commit_ts,
            p.prev_remote_ts,
            p.remote_insert_lsn,
            p.received_lsn,
            p.last_updated_ts,
            p.updated_by_decode,
            ros.remote_lsn AS ros_remote_lsn
        FROM spock.subscription AS sub
        INNER JOIN spock.progress AS p
            ON p.remote_node_id = sub.sub_origin
            AND p.node_id = sub.sub_target
        INNER JOIN pg_catalog.pg_replication_origin AS o
            ON o.roname = sub.sub_slot_name
        LEFT JOIN pg_catalog.pg_replication_origin_status AS ros
            ON ros.local_id = o.roident
        WHERE sub.sub_target = p_provider_node_id
            AND sub.sub_origin <> p_subscriber_node_id
    LOOP
        v_n_peers := v_n_peers + 1;

        lsn := v_lsn;
        snapshot := v_snap;
        dbid := rec.dbid;
        node_id := rec.node_id;
        remote_node_id := rec.remote_node_id;
        remote_commit_ts := rec.remote_commit_ts;
        prev_remote_ts := rec.prev_remote_ts;
        -- Report the replication origin's position, not spock.progress's own
        -- remote_commit_lsn.
        remote_commit_lsn := COALESCE(rec.ros_remote_lsn, '0/0'::pg_lsn);
        remote_insert_lsn := rec.remote_insert_lsn;
        received_lsn := rec.received_lsn;
        last_updated_ts := rec.last_updated_ts;
        updated_by_decode := rec.updated_by_decode;

        RAISE NOTICE 'SPOCK cswp peer=% resume_lsn=%',
            rec.remote_node_id, remote_commit_lsn;

        RETURN NEXT;
    END LOOP;

    RAISE NOTICE 'SPOCK cswp slot=% done peers=%', p_slot_name, v_n_peers;
END;
$$;

CREATE FUNCTION spock.node_create(node_name name, dsn text,
location text DEFAULT NULL, country text DEFAULT NULL,
info jsonb DEFAULT NULL)
Expand Down Expand Up @@ -442,6 +537,16 @@ RETURNS pg_lsn RETURNS NULL ON NULL INPUT
AS 'MODULE_PATHNAME', 'spock_create_sync_event'
LANGUAGE C VOLATILE;

-- SQL entry point for the C function spock_pause_apply_workers, used to pause
-- apply workers while a replication slot is being created (add_node).
--
-- EXECUTE is revoked from PUBLIC below: PostgreSQL grants EXECUTE on new
-- functions to PUBLIC by default, and an unprivileged caller could otherwise
-- pause the apply workers and never resume them, blocking replication
-- indefinitely. Superusers (and the function owner) can still call it.
CREATE FUNCTION spock.pause_apply_workers()
RETURNS void
AS 'MODULE_PATHNAME', 'spock_pause_apply_workers'
LANGUAGE C VOLATILE;

REVOKE ALL ON FUNCTION spock.pause_apply_workers() FROM PUBLIC;

-- SQL entry point for the C function spock_resume_apply_workers, the
-- counterpart of spock.pause_apply_workers().
--
-- Review feedback on this definition: pause_apply_workers() and
-- resume_apply_workers() must be restricted to superusers, because any
-- unprivileged user could call pause and then never call resume, leaving
-- apply workers blocked indefinitely. The restriction is applied here by
-- revoking EXECUTE from PUBLIC (the alternative would be a superuser check
-- inside the C function).
CREATE FUNCTION spock.resume_apply_workers()
RETURNS void
AS 'MODULE_PATHNAME', 'spock_resume_apply_workers'
LANGUAGE C VOLATILE;

REVOKE ALL ON FUNCTION spock.resume_apply_workers() FROM PUBLIC;

CREATE PROCEDURE spock.wait_for_sync_event(
OUT result bool,
origin_id oid,
Expand Down
16 changes: 16 additions & 0 deletions src/spock.c
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ bool allow_ddl_from_functions = false;
int restart_delay_default;
int restart_delay_on_exception;
int spock_replay_queue_size;
int spock_pause_timeout = 10; /* seconds to wait for apply workers to pause */
bool check_all_uc_indexes = false;
bool spock_enable_quiet_mode = false;
int log_origin_change = SPOCK_ORIGIN_NONE;
Expand Down Expand Up @@ -1153,6 +1154,21 @@ _PG_init(void)
NULL,
NULL);

DefineCustomIntVariable("spock.pause_timeout",
"Timeout in seconds for pausing apply workers during slot creation",
"Controls how long add_node waits for apply "
"workers to reach a between-transaction pause point. Increase "
"if add_node fails with a pause timeout under heavy load.",
&spock_pause_timeout,
10,
1,
300,
PGC_USERSET,
GUC_UNIT_S,
NULL,
NULL,
NULL);

DefineCustomIntVariable("spock.exception_replay_queue_size",
"Maximum in-memory size for the apply replay queue",
"When the replay queue exceeds this size, subsequent "
Expand Down
40 changes: 38 additions & 2 deletions src/spock_apply.c
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,27 @@ begin_replication_step(void)

if (!IsTransactionState())
{
/*
* Check if slot creation (add_node) needs us to pause. This only
* fires during add_node (a rare operation). The fast path is a
* single atomic read that almost always sees 0.
*
* Runs before StartTransactionCommand so the worker has no xid
* while paused — pause_apply_workers can detect completion via
* xid polling. The previous transaction's commit is fully
* complete, so ros.remote_lsn reflects only committed state.
*/
if (pg_atomic_read_u32(&SpockCtx->pause_apply) != 0)
{
MyApplyWorker->paused = true;
ConditionVariablePrepareToSleep(&SpockCtx->pause_cv);
while (pg_atomic_read_u32(&SpockCtx->pause_apply) != 0)
ConditionVariableSleep(&SpockCtx->pause_cv,
WAIT_EVENT_LOGICAL_APPLY_MAIN);
ConditionVariableCancelSleep();
MyApplyWorker->paused = false;
}

StartTransactionCommand();
spock_apply_heap_begin();

Expand Down Expand Up @@ -960,8 +981,7 @@ handle_commit(StringInfo s)
.key.remote_node_id = MySubscription->origin->id,
.remote_commit_ts = commit_time,
.prev_remote_ts = replorigin_session_origin_timestamp,
.remote_commit_lsn = commit_lsn,
/* Ensure invariant: received_lsn >= remote_commit_lsn */
.remote_commit_lsn = end_lsn,
.received_lsn = end_lsn,
/*
* Include remote_insert_lsn for WAL persistence. This was already
Expand Down Expand Up @@ -991,6 +1011,22 @@ handle_commit(StringInfo s)

in_remote_transaction = false;

/*
* If slot creation (add_node) is waiting for us, pause here. The
* commit is fully complete (ros.remote_lsn updated, xid cleared),
* so the snapshot and origin will be consistent.
*/
if (pg_atomic_read_u32(&SpockCtx->pause_apply) != 0)
{
MyApplyWorker->paused = true;
ConditionVariablePrepareToSleep(&SpockCtx->pause_cv);
while (pg_atomic_read_u32(&SpockCtx->pause_apply) != 0)
ConditionVariableSleep(&SpockCtx->pause_cv,
WAIT_EVENT_LOGICAL_APPLY_MAIN);
ConditionVariableCancelSleep();
MyApplyWorker->paused = false;
}

/*
* Stop replay if we're doing limited replay and we've replayed up to the
* last record we're supposed to process.
Expand Down
Loading
Loading