Skip to content

Commit 9f619be

Browse files
committed
Rename create_slot_with_progress to read_peer_progress and fix stale comments
The SQL function no longer creates a slot (that moved to the C caller via replication protocol). Rename to reflect its actual purpose. Also fix stale comments referencing advisory locks (removed earlier) in pause/resume worker functions.
1 parent 46f6d74 commit 9f619be

File tree

7 files changed: +24 −22 lines changed

sql/spock--5.0.6--6.0.0-devel.sql

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,10 @@ RETURNS void
4040
AS 'MODULE_PATHNAME', 'spock_resume_apply_workers'
4141
LANGUAGE C VOLATILE;
4242

43-
-- Atomically create a replication slot and read spock.progress for all peers.
44-
-- Row 0: lsn + snapshot header. Rows 1+: one progress entry per peer.
45-
CREATE FUNCTION spock.create_slot_with_progress(
43+
-- Read peer progress (ros.remote_lsn) for all peer subscriptions.
44+
-- Called while apply workers are paused and the slot's snapshot is imported.
45+
-- Row 0: header (lsn + snapshot placeholder). Rows 1+: one progress entry per peer.
46+
CREATE FUNCTION spock.read_peer_progress(
4647
p_slot_name text,
4748
p_provider_node_id oid,
4849
p_subscriber_node_id oid

sql/spock--6.0.0-devel.sql

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -121,9 +121,10 @@ CREATE VIEW spock.progress AS
121121
SELECT oid FROM pg_database WHERE datname = current_database()
122122
);
123123

124-
-- Atomically create a replication slot and read spock.progress for all peers.
125-
-- Row 0: lsn + snapshot header. Rows 1+: one progress entry per peer.
126-
CREATE FUNCTION spock.create_slot_with_progress(
124+
-- Read peer progress (ros.remote_lsn) for all peer subscriptions.
125+
-- Called while apply workers are paused and the slot's snapshot is imported.
126+
-- Row 0: header (lsn + snapshot placeholder). Rows 1+: one progress entry per peer.
127+
CREATE FUNCTION spock.read_peer_progress(
127128
p_slot_name text,
128129
p_provider_node_id oid,
129130
p_subscriber_node_id oid

src/spock.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1156,7 +1156,7 @@ _PG_init(void)
11561156

11571157
DefineCustomIntVariable("spock.pause_timeout",
11581158
"Timeout in seconds for pausing apply workers during slot creation",
1159-
"Controls how long create_slot_with_progress waits for apply "
1159+
"Controls how long add_node waits for apply "
11601160
"workers to reach a between-transaction pause point. Increase "
11611161
"if add_node fails with a pause timeout under heavy load.",
11621162
&spock_pause_timeout,

src/spock_apply.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -432,7 +432,7 @@ begin_replication_step(void)
432432
if (!IsTransactionState())
433433
{
434434
/*
435-
* Check if create_slot_with_progress needs us to pause. This only
435+
* Check if slot creation (add_node) needs us to pause. This only
436436
* fires during add_node (a rare operation). The fast path is a
437437
* single atomic read that almost always sees 0.
438438
*
@@ -989,7 +989,7 @@ handle_commit(StringInfo s)
989989
in_remote_transaction = false;
990990

991991
/*
992-
* If create_slot_with_progress is waiting for us, pause here. The
992+
* If slot creation (add_node) is waiting for us, pause here. The
993993
* commit is fully complete (ros.remote_lsn updated, xid cleared),
994994
* so the snapshot and origin will be consistent.
995995
*/

src/spock_functions.c

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3322,12 +3322,12 @@ spock_create_sync_event(PG_FUNCTION_ARGS)
33223322
/*
33233323
* spock_pause_apply_workers
33243324
*
3325-
* Pause all apply workers in this database by acquiring an exclusive advisory
3326-
* lock and setting the shared memory flag. Apply workers check the flag
3327-
* between transactions and block on a shared advisory lock until we release.
3325+
* Temporarily pause all apply workers in this database during slot creation
3326+
* for add_node. Sets a shared memory flag that workers check between
3327+
* transactions; workers sleep on a ConditionVariable until resumed.
33283328
*
33293329
* After this function returns, all apply workers have finished their current
3330-
* transaction and are blocked, so pg_replication_origin_status reflects only
3330+
* transaction and are paused, so pg_replication_origin_status reflects only
33313331
* committed state that is visible in any new snapshot.
33323332
*/
33333333
Datum
@@ -3402,8 +3402,8 @@ spock_pause_apply_workers(PG_FUNCTION_ARGS)
34023402
/*
34033403
* spock_resume_apply_workers
34043404
*
3405-
* Resume apply workers by clearing the flag and releasing the exclusive
3406-
* advisory lock.
3405+
* Resume apply workers by clearing the flag and broadcasting on the
3406+
* ConditionVariable to wake all sleeping workers.
34073407
*/
34083408
Datum
34093409
spock_resume_apply_workers(PG_FUNCTION_ARGS)

src/spock_group.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -737,7 +737,7 @@ spock_group_progress_force_set_list(List *lst)
737737
{
738738
/*
739739
* Existing LSN >= resume_lsn. Unconditionally overwrite: the
740-
* value from create_slot_with_progress is authoritative because
740+
* value from read_peer_progress is authoritative because
741741
* it was captured at COPY snapshot time. The apply worker may
742742
* have advanced past it since then, but any data it applied
743743
* after the snapshot is NOT in the COPY — so the new node must

src/spock_sync.c

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -557,7 +557,7 @@ adjust_progress_info(PGconn *origin_conn)
557557
}
558558

559559
/*
560-
* spock_create_slot_and_get_progress
560+
* spock_create_slot_and_read_progress
561561
*
562562
* Pauses apply workers, creates a replication slot via the replication
563563
* protocol (which returns a snapshot consistent with the slot's WAL
@@ -568,7 +568,7 @@ adjust_progress_info(PGconn *origin_conn)
568568
* after the COPY phase completes.
569569
*/
570570
static char *
571-
spock_create_slot_and_get_progress(PGconn *conn, PGconn *repl_conn,
571+
spock_create_slot_and_read_progress(PGconn *conn, PGconn *repl_conn,
572572
const char *slot_name,
573573
Oid origin_node_id, Oid subscriber_node_id,
574574
XLogRecPtr *lsn_out, List **progress_out)
@@ -670,7 +670,7 @@ spock_create_slot_and_get_progress(PGconn *conn, PGconn *repl_conn,
670670

671671
/* Read peer progress via the SQL function (slot already exists) */
672672
appendStringInfo(&query,
673-
"SELECT * FROM spock.create_slot_with_progress"
673+
"SELECT * FROM spock.read_peer_progress"
674674
"('%s', %u, %u)",
675675
slot_name, origin_node_id, subscriber_node_id);
676676
res = PQexec(conn, query.data);
@@ -682,7 +682,7 @@ spock_create_slot_and_get_progress(PGconn *conn, PGconn *repl_conn,
682682

683683
nrows = PQntuples(res);
684684
if (nrows < 1)
685-
elog(ERROR, "spock.create_slot_with_progress returned no rows");
685+
elog(ERROR, "spock.read_peer_progress returned no rows");
686686

687687
/* Row 0 is the header row: lsn + snapshot, progress fields all NULL.
688688
* lsn_out and snapshot are already set from the replication protocol;
@@ -771,7 +771,7 @@ spock_create_slot_and_get_progress(PGconn *conn, PGconn *repl_conn,
771771
* spock_release_slot_snapshot
772772
*
773773
* Rolls back the snapshot transaction opened by
774-
* spock_create_slot_and_get_progress and closes the connection.
774+
* spock_create_slot_and_read_progress and closes the connection.
775775
*/
776776
static void
777777
spock_release_slot_snapshot(PGconn *conn)
@@ -1454,7 +1454,7 @@ spock_sync_subscription(SpockSubscription *sub)
14541454
*/
14551455
PG_TRY();
14561456
{
1457-
snapshot = spock_create_slot_and_get_progress(
1457+
snapshot = spock_create_slot_and_read_progress(
14581458
origin_conn,
14591459
origin_conn_repl,
14601460
sub->slot_name,

0 commit comments

Comments (0)