Skip to content

Commit ed54569

Browse files
committed
Address review feedback: add pause timeout recovery and restrict privileges
Add best-effort resume_apply_workers call in PG_CATCH error path so workers are unblocked if slot creation fails. Guard with PQstatus check to avoid hanging on a broken connection. Add ConditionVariableTimedSleep to both pause check points so workers self-recover after spock.pause_timeout seconds if resume is never called (e.g., caller crashed or connection lost). Extract maybe_pause_for_slot_creation() helper to deduplicate the pause logic in begin_replication_step and handle_commit. Revoke EXECUTE on pause_apply_workers and resume_apply_workers from PUBLIC to prevent unprivileged users from blocking apply workers.
1 parent 8a4131f commit ed54569

File tree

4 files changed

+50
-25
lines changed

4 files changed

+50
-25
lines changed

sql/spock--5.0.6--6.0.0-devel.sql

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,11 +35,15 @@ RETURNS void
3535
AS 'MODULE_PATHNAME', 'spock_pause_apply_workers'
3636
LANGUAGE C VOLATILE;
3737

38+
REVOKE ALL ON FUNCTION spock.pause_apply_workers() FROM PUBLIC;
39+
3840
CREATE FUNCTION spock.resume_apply_workers()
3941
RETURNS void
4042
AS 'MODULE_PATHNAME', 'spock_resume_apply_workers'
4143
LANGUAGE C VOLATILE;
4244

45+
REVOKE ALL ON FUNCTION spock.resume_apply_workers() FROM PUBLIC;
46+
4347
-- Read peer progress (ros.remote_lsn) for all peer subscriptions.
4448
-- Called while apply workers are paused and the slot's snapshot is imported.
4549
-- Row 0: header (lsn + snapshot placeholder). Rows 1+: one progress entry per peer.

sql/spock--6.0.0-devel.sql

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -542,11 +542,15 @@ RETURNS void
542542
AS 'MODULE_PATHNAME', 'spock_pause_apply_workers'
543543
LANGUAGE C VOLATILE;
544544

545+
REVOKE ALL ON FUNCTION spock.pause_apply_workers() FROM PUBLIC;
546+
545547
CREATE FUNCTION spock.resume_apply_workers()
546548
RETURNS void
547549
AS 'MODULE_PATHNAME', 'spock_resume_apply_workers'
548550
LANGUAGE C VOLATILE;
549551

552+
REVOKE ALL ON FUNCTION spock.resume_apply_workers() FROM PUBLIC;
553+
550554
CREATE PROCEDURE spock.wait_for_sync_event(
551555
OUT result bool,
552556
origin_id oid,

src/spock_apply.c

Lines changed: 31 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -438,6 +438,35 @@ action_error_callback(void *arg)
438438
* existing transaction).
439439
* Also provide a global snapshot and ensure we run in ApplyMessageContext.
440440
*/
441+
442+
/*
443+
* If the pause flag is set (slot creation in progress for add_node),
444+
* sleep on the ConditionVariable until resumed or timed out.
445+
*/
446+
static void
447+
maybe_pause_for_slot_creation(void)
448+
{
449+
if (pg_atomic_read_u32(&SpockCtx->pause_apply) != 0)
450+
{
451+
MyApplyWorker->paused = true;
452+
ConditionVariablePrepareToSleep(&SpockCtx->pause_cv);
453+
while (pg_atomic_read_u32(&SpockCtx->pause_apply) != 0)
454+
{
455+
if (ConditionVariableTimedSleep(&SpockCtx->pause_cv,
456+
spock_pause_timeout * 1000L,
457+
WAIT_EVENT_LOGICAL_APPLY_MAIN))
458+
{
459+
elog(WARNING, "SPOCK: apply worker pause timed out after %ds, resuming",
460+
spock_pause_timeout);
461+
pg_atomic_write_u32(&SpockCtx->pause_apply, 0);
462+
break;
463+
}
464+
}
465+
ConditionVariableCancelSleep();
466+
MyApplyWorker->paused = false;
467+
}
468+
}
469+
441470
static bool
442471
begin_replication_step(void)
443472
{
@@ -464,16 +493,7 @@ begin_replication_step(void)
464493
* xid polling. The previous transaction's commit is fully
465494
* complete, so ros.remote_lsn reflects only committed state.
466495
*/
467-
if (pg_atomic_read_u32(&SpockCtx->pause_apply) != 0)
468-
{
469-
MyApplyWorker->paused = true;
470-
ConditionVariablePrepareToSleep(&SpockCtx->pause_cv);
471-
while (pg_atomic_read_u32(&SpockCtx->pause_apply) != 0)
472-
ConditionVariableSleep(&SpockCtx->pause_cv,
473-
WAIT_EVENT_LOGICAL_APPLY_MAIN);
474-
ConditionVariableCancelSleep();
475-
MyApplyWorker->paused = false;
476-
}
496+
maybe_pause_for_slot_creation();
477497

478498
StartTransactionCommand();
479499
spock_apply_heap_begin();
@@ -1011,21 +1031,7 @@ handle_commit(StringInfo s)
10111031

10121032
in_remote_transaction = false;
10131033

1014-
/*
1015-
* If slot creation (add_node) is waiting for us, pause here. The
1016-
* commit is fully complete (ros.remote_lsn updated, xid cleared),
1017-
* so the snapshot and origin will be consistent.
1018-
*/
1019-
if (pg_atomic_read_u32(&SpockCtx->pause_apply) != 0)
1020-
{
1021-
MyApplyWorker->paused = true;
1022-
ConditionVariablePrepareToSleep(&SpockCtx->pause_cv);
1023-
while (pg_atomic_read_u32(&SpockCtx->pause_apply) != 0)
1024-
ConditionVariableSleep(&SpockCtx->pause_cv,
1025-
WAIT_EVENT_LOGICAL_APPLY_MAIN);
1026-
ConditionVariableCancelSleep();
1027-
MyApplyWorker->paused = false;
1028-
}
1034+
maybe_pause_for_slot_creation();
10291035

10301036
/*
10311037
* Stop replay if we're doing limited replay and we've replayed up to the

src/spock_sync.c

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1472,6 +1472,17 @@ spock_sync_subscription(SpockSubscription *sub)
14721472
sub->name, sub->slot_name,
14731473
edata->message ? edata->message : "");
14741474

1475+
/* Best-effort resume of apply workers on the remote node.
1476+
* If the connection is broken this will fail silently —
1477+
* the workers' CV timeout will recover them. */
1478+
if (origin_conn && PQstatus(origin_conn) == CONNECTION_OK)
1479+
{
1480+
PGresult *rres = PQexec(origin_conn,
1481+
"SELECT spock.resume_apply_workers()");
1482+
if (rres)
1483+
PQclear(rres);
1484+
}
1485+
14751486
FreeErrorData(edata);
14761487
PG_RE_THROW();
14771488
}

0 commit comments

Comments
 (0)