diff --git a/.github/workflows/spockbench.yml b/.github/workflows/spockbench.yml index 83800dd2..11ba44b7 100644 --- a/.github/workflows/spockbench.yml +++ b/.github/workflows/spockbench.yml @@ -157,7 +157,7 @@ jobs: echo "TAP_CT_NAME=$TAP_CT_NAME" >> "$GITHUB_ENV" docker run --name "$TAP_CT_NAME" -e PGVER=${{ matrix.pgver }} --workdir=/home/pgedge/spock/tests/tap \ spock /home/pgedge/spock/tests/tap/run_tests.sh - timeout-minutes: 20 + timeout-minutes: 30 - name: Collect TAP artifacts (from container) if: ${{ always() }} diff --git a/include/spock.h b/include/spock.h index 8b955d11..c73edd5b 100644 --- a/include/spock.h +++ b/include/spock.h @@ -51,6 +51,7 @@ extern bool allow_ddl_from_functions; extern int restart_delay_default; extern int restart_delay_on_exception; extern int spock_replay_queue_size; +extern int spock_pause_timeout; extern bool check_all_uc_indexes; extern bool spock_enable_quiet_mode; extern int log_origin_change; diff --git a/include/spock_group.h b/include/spock_group.h index 655fc1e2..256d8e04 100644 --- a/include/spock_group.h +++ b/include/spock_group.h @@ -161,5 +161,6 @@ extern TimestampTz apply_worker_get_prev_remote_ts(void); extern void spock_group_resource_dump(void); extern void spock_checkpoint_hook(XLogRecPtr checkPointRedo, int flags); extern void spock_group_progress_update_list(List *lst); +extern void spock_group_progress_force_set_list(List *lst); #endif /* SPOCK_GROUP_H */ diff --git a/include/spock_worker.h b/include/spock_worker.h index f3a17d8d..60891451 100644 --- a/include/spock_worker.h +++ b/include/spock_worker.h @@ -44,6 +44,7 @@ typedef struct SpockApplyWorker XLogRecPtr replay_stop_lsn; /* Replay should stop here if defined. */ bool sync_pending; /* Is there new synchronization info pending?. */ bool use_try_block; /* Should use try block for apply? */ + bool paused; /* Worker is paused for slot creation. */ SpockGroupEntry *apply_group; /* Apply group to be used with parallel * slots. 
*/ } SpockApplyWorker; @@ -106,6 +107,13 @@ typedef struct SpockContext /* Manages access to SpockGroupHash */ LWLock *apply_group_master_lock; + /* + * Pause mechanism for apply workers during slot creation. + * Non-zero signals workers to sleep on pause_cv until cleared. + */ + pg_atomic_uint32 pause_apply; + ConditionVariable pause_cv; + /* Background workers. */ int total_workers; SpockWorker workers[FLEXIBLE_ARRAY_MEMBER]; diff --git a/samples/Z0DAN/zodan.sql b/samples/Z0DAN/zodan.sql index aac331ea..279849bd 100644 --- a/samples/Z0DAN/zodan.sql +++ b/samples/Z0DAN/zodan.sql @@ -1294,23 +1294,6 @@ BEGIN FOR rec IN SELECT * FROM temp_spock_nodes WHERE node_name != src_node_name AND node_name != new_node_name LOOP - -- Trigger sync event on origin node and store LSN - BEGIN - RAISE NOTICE ' - 3+ node scenario: sync event stored, skipping disabled subscriptions'; - SELECT * INTO remotesql - FROM dblink(rec.dsn, 'SELECT spock.sync_event()') AS t(sync_lsn text); - - -- Store the sync LSN for later use when enabling subscriptions - INSERT INTO temp_sync_lsns (origin_node, sync_lsn) - VALUES (rec.node_name, remotesql) - ON CONFLICT (origin_node) DO UPDATE SET sync_lsn = EXCLUDED.sync_lsn; - - RAISE NOTICE ' OK: %', rpad('Triggering sync event on node ' || rec.node_name || ' (LSN: ' || remotesql || ')', 120, ' '); - EXCEPTION - WHEN OTHERS THEN - RAISE EXCEPTION ' ✗ %', rpad('Triggering sync event on node ' || rec.node_name || ' (error: ' || SQLERRM || ')', 120, ' '); - END; - -- Create replication slot on the "other" node BEGIN -- Extract dbname and handle both quoted and unquoted values @@ -1338,17 +1321,124 @@ BEGIN SELECT lsn INTO _commit_lsn FROM dblink(rec.dsn, remotesql) AS t(slot_name text, lsn pg_lsn); - UPDATE temp_sync_lsns SET commit_lsn = _commit_lsn - WHERE origin_node = rec.node_name; RAISE NOTICE ' OK: %', rpad('Creating replication slot ' || slot_name || ' (LSN: ' || _commit_lsn || ')' || ' on node ' || rec.node_name, 120, ' '); EXCEPTION WHEN OTHERS 
THEN RAISE EXCEPTION ' ✗ %', rpad('Creating replication slot ' || slot_name || ' on node ' || rec.node_name || ' (error: ' || SQLERRM || ')', 120, ' '); END; + -- Trigger sync event on origin node AFTER slot creation. The sync + -- event LSN is guaranteed > slot LSN because it is written to WAL + -- after the slot-creation commit. We use this LSN as the wait target + -- below so that pg_replication_origin_status (updated immediately by + -- the non-transactional message) can satisfy the check. + BEGIN + SELECT * INTO remotesql + FROM dblink(rec.dsn, 'SELECT spock.sync_event()') AS t(sync_lsn text); + + -- Store sync LSN and slot LSN (commit_lsn) for later phases. + -- _commit_lsn still holds the slot creation LSN from above. + INSERT INTO temp_sync_lsns (origin_node, sync_lsn, commit_lsn) + VALUES (rec.node_name, remotesql, _commit_lsn) + ON CONFLICT (origin_node) DO UPDATE + SET sync_lsn = EXCLUDED.sync_lsn, + commit_lsn = EXCLUDED.commit_lsn; + + -- Switch _commit_lsn to the sync event LSN for the wait below + _commit_lsn := remotesql::pg_lsn; + + RAISE NOTICE ' OK: %', rpad('Triggered sync event on node ' || rec.node_name || ' (LSN: ' || remotesql || ')', 120, ' '); + EXCEPTION + WHEN OTHERS THEN + RAISE EXCEPTION ' ✗ %', rpad('Triggering sync event on node ' || rec.node_name || ' (error: ' || SQLERRM || ')', 120, ' '); + END; + + -- Wait for the source node to have received all changes from this + -- "other" node up to the sync event LSN. This ensures N1 has applied + -- enough of N2's data before the COPY snapshot, reducing the amount + -- N3 must replay directly from N2. 
+ BEGIN + DECLARE + src_progress_lsn pg_lsn; + wait_started timestamptz := clock_timestamp(); + wait_timeout interval := interval '3 minutes'; + progress_sql text; + v_prev_statement_timeout text; + BEGIN + progress_sql := format( + 'SELECT os.remote_lsn ' + 'FROM pg_replication_origin_status os ' + 'JOIN spock.subscription s ON os.external_id = s.sub_slot_name ' + 'JOIN spock.node n ON n.node_id = s.sub_origin ' + 'WHERE s.sub_target = (SELECT node_id FROM spock.node_info()) ' + ' AND n.node_name = %L', + rec.node_name); + + RAISE NOTICE ' - Waiting for source node % to receive % changes up to sync LSN %...', + src_node_name, rec.node_name, _commit_lsn; + + LOOP + BEGIN + v_prev_statement_timeout := current_setting('statement_timeout', true); + PERFORM set_config('statement_timeout', '5s', true); + + SELECT * FROM dblink(src_dsn, progress_sql) + AS t(lsn pg_lsn) INTO src_progress_lsn; + + PERFORM set_config('statement_timeout', coalesce(v_prev_statement_timeout, '0'), true); + EXCEPTION + WHEN OTHERS THEN + PERFORM set_config('statement_timeout', coalesce(v_prev_statement_timeout, '0'), true); + src_progress_lsn := NULL; + END; + + EXIT WHEN src_progress_lsn IS NOT NULL + AND src_progress_lsn >= _commit_lsn; + + IF clock_timestamp() - wait_started > wait_timeout THEN + RAISE WARNING ' Timeout waiting for source node commit catchup (last seen: %)', src_progress_lsn; + EXIT; + END IF; + + PERFORM pg_sleep(0.5); + END LOOP; + + RAISE NOTICE ' OK: %', rpad( + 'Source node ' || src_node_name || ' received ' || rec.node_name + || ' changes up to ' || COALESCE(src_progress_lsn::text, 'unknown') + || ' (needed >= ' || _commit_lsn || ')', 120, ' '); + END; + EXCEPTION + WHEN OTHERS THEN + RAISE WARNING ' Could not verify source commit catchup for %: %', rec.node_name, SQLERRM; + END; + -- Create disabled subscription on new node from "other" node BEGIN sub_name := 'sub_' || rec.node_name || '_' || new_node_name; + -- Drop stale replication origin from a previous add_node 
cycle + -- so create_sub starts fresh at 0/0 (avoids stale-LSN data loss). + BEGIN + PERFORM dblink_exec( + new_node_dsn, + format($dsql$ + DO $x$ + BEGIN + IF EXISTS (SELECT 1 FROM pg_replication_origin + WHERE roname = %L) THEN + PERFORM pg_replication_origin_drop(%L); + END IF; + END $x$ + $dsql$, + slot_name, slot_name) + ); + RAISE NOTICE ' OK: Dropped stale origin % on new node (if existed)', + slot_name; + EXCEPTION + WHEN OTHERS THEN + RAISE EXCEPTION 'Could not drop stale origin % on new node: %', + slot_name, SQLERRM; + END; CALL spock.create_sub( new_node_dsn, -- Create on new node sub_name, -- sub__ @@ -1362,13 +1452,14 @@ BEGIN false, -- force_text_transfer verb -- verbose ); - RAISE NOTICE ' ✓ %', rpad('Creating initial subscription ' || sub_name || ' on node ' || rec.node_name, 120, ' '); + RAISE NOTICE ' ✓ %', rpad('Creating initial subscription ' || sub_name || ' on new node ' || new_node_name || ' (provider: ' || rec.node_name || ')', 120, ' '); PERFORM pg_sleep(5); subscription_count := subscription_count + 1; EXCEPTION WHEN OTHERS THEN - RAISE NOTICE ' ✗ %', rpad('Creating initial subscription ' || sub_name || ' on node ' || rec.node_name || ' (error: ' || SQLERRM || ')', 120, ' '); + RAISE NOTICE ' ✗ %', rpad('Creating initial subscription ' || sub_name || ' on new node ' || new_node_name || ' (provider: ' || rec.node_name || ') (error: ' || SQLERRM || ')', 120, ' '); END; + END LOOP; IF subscription_count = 0 THEN @@ -1406,7 +1497,7 @@ BEGIN -- This ensures the subscription starts replicating from the correct sync point DECLARE sync_lsn text; - timeout_ms integer := 1200; -- 20 minutes + timeout_ms integer := coalesce(nullif(current_setting('spock.add_node_timeout', true), ''), '180')::integer; temp_table_exists boolean; BEGIN -- Check if temp_sync_lsns table exists @@ -1428,13 +1519,21 @@ BEGIN END IF; -- Wait for this sync event on the new node where the subscription exists - PERFORM * FROM dblink(new_node_dsn, - format('CALL 
spock.wait_for_sync_event(true, %L, %L::pg_lsn, %s)', - src_node_name, sync_lsn, timeout_ms)) AS t(result text); - - IF verb THEN - RAISE NOTICE ' OK: %', rpad('Waiting for sync event from ' || src_node_name || ' on new node ' || new_node_name || '...', 120, ' '); - END IF; + DECLARE + sync_ok text; + BEGIN + SELECT * INTO sync_ok FROM dblink(new_node_dsn, + format('CALL spock.wait_for_sync_event(NULL, %L, %L::pg_lsn, %s, true)', + src_node_name, sync_lsn, timeout_ms)) AS t(result text); + + IF sync_ok IS NULL OR sync_ok::boolean IS NOT TRUE THEN + RAISE EXCEPTION 'wait_for_sync_event timed out for % on new node %', src_node_name, new_node_name; + END IF; + + IF verb THEN + RAISE NOTICE ' OK: %', rpad('Sync event from ' || src_node_name || ' confirmed on new node ' || new_node_name, 120, ' '); + END IF; + END; ELSE RAISE NOTICE ' WARNING: %', rpad('No stored sync LSN found for ' || src_node_name || ', skipping sync wait', 120, ' '); END IF; @@ -1450,7 +1549,8 @@ BEGIN CALL spock.verify_subscription_replicating( new_node_dsn, 'sub_' || src_node_name || '_' || new_node_name, - verb + verb, + 180 ); RAISE NOTICE ' ✓ %', rpad('Enabling subscription ' || sub_name || '...', 120, ' '); @@ -1483,7 +1583,7 @@ BEGIN -- This ensures the subscription starts replicating from the correct sync point DECLARE sync_lsn text; - timeout_ms integer := 1200; -- 20 minutes + timeout_ms integer := coalesce(nullif(current_setting('spock.add_node_timeout', true), ''), '180')::integer; BEGIN -- Get the stored sync LSN from when subscription was created SELECT tsl.sync_lsn INTO sync_lsn @@ -1496,13 +1596,21 @@ BEGIN END IF; -- Wait for this sync event on the new node where the subscription exists - PERFORM * FROM dblink(new_node_dsn, - format('CALL spock.wait_for_sync_event(true, %L, %L::pg_lsn, %s)', - rec.node_name, sync_lsn, timeout_ms)) AS t(result text); - - IF verb THEN - RAISE NOTICE ' OK: %', rpad('Waiting for sync event from ' || rec.node_name || ' on new node ' || new_node_name || 
'...', 120, ' '); - END IF; + DECLARE + sync_ok text; + BEGIN + SELECT * INTO sync_ok FROM dblink(new_node_dsn, + format('CALL spock.wait_for_sync_event(NULL, %L, %L::pg_lsn, %s, true)', + rec.node_name, sync_lsn, timeout_ms)) AS t(result text); + + IF sync_ok IS NULL OR sync_ok::boolean IS NOT TRUE THEN + RAISE EXCEPTION 'wait_for_sync_event timed out for % on new node %', rec.node_name, new_node_name; + END IF; + + IF verb THEN + RAISE NOTICE ' OK: %', rpad('Sync event from ' || rec.node_name || ' confirmed on new node ' || new_node_name, 120, ' '); + END IF; + END; ELSE RAISE NOTICE ' WARNING: %', rpad('No stored sync LSN found for ' || rec.node_name || ', skipping sync wait', 120, ' '); END IF; @@ -1512,7 +1620,8 @@ BEGIN CALL spock.verify_subscription_replicating( new_node_dsn, 'sub_'|| rec.node_name || '_' || new_node_name, - verb + verb, + 180 ); RAISE NOTICE ' ✓ %', rpad('Enabling subscription ' || sub_name || '...', 120, ' '); @@ -1665,7 +1774,7 @@ CREATE OR REPLACE PROCEDURE spock.trigger_sync_on_other_nodes_and_wait_on_source DECLARE rec RECORD; sync_lsn pg_lsn; - timeout_ms integer := 1200; -- 20 minutes timeout + timeout_ms integer := coalesce(nullif(current_setting('spock.add_node_timeout', true), ''), '180')::integer; remotesql text; BEGIN RAISE NOTICE 'Phase 5: Triggering sync events on other nodes and waiting on source'; @@ -1694,14 +1803,21 @@ BEGIN -- Wait for sync event on source node BEGIN - remotesql := format('CALL spock.wait_for_sync_event(true, %L, %L::pg_lsn, %s);', + remotesql := format('CALL spock.wait_for_sync_event(NULL, %L, %L::pg_lsn, %s, true);', rec.node_name, sync_lsn, timeout_ms); IF verb THEN RAISE NOTICE ' Remote SQL for waiting sync event: %', remotesql; END IF; - PERFORM * FROM dblink(src_dsn, remotesql) AS t(result text); - RAISE NOTICE ' OK: %', rpad('Waiting for sync event from ' || rec.node_name || ' on source node ' || src_node_name || '...', 120, ' '); + DECLARE + sync_ok text; + BEGIN + SELECT * INTO sync_ok FROM 
dblink(src_dsn, remotesql) AS t(result text); + IF sync_ok IS NULL OR sync_ok::boolean IS NOT TRUE THEN + RAISE EXCEPTION 'wait_for_sync_event timed out for % on source node %', rec.node_name, src_node_name; + END IF; + RAISE NOTICE ' OK: %', rpad('Sync event from ' || rec.node_name || ' confirmed on source node ' || src_node_name, 120, ' '); + END; EXCEPTION WHEN OTHERS THEN RAISE EXCEPTION ' ✗ %', rpad('Waiting for sync event from ' || rec.node_name || ' on source node ' || src_node_name || ' (error: ' || SQLERRM || ')', 120, ' '); @@ -1726,12 +1842,139 @@ DECLARE slot_name text; dbname text; remotesql text; + src_rec RECORD; + src_commit_lsn pg_lsn; + src_slot_name text; + src_dbname text; + current_lsn pg_lsn; + target_lsn pg_lsn; + v_sub_name name; + v_subid oid; + v_pending_sync integer; + v_wait_started timestamptz; + v_sub_status text; BEGIN RAISE NOTICE 'Phase 7: Checking commit timestamp and advancing replication slot'; + -- Wait for src->new COPY to complete so resume_lsn is written to spock.progress. + v_sub_name := ('sub_' || src_node_name || '_' || new_node_name)::name; + RAISE NOTICE ' - Waiting for subscription % to reach READY...', v_sub_name; + BEGIN + -- Avoid rare hangs in C-level sub_wait_for_sync by using a bounded SQL loop. 
+ SELECT sub_id INTO v_subid + FROM spock.subscription + WHERE sub_name = v_sub_name; + + IF v_subid IS NULL THEN + RAISE WARNING ' - Subscription % not found on new node; continuing', v_sub_name; + ELSE + v_wait_started := clock_timestamp(); + LOOP + SELECT count(*) INTO v_pending_sync + FROM spock.local_sync_status + WHERE sync_subid = v_subid + AND sync_status NOT IN ('y', 'r'); + + SELECT status INTO v_sub_status + FROM spock.sub_show_status() + WHERE subscription_name = v_sub_name; + + IF v_pending_sync = 0 THEN + RAISE NOTICE ' - Subscription % is READY', v_sub_name; + EXIT; + END IF; + + IF clock_timestamp() - v_wait_started > interval '3 minutes' THEN + RAISE WARNING ' - Timed out waiting for % to become READY (pending rows: %, status: %); continuing', + v_sub_name, v_pending_sync, coalesce(v_sub_status, ''); + EXIT; + END IF; + + PERFORM pg_sleep(1); + END LOOP; + END IF; + EXCEPTION + WHEN OTHERS THEN + RAISE WARNING ' - READY wait for subscription % failed: %; proceeding anyway', v_sub_name, SQLERRM; + RAISE WARNING ' - Current subscription status snapshot: %', + coalesce((SELECT string_agg(subscription_name || ':' || status, ', ') + FROM spock.sub_show_status()), ''); + END; + + -- Check src->new slot; only advance if it is NOT active (defensive). 
+ BEGIN + RAISE NOTICE ' - Checking source-to-new subscription slot...'; + + -- Get source node info and extract dbname + FOR src_rec IN SELECT * FROM temp_spock_nodes WHERE node_name = src_node_name LOOP + SELECT spock.extract_dbname_from_dsn(src_rec.dsn) INTO src_dbname; + IF src_dbname IS NOT NULL THEN + src_dbname := TRIM(BOTH '''' FROM src_dbname); + END IF; + IF src_dbname IS NULL THEN + src_dbname := 'pgedge'; + END IF; + + -- Generate slot name: spk___sub__ + src_slot_name := spock.spock_gen_slot_name( + src_dbname, src_node_name, + 'sub_' || src_node_name || '_' || new_node_name); + + RAISE NOTICE ' Looking for slot % on source node', src_slot_name; + + -- Check if slot exists on source node and whether it is active + DECLARE + v_slot_active boolean; + BEGIN + remotesql := format( + 'SELECT restart_lsn, active FROM pg_replication_slots WHERE slot_name = %L', + src_slot_name); + SELECT * FROM dblink(src_dsn, remotesql) + AS t(lsn pg_lsn, active boolean) + INTO current_lsn, v_slot_active; + + IF current_lsn IS NULL THEN + RAISE NOTICE ' Slot % not found on source node', src_slot_name; + ELSIF v_slot_active THEN + -- Subscription is running; slot and origin managed by the apply worker + RAISE NOTICE ' Slot % is active (subscription running) — no advance needed', src_slot_name; + ELSE + -- Slot exists but is not active (unusual). Advance defensively. 
+ RAISE NOTICE ' Slot % found at LSN % (inactive)', src_slot_name, current_lsn; + + SELECT p.remote_commit_lsn INTO target_lsn + FROM spock.progress p + JOIN spock.node n ON n.node_id = p.remote_node_id + WHERE n.node_name = src_node_name; + + IF target_lsn IS NOT NULL AND target_lsn > current_lsn THEN + RAISE NOTICE ' Snapshot LSN for %: %', src_node_name, target_lsn; + remotesql := format('SELECT pg_replication_slot_advance(%L, %L::pg_lsn)', src_slot_name, target_lsn); + PERFORM * FROM dblink(src_dsn, remotesql) AS t(result text); + RAISE NOTICE ' OK: Advanced slot % on source node from % to %', src_slot_name, current_lsn, target_lsn; + + IF NOT EXISTS ( + SELECT 1 FROM pg_replication_origin WHERE roname = src_slot_name + ) THEN + RAISE WARNING ' Origin % not found on new node; creating it now', src_slot_name; + PERFORM pg_replication_origin_create(src_slot_name); + END IF; + PERFORM pg_replication_origin_advance(src_slot_name, target_lsn); + RAISE NOTICE ' OK: Advanced replication origin % on new node to %', src_slot_name, target_lsn; + ELSE + RAISE NOTICE ' Slot % already at or beyond snapshot LSN', src_slot_name; + END IF; + END IF; + END; + END LOOP; + EXCEPTION + WHEN OTHERS THEN + RAISE WARNING 'Could not check source-to-new slot: %', SQLERRM; + END; + -- Check if this is a 2-node scenario (only source and new node) IF (SELECT count(*) FROM temp_spock_nodes WHERE node_name != src_node_name AND node_name != new_node_name) = 0 THEN - RAISE NOTICE ' - No other nodes exist, skipping commit timestamp check'; + RAISE NOTICE ' - No other nodes exist, skipping additional commitment checks'; RETURN; END IF; @@ -1793,9 +2036,21 @@ BEGIN CONTINUE; END IF; - target_lsn := commit_lsn; + -- Advance the slot to resume_lsn: the last commit from this node + -- that N1 had applied at snapshot time (stored in N3's spock.progress). 
+ SELECT p.remote_commit_lsn INTO target_lsn + FROM spock.progress p + JOIN spock.node n ON n.node_id = p.remote_node_id + WHERE n.node_name = rec.node_name; + + IF target_lsn IS NULL THEN + RAISE NOTICE ' WARNING: No spock.progress entry for %, falling back to pg_current_wal_lsn()', rec.node_name; + remotesql := 'SELECT pg_current_wal_lsn()'; + SELECT * FROM dblink(rec.dsn, remotesql) AS t(lsn pg_lsn) INTO target_lsn; + END IF; + IF target_lsn IS NULL OR target_lsn <= current_lsn THEN - RAISE NOTICE ' - Slot % already at or beyond target LSN (current: %, target: %)', slot_name, current_lsn, target_lsn; + RAISE NOTICE ' - Slot % already at or beyond resume_lsn LSN (current: %, target: %)', slot_name, current_lsn, target_lsn; CONTINUE; END IF; @@ -1807,6 +2062,17 @@ BEGIN PERFORM * FROM dblink(rec.dsn, remotesql) AS t(result text); RAISE NOTICE ' OK: %', rpad('Advanced slot ' || slot_name || ' from ' || current_lsn || ' to ' || target_lsn, 120, ' '); + + -- Advance the replication origin on the new node (subscriber side) + -- directly, not via dblink; the origin is local to the new node. 
+ IF NOT EXISTS ( + SELECT 1 FROM pg_replication_origin WHERE roname = slot_name + ) THEN + RAISE WARNING ' Origin % not found on new node; creating it now (was it created in Phase 3?)', slot_name; + PERFORM pg_replication_origin_create(slot_name); + END IF; + PERFORM pg_replication_origin_advance(slot_name, target_lsn); + RAISE NOTICE ' OK: %', rpad('Advanced replication origin ' || slot_name || ' on new node to ' || target_lsn, 120, ' '); END; EXCEPTION WHEN OTHERS THEN @@ -1832,7 +2098,7 @@ CREATE OR REPLACE PROCEDURE spock.trigger_source_sync_and_wait_on_new_node( DECLARE remotesql text; sync_lsn pg_lsn; - timeout_ms integer := 1200; -- 20 minutes timeout + timeout_ms integer := coalesce(nullif(current_setting('spock.add_node_timeout', true), ''), '180')::integer; BEGIN RAISE NOTICE 'Phase 6: Triggering sync on source node and waiting on new node'; @@ -1851,12 +2117,19 @@ BEGIN -- Wait for sync event on new node BEGIN - remotesql := format('CALL spock.wait_for_sync_event(true, %L, %L::pg_lsn, %s);', src_node_name, sync_lsn, timeout_ms); + remotesql := format('CALL spock.wait_for_sync_event(NULL, %L, %L::pg_lsn, %s, true);', src_node_name, sync_lsn, timeout_ms); IF verb THEN RAISE NOTICE ' Remote SQL for wait_for_sync_event on new node %: %', new_node_name, remotesql; END IF; - PERFORM * FROM dblink(new_node_dsn, remotesql) AS t(result text); - RAISE NOTICE ' OK: %', rpad('Waiting for sync event from ' || src_node_name || ' on new node ' || new_node_name || '...', 120, ' '); + DECLARE + sync_ok text; + BEGIN + SELECT * INTO sync_ok FROM dblink(new_node_dsn, remotesql) AS t(result text); + IF sync_ok IS NULL OR sync_ok::boolean IS NOT TRUE THEN + RAISE EXCEPTION 'wait_for_sync_event timed out for % on new node %', src_node_name, new_node_name; + END IF; + RAISE NOTICE ' OK: %', rpad('Sync event from ' || src_node_name || ' confirmed on new node ' || new_node_name, 120, ' '); + END; EXCEPTION WHEN OTHERS THEN RAISE EXCEPTION ' ✗ %', rpad('Unable to wait for sync 
event from ' || src_node_name || ' on new node ' || new_node_name || ' (error: ' || SQLERRM || ')', 120, ' '); @@ -1877,7 +2150,7 @@ DECLARE sub_rec RECORD; rec RECORD; wait_count integer := 0; - max_wait_count integer := 300; -- Wait up to 300 seconds + max_wait_count integer := 180; -- Wait up to 180 seconds BEGIN -- Let remote subscriptions update their subscription's state. COMMIT; @@ -1913,9 +2186,12 @@ BEGIN IF sub_rec IS NULL THEN RAISE NOTICE ' OK: Replication is active'; EXIT; + ELSIF sub_rec.status IN ('disabled', 'down') THEN + RAISE EXCEPTION 'Subscription % entered terminal state % while waiting for replication to become active', + sub_rec.sub_name, sub_rec.status; ELSIF wait_count >= max_wait_count THEN - RAISE NOTICE ' WARNING: Timeout waiting for subscription % to become active (current status: %)', sub_rec.sub_name, sub_rec.status; - EXIT; + RAISE EXCEPTION 'Timeout waiting for subscription % to become active (current status: %)', + sub_rec.sub_name, sub_rec.status; ELSE RAISE NOTICE ' Waiting for replication... 
(subscription: %, status: %, attempt %/%)', sub_rec.sub_name, sub_rec.status, wait_count, max_wait_count; @@ -2008,7 +2284,8 @@ CREATE OR REPLACE PROCEDURE spock.add_node( verb boolean DEFAULT false, new_node_location text DEFAULT 'NY', new_node_country text DEFAULT 'USA', - new_node_info jsonb DEFAULT '{}'::jsonb + new_node_info jsonb DEFAULT '{}'::jsonb, + timeout_sec integer DEFAULT 180 ) LANGUAGE plpgsql AS @@ -2016,6 +2293,8 @@ $$ DECLARE initial_node_count integer; BEGIN + -- Store timeout for inner procedures to read + PERFORM set_config('spock.add_node_timeout', timeout_sec::text, true); -- Phase 0: Check Spock version compatibility across all nodes -- Example: Ensure all nodes are running the same Spock version before proceeding CALL spock.check_spock_version_compatibility(src_dsn, new_node_dsn, verb); diff --git a/samples/Z0DAN/zodan_cleanup.sql b/samples/Z0DAN/zodan_cleanup.sql index 0a56e2b8..7655b8ab 100644 --- a/samples/Z0DAN/zodan_cleanup.sql +++ b/samples/Z0DAN/zodan_cleanup.sql @@ -26,6 +26,7 @@ DROP PROCEDURE IF EXISTS spock.create_source_to_new_subscription(text, text, tex DROP PROCEDURE IF EXISTS spock.trigger_sync_on_other_nodes_and_wait_on_source(text, text, text, text, boolean); DROP PROCEDURE IF EXISTS spock.check_commit_timestamp_and_advance_slot(text, text, text, text, boolean); DROP PROCEDURE IF EXISTS spock.present_final_cluster_state(text, integer, boolean); +DROP PROCEDURE IF EXISTS spock.add_node(text, text, text, text, boolean, text, text, jsonb, integer); DROP PROCEDURE IF EXISTS spock.add_node(text, text, text, text, boolean, text, text, jsonb); -- Drop temporary table if it exists diff --git a/sql/spock--5.0.6--6.0.0-devel.sql b/sql/spock--5.0.6--6.0.0-devel.sql index d64e05b2..23492975 100644 --- a/sql/spock--5.0.6--6.0.0-devel.sql +++ b/sql/spock--5.0.6--6.0.0-devel.sql @@ -30,6 +30,115 @@ CREATE VIEW spock.progress AS SELECT oid FROM pg_database WHERE datname = current_database() ); +CREATE FUNCTION spock.pause_apply_workers() 
+RETURNS void +AS 'MODULE_PATHNAME', 'spock_pause_apply_workers' +LANGUAGE C VOLATILE; + +REVOKE ALL ON FUNCTION spock.pause_apply_workers() FROM PUBLIC; + +CREATE FUNCTION spock.resume_apply_workers() +RETURNS void +AS 'MODULE_PATHNAME', 'spock_resume_apply_workers' +LANGUAGE C VOLATILE; + +REVOKE ALL ON FUNCTION spock.resume_apply_workers() FROM PUBLIC; + +-- Read peer progress (ros.remote_lsn) for all peer subscriptions. +-- Called while apply workers are paused and the slot's snapshot is imported. +-- Row 0: header (lsn + snapshot placeholder). Rows 1+: one progress entry per peer. +CREATE FUNCTION spock.read_peer_progress( + p_slot_name text, + p_provider_node_id oid, + p_subscriber_node_id oid +) RETURNS TABLE( + lsn pg_lsn, + snapshot text, + dbid oid, + node_id oid, + remote_node_id oid, + remote_commit_ts timestamptz, + prev_remote_ts timestamptz, + remote_commit_lsn pg_lsn, + remote_insert_lsn pg_lsn, + received_lsn pg_lsn, + last_updated_ts timestamptz, + updated_by_decode boolean +) VOLATILE STRICT LANGUAGE plpgsql AS $$ +DECLARE + v_lsn pg_lsn; + v_snap text; + rec record; + v_n_peers int := 0; +BEGIN + /* + * The slot and snapshot are created by the C caller via the replication + * protocol. The slot's snapshot is imported into this transaction. + * This function just reads peer progress (ros.remote_lsn) while apply + * workers are paused. + */ + + -- Get the slot's LSN and the imported snapshot for the header row. + SELECT restart_lsn INTO v_lsn + FROM pg_replication_slots WHERE slot_name = p_slot_name; + v_snap := ''; -- snapshot managed by C caller + + RAISE NOTICE 'SPOCK cswp slot=% v_lsn=%', p_slot_name, v_lsn; + + -- Header row: lsn + snapshot only. + lsn := v_lsn; + snapshot := v_snap; + RETURN NEXT; + + /* + * Emit one progress row per peer. With apply workers paused, + * ros.remote_lsn is exact: it reflects only committed transactions + * whose effects are visible in the slot snapshot. 
+ */ + FOR rec IN ( + SELECT p.dbid, p.node_id, p.remote_node_id, + p.remote_commit_ts, p.prev_remote_ts, + p.remote_commit_lsn AS grp_remote_commit_lsn, + p.remote_insert_lsn, + p.received_lsn, p.last_updated_ts, p.updated_by_decode, + ros.remote_lsn AS ros_remote_lsn, + sub.sub_slot_name AS sub_slot_name + FROM spock.subscription sub + JOIN spock.progress p + ON p.remote_node_id = sub.sub_origin + AND p.node_id = sub.sub_target + JOIN pg_replication_origin o + ON o.roname = sub.sub_slot_name + LEFT JOIN pg_replication_origin_status ros + ON ros.local_id = o.roident + WHERE sub.sub_target = p_provider_node_id + AND sub.sub_origin <> p_subscriber_node_id + ) LOOP + v_n_peers := v_n_peers + 1; + + lsn := v_lsn; + snapshot := v_snap; + dbid := rec.dbid; + node_id := rec.node_id; + remote_node_id := rec.remote_node_id; + remote_commit_ts := rec.remote_commit_ts; + prev_remote_ts := rec.prev_remote_ts; + remote_commit_lsn := COALESCE(rec.ros_remote_lsn, '0/0'::pg_lsn); + remote_insert_lsn := rec.remote_insert_lsn; + received_lsn := rec.received_lsn; + last_updated_ts := rec.last_updated_ts; + updated_by_decode := rec.updated_by_decode; + + RAISE NOTICE 'SPOCK cswp peer=% resume_lsn=%', + rec.remote_node_id, remote_commit_lsn; + + RETURN NEXT; + END LOOP; + + RAISE NOTICE 'SPOCK cswp slot=% done peers=%', p_slot_name, v_n_peers; +END; +$$; + CREATE VIEW spock.lag_tracker AS SELECT origin.node_name AS origin_name, diff --git a/sql/spock--6.0.0-devel.sql b/sql/spock--6.0.0-devel.sql index ab396b30..882082e2 100644 --- a/sql/spock--6.0.0-devel.sql +++ b/sql/spock--6.0.0-devel.sql @@ -121,6 +121,101 @@ CREATE VIEW spock.progress AS SELECT oid FROM pg_database WHERE datname = current_database() ); +-- Read peer progress (ros.remote_lsn) for all peer subscriptions. +-- Called while apply workers are paused and the slot's snapshot is imported. +-- Row 0: header (lsn + snapshot placeholder). Rows 1+: one progress entry per peer. 
+CREATE FUNCTION spock.read_peer_progress( + p_slot_name text, + p_provider_node_id oid, + p_subscriber_node_id oid +) RETURNS TABLE( + lsn pg_lsn, + snapshot text, + dbid oid, + node_id oid, + remote_node_id oid, + remote_commit_ts timestamptz, + prev_remote_ts timestamptz, + remote_commit_lsn pg_lsn, + remote_insert_lsn pg_lsn, + received_lsn pg_lsn, + last_updated_ts timestamptz, + updated_by_decode boolean +) VOLATILE STRICT LANGUAGE plpgsql AS $$ +DECLARE + v_lsn pg_lsn; + v_snap text; + rec record; + v_n_peers int := 0; +BEGIN + /* + * The slot and snapshot are created by the C caller via the replication + * protocol. The slot's snapshot is imported into this transaction. + * This function just reads peer progress (ros.remote_lsn) while apply + * workers are paused. + */ + + -- Get the slot's LSN and the imported snapshot for the header row. + SELECT restart_lsn INTO v_lsn + FROM pg_replication_slots WHERE slot_name = p_slot_name; + v_snap := ''; -- snapshot managed by C caller + + RAISE NOTICE 'SPOCK cswp slot=% v_lsn=%', p_slot_name, v_lsn; + + -- Header row: lsn only (snapshot managed by C caller). + lsn := v_lsn; + snapshot := v_snap; + RETURN NEXT; + + /* + * Emit one progress row per peer. With apply workers paused, + * ros.remote_lsn is exact: it reflects only committed transactions + * whose effects are visible in the slot snapshot. 
+ */ + FOR rec IN ( + SELECT p.dbid, p.node_id, p.remote_node_id, + p.remote_commit_ts, p.prev_remote_ts, + p.remote_commit_lsn AS grp_remote_commit_lsn, + p.remote_insert_lsn, + p.received_lsn, p.last_updated_ts, p.updated_by_decode, + ros.remote_lsn AS ros_remote_lsn, + sub.sub_slot_name AS sub_slot_name + FROM spock.subscription sub + JOIN spock.progress p + ON p.remote_node_id = sub.sub_origin + AND p.node_id = sub.sub_target + JOIN pg_replication_origin o + ON o.roname = sub.sub_slot_name + LEFT JOIN pg_replication_origin_status ros + ON ros.local_id = o.roident + WHERE sub.sub_target = p_provider_node_id + AND sub.sub_origin <> p_subscriber_node_id + ) LOOP + v_n_peers := v_n_peers + 1; + + lsn := v_lsn; + snapshot := v_snap; + dbid := rec.dbid; + node_id := rec.node_id; + remote_node_id := rec.remote_node_id; + remote_commit_ts := rec.remote_commit_ts; + prev_remote_ts := rec.prev_remote_ts; + remote_commit_lsn := COALESCE(rec.ros_remote_lsn, '0/0'::pg_lsn); + remote_insert_lsn := rec.remote_insert_lsn; + received_lsn := rec.received_lsn; + last_updated_ts := rec.last_updated_ts; + updated_by_decode := rec.updated_by_decode; + + RAISE NOTICE 'SPOCK cswp peer=% resume_lsn=%', + rec.remote_node_id, remote_commit_lsn; + + RETURN NEXT; + END LOOP; + + RAISE NOTICE 'SPOCK cswp slot=% done peers=%', p_slot_name, v_n_peers; +END; +$$; + CREATE FUNCTION spock.node_create(node_name name, dsn text, location text DEFAULT NULL, country text DEFAULT NULL, info jsonb DEFAULT NULL) @@ -442,6 +537,20 @@ RETURNS pg_lsn RETURNS NULL ON NULL INPUT AS 'MODULE_PATHNAME', 'spock_create_sync_event' LANGUAGE C VOLATILE; +CREATE FUNCTION spock.pause_apply_workers() +RETURNS void +AS 'MODULE_PATHNAME', 'spock_pause_apply_workers' +LANGUAGE C VOLATILE; + +REVOKE ALL ON FUNCTION spock.pause_apply_workers() FROM PUBLIC; + +CREATE FUNCTION spock.resume_apply_workers() +RETURNS void +AS 'MODULE_PATHNAME', 'spock_resume_apply_workers' +LANGUAGE C VOLATILE; + +REVOKE ALL ON FUNCTION 
spock.resume_apply_workers() FROM PUBLIC; + CREATE PROCEDURE spock.wait_for_sync_event( OUT result bool, origin_id oid, diff --git a/src/spock.c b/src/spock.c index 02532339..1683e303 100644 --- a/src/spock.c +++ b/src/spock.c @@ -144,6 +144,7 @@ bool allow_ddl_from_functions = false; int restart_delay_default; int restart_delay_on_exception; int spock_replay_queue_size; +int spock_pause_timeout = 10; /* seconds to wait for apply workers to pause */ bool check_all_uc_indexes = false; bool spock_enable_quiet_mode = false; int log_origin_change = SPOCK_ORIGIN_NONE; @@ -1153,6 +1154,21 @@ _PG_init(void) NULL, NULL); + DefineCustomIntVariable("spock.pause_timeout", + "Timeout in seconds for pausing apply workers during slot creation", + "Controls how long add_node waits for apply " + "workers to reach a between-transaction pause point. Increase " + "if add_node fails with a pause timeout under heavy load.", + &spock_pause_timeout, + 10, + 1, + 300, + PGC_USERSET, + GUC_UNIT_S, + NULL, + NULL, + NULL); + DefineCustomIntVariable("spock.exception_replay_queue_size", "Maximum in-memory size for the apply replay queue", "When the replay queue exceeds this size, subsequent " diff --git a/src/spock_apply.c b/src/spock_apply.c index ff25cad7..f78cf19d 100644 --- a/src/spock_apply.c +++ b/src/spock_apply.c @@ -438,6 +438,35 @@ action_error_callback(void *arg) * existing transaction). * Also provide a global snapshot and ensure we run in ApplyMessageContext. */ + +/* + * If the pause flag is set (slot creation in progress for add_node), + * sleep on the ConditionVariable until resumed or timed out. 
+ */ +static void +maybe_pause_for_slot_creation(void) +{ + if (pg_atomic_read_u32(&SpockCtx->pause_apply) != 0) + { + MyApplyWorker->paused = true; + ConditionVariablePrepareToSleep(&SpockCtx->pause_cv); + while (pg_atomic_read_u32(&SpockCtx->pause_apply) != 0) + { + if (ConditionVariableTimedSleep(&SpockCtx->pause_cv, + spock_pause_timeout * 1000L, + WAIT_EVENT_LOGICAL_APPLY_MAIN)) + { + elog(WARNING, "SPOCK: apply worker pause timed out after %ds, resuming", + spock_pause_timeout); + pg_atomic_write_u32(&SpockCtx->pause_apply, 0); + break; + } + } + ConditionVariableCancelSleep(); + MyApplyWorker->paused = false; + } +} + static bool begin_replication_step(void) { @@ -454,6 +483,18 @@ begin_replication_step(void) if (!IsTransactionState()) { + /* + * Check if slot creation (add_node) needs us to pause. This only + * fires during add_node (a rare operation). The fast path is a + * single atomic read that almost always sees 0. + * + * Runs before StartTransactionCommand so the worker has no xid + * while paused — pause_apply_workers can detect completion via + * xid polling. The previous transaction's commit is fully + * complete, so ros.remote_lsn reflects only committed state. + */ + maybe_pause_for_slot_creation(); + StartTransactionCommand(); spock_apply_heap_begin(); @@ -960,8 +1001,7 @@ handle_commit(StringInfo s) .key.remote_node_id = MySubscription->origin->id, .remote_commit_ts = commit_time, .prev_remote_ts = replorigin_session_origin_timestamp, - .remote_commit_lsn = commit_lsn, - /* Ensure invariant: received_lsn >= remote_commit_lsn */ + .remote_commit_lsn = end_lsn, .received_lsn = end_lsn, /* * Include remote_insert_lsn for WAL persistence. This was already @@ -991,6 +1031,8 @@ handle_commit(StringInfo s) in_remote_transaction = false; + maybe_pause_for_slot_creation(); + /* * Stop replay if we're doing limited replay and we've replayed up to the * last record we're supposed to process. 
diff --git a/src/spock_functions.c b/src/spock_functions.c index 3f0be6d3..c4b92117 100644 --- a/src/spock_functions.c +++ b/src/spock_functions.c @@ -133,6 +133,8 @@ PG_FUNCTION_INFO_V1(spock_wait_for_subscription_sync_complete); PG_FUNCTION_INFO_V1(spock_wait_for_table_sync_complete); PG_FUNCTION_INFO_V1(spock_create_sync_event); +PG_FUNCTION_INFO_V1(spock_pause_apply_workers); +PG_FUNCTION_INFO_V1(spock_resume_apply_workers); /* Replication set manipulation. */ PG_FUNCTION_INFO_V1(spock_create_replication_set); @@ -3317,6 +3319,104 @@ spock_create_sync_event(PG_FUNCTION_ARGS) PG_RETURN_LSN(lsn); } +/* + * spock_pause_apply_workers + * + * Temporarily pause all apply workers in this database during slot creation + * for add_node. Sets a shared memory flag that workers check between + * transactions; workers sleep on a ConditionVariable until resumed. + * + * After this function returns, all apply workers have finished their current + * transaction and are paused, so pg_replication_origin_status reflects only + * committed state that is visible in any new snapshot. + */ +Datum +spock_pause_apply_workers(PG_FUNCTION_ARGS) +{ + int i; + int max_wait_ms = spock_pause_timeout * 1000; + int waited_ms = 0; + + /* Signal apply workers to pause at their next between-transaction point. */ + pg_atomic_write_u32(&SpockCtx->pause_apply, 1); + + /* + * Wait until all apply workers in this database have either: + * - no active transaction (xid invalid = between transactions or idle), or + * - set their paused flag (spinning on the pause_apply flag). + * + * Either state means the worker has committed its last transaction and + * cannot apply new DML until the flag is cleared. 
+ */ + while (waited_ms < max_wait_ms) + { + bool all_paused = true; + + for (i = 0; i < SpockCtx->total_workers; i++) + { + SpockWorker *w = &SpockCtx->workers[i]; + + if (w->worker_type != SPOCK_WORKER_APPLY) + continue; + if (w->dboid != MyDatabaseId) + continue; + if (w->proc == NULL) + continue; + + /* Worker is paused (spinning on flag) — good. */ + if (w->worker.apply.paused) + continue; + + /* Worker has no active transaction — also good. */ + if (!TransactionIdIsValid(w->proc->xid)) + continue; + + /* Worker is mid-transaction and not yet paused. */ + all_paused = false; + break; + } + + if (all_paused) + break; + + CHECK_FOR_INTERRUPTS(); + pg_usleep(1000); /* 1ms */ + waited_ms++; + } + + if (waited_ms >= max_wait_ms) + { + pg_atomic_write_u32(&SpockCtx->pause_apply, 0); + ereport(ERROR, + (errcode(ERRCODE_LOCK_NOT_AVAILABLE), + errmsg("timed out waiting for apply workers to pause after %d seconds", + spock_pause_timeout), + errhint("Increase spock.pause_timeout if apply workers have long-running transactions."))); + } + else + elog(DEBUG1, "SPOCK pause_apply_workers: all workers paused after %d ms", waited_ms); + + PG_RETURN_VOID(); +} + +/* + * spock_resume_apply_workers + * + * Resume apply workers by clearing the flag and broadcasting on the + * ConditionVariable to wake all sleeping workers. + */ +Datum +spock_resume_apply_workers(PG_FUNCTION_ARGS) +{ + /* Clear the flag and wake all sleeping workers. 
*/
+	pg_atomic_write_u32(&SpockCtx->pause_apply, 0);
+	ConditionVariableBroadcast(&SpockCtx->pause_cv);
+
+	elog(DEBUG1, "SPOCK resume_apply_workers: workers resumed");
+
+	PG_RETURN_VOID();
+}
+
 /*
  * Helper function for finding the endptr for a particular commit timestamp
  */
diff --git a/src/spock_group.c b/src/spock_group.c
index 9d26e80e..20fb2d54 100644
--- a/src/spock_group.c
+++ b/src/spock_group.c
@@ -687,3 +687,86 @@ spock_group_progress_update_list(List *lst)
 	 */
 	list_free_deep(lst);
 }
+
+/*
+ * spock_group_progress_force_set_list
+ *
+ * Write resume_lsn into the shmem progress entry for each peer during add_node.
+ * Each entry is WAL-logged and then written into shmem unconditionally: the
+ * snapshot-time value is authoritative even over a worker's higher live LSN.
+ */
+void
+spock_group_progress_force_set_list(List *lst)
+{
+	ListCell   *lc;
+
+	if (!SpockGroupHash || !SpockCtx)
+	{
+		elog(WARNING, "SpockGroupHash is not initialized; force-set skipped");
+		list_free_deep(lst);
+		return;
+	}
+
+	foreach (lc, lst)
+	{
+		SpockApplyProgress *sap = (SpockApplyProgress *) lfirst(lc);
+		SpockGroupEntry *entry;
+		bool		found;
+
+		LWLockAcquire(SpockCtx->apply_group_master_lock, LW_EXCLUSIVE);
+
+		entry = (SpockGroupEntry *) hash_search(SpockGroupHash, &sap->key,
+												HASH_ENTER, &found);
+
+		if (entry == NULL)
+		{
+			LWLockRelease(SpockCtx->apply_group_master_lock);
+			elog(WARNING, "SpockGroupHash is full, cannot force-set progress for group "
+				 "(dbid=%u, node_id=%u, remote_node_id=%u)",
+				 sap->key.dbid, sap->key.node_id, sap->key.remote_node_id);
+			continue;
+		}
+
+		if (!found)
+		{
+			init_progress_fields(&entry->progress);
+			pg_atomic_init_u32(&entry->nattached, 0);
+			ConditionVariableInit(&entry->prev_processed_cv);
+		}
+		else if (entry->progress.remote_commit_lsn >= sap->remote_commit_lsn)
+		{
+			/*
+			 * Existing LSN >= resume_lsn. Unconditionally overwrite: the
+			 * value from read_peer_progress is authoritative because
+			 * it was captured at COPY snapshot time. The apply worker may
+			 * have advanced past it since then, but any data it applied
+			 * after the snapshot is NOT in the COPY — so the new node must
+			 * replay from the snapshot boundary, not from the worker's
+			 * current position.
+			 */
+			elog(LOG, "SPOCK: force-set %d->%d overwriting: existing=%X/%X -> resume_lsn=%X/%X",
+				 sap->key.remote_node_id, MySubscription->target->id,
+				 LSN_FORMAT_ARGS(entry->progress.remote_commit_lsn),
+				 LSN_FORMAT_ARGS(sap->remote_commit_lsn));
+		}
+		else
+		{
+			/* Stale entry below resume_lsn; will be reset after WAL write succeeds. */
+		}
+
+		/* WAL-log before the shmem update so shmem never runs ahead of WAL. */
+		spock_apply_progress_add_to_wal(sap);
+
+		/* Now safe to mutate shmem: WAL write succeeded. */
+		init_progress_fields(&entry->progress);
+		progress_update_struct(&entry->progress, sap);
+		LWLockRelease(SpockCtx->apply_group_master_lock);
+
+		elog(LOG, "SPOCK: force-set %d->%d commit_lsn=%X/%X insert_lsn=%X/%X",
+			 sap->key.remote_node_id, MySubscription->target->id,
+			 LSN_FORMAT_ARGS(sap->remote_commit_lsn),
+			 LSN_FORMAT_ARGS(sap->remote_insert_lsn));
+	}
+
+	list_free_deep(lst);
+}
diff --git a/src/spock_shmem.c b/src/spock_shmem.c
index 389dcc00..f1602bc9 100644
--- a/src/spock_shmem.c
+++ b/src/spock_shmem.c
@@ -163,6 +163,8 @@ spock_shmem_startup(void)
 		SpockCtx->supervisor = NULL;
 		SpockCtx->subscriptions_changed = false;
 		SpockCtx->total_workers = nworkers;
+		pg_atomic_init_u32(&SpockCtx->pause_apply, 0);
+		ConditionVariableInit(&SpockCtx->pause_cv);
 		memset(SpockCtx->workers, 0, sizeof(SpockWorker) * SpockCtx->total_workers);
 	}
diff --git a/src/spock_sync.c b/src/spock_sync.c
index b3919ef1..06364e26 100644
--- a/src/spock_sync.c
+++ b/src/spock_sync.c
@@ -556,6 +556,232 @@ adjust_progress_info(PGconn *origin_conn)
 	return resultList;
 }
+
+/*
+ * spock_create_slot_and_read_progress
+ *
+ * 
Pauses apply workers, creates a replication slot via the replication + * protocol (which returns a snapshot consistent with the slot's WAL + * position), reads peer progress while workers are paused, and resumes. + * + * Leaves conn in an open REPEATABLE READ transaction with the slot's + * snapshot imported; caller must call spock_release_slot_snapshot() + * after the COPY phase completes. + */ +static char * +spock_create_slot_and_read_progress(PGconn *conn, PGconn *repl_conn, + const char *slot_name, + Oid origin_node_id, Oid subscriber_node_id, + XLogRecPtr *lsn_out, List **progress_out) +{ + StringInfoData query; + PGresult *res; + char *snapshot = NULL; + List *progress_list = NIL; + int nrows; + int rno; + /* Column indices in the result: lsn(0), snapshot(1), then GP_* + 2 */ + const int COL_LSN = 0; + const int COL_SNAP = 1; + const int COL_OFFSET = 2; /* GP_* indices start at COL_OFFSET */ + + initStringInfo(&query); + + /* + * Drop an existing inactive slot so we can re-create it cleanly. + * Ignore errors (the slot may not exist, which is fine). + */ + appendStringInfo(&query, + "SELECT pg_drop_replication_slot('%s') " + "WHERE EXISTS (" + " SELECT 1 FROM pg_replication_slots " + " WHERE slot_name = '%s' AND NOT active)", + slot_name, slot_name); + res = PQexec(conn, query.data); + PQclear(res); + resetStringInfo(&query); + + elog(LOG, "SPOCK cswp slot=%s provider=%u subscriber=%u", + slot_name, origin_node_id, subscriber_node_id); + + /* + * Pause apply workers so ros.remote_lsn reflects only fully committed + * state when we read it below. + */ + res = PQexec(conn, "SELECT spock.pause_apply_workers()"); + if (PQresultStatus(res) != PGRES_TUPLES_OK) + elog(ERROR, "could not pause apply workers on origin: %s", + PQerrorMessage(conn)); + PQclear(res); + + /* + * Create the slot via the replication protocol. This returns a snapshot + * consistent with the slot's WAL position — the correct snapshot for COPY. 
+ */ + appendStringInfo(&query, "CREATE_REPLICATION_SLOT \"%s\" LOGICAL %s", + slot_name, "spock_output"); + res = PQexec(repl_conn, query.data); + resetStringInfo(&query); + + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + const char *sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE); + + if (sqlstate && + strcmp(sqlstate, "42710") == 0 && + !spock_remote_slot_active(conn, slot_name)) + { + PQclear(res); + spock_drop_remote_slot(conn, slot_name); + + appendStringInfo(&query, + "CREATE_REPLICATION_SLOT \"%s\" LOGICAL %s", + slot_name, "spock_output"); + res = PQexec(repl_conn, query.data); + resetStringInfo(&query); + } + + if (PQresultStatus(res) != PGRES_TUPLES_OK) + elog(ERROR, "could not create replication slot on provider: %s", + PQresultErrorMessage(res)); + } + + *lsn_out = DatumGetLSN(DirectFunctionCall1Coll(pg_lsn_in, InvalidOid, + CStringGetDatum(PQgetvalue(res, 0, 1)))); + snapshot = pstrdup(PQgetvalue(res, 0, 2)); + PQclear(res); + + elog(LOG, "SPOCK cswp slot=%s lsn=%X/%X snapshot=%s", + slot_name, LSN_FORMAT_ARGS(*lsn_out), snapshot); + + /* + * Import the slot's snapshot into a REPEATABLE READ transaction. + * This is the snapshot the COPY will use. 
+ */ + appendStringInfo(&query, + "BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ; " + "SET TRANSACTION SNAPSHOT '%s'", + snapshot); + res = PQexec(conn, query.data); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + elog(ERROR, "could not import snapshot on origin: %s", + PQerrorMessage(conn)); + PQclear(res); + resetStringInfo(&query); + + /* Read peer progress via the SQL function (slot already exists) */ + appendStringInfo(&query, + "SELECT * FROM spock.read_peer_progress" + "('%s', %u, %u)", + slot_name, origin_node_id, subscriber_node_id); + res = PQexec(conn, query.data); + resetStringInfo(&query); + + if (PQresultStatus(res) != PGRES_TUPLES_OK) + elog(ERROR, "could not create replication slot and get progress on " + "origin: %s", PQerrorMessage(conn)); + + nrows = PQntuples(res); + if (nrows < 1) + elog(ERROR, "spock.read_peer_progress returned no rows"); + + /* Row 0 is the header row: lsn + snapshot, progress fields all NULL. + * lsn_out and snapshot are already set from the replication protocol; + * just log for debugging. Skip COL_LSN/COL_SNAP from SQL result. 
*/ + elog(LOG, "SPOCK cswp slot=%s lsn=%X/%X snapshot=%s peers=%d", + slot_name, LSN_FORMAT_ARGS(*lsn_out), snapshot, nrows - 1); + + /* Rows 1+ are progress entries (remote_node_id NOT NULL) */ + for (rno = 1; rno < nrows; rno++) + { + SpockApplyProgress *sap; + MemoryContext oldctx; + char *remote_node_id_str; + char *remote_commit_ts_str; + char *remote_commit_lsn_str; + char *remote_insert_lsn_str; + char *last_updated_ts_str; + + if (PQgetisnull(res, rno, COL_OFFSET + GP_REMOTE_NODE_ID)) + continue; /* shouldn't happen but be safe */ + + sap = (SpockApplyProgress *) MemoryContextAlloc(CacheMemoryContext, + sizeof(SpockApplyProgress)); + oldctx = MemoryContextSwitchTo(CacheMemoryContext); + + sap->key.dbid = MyDatabaseId; + sap->key.node_id = MySubscription->target->id; + remote_node_id_str = PQgetvalue(res, rno, COL_OFFSET + GP_REMOTE_NODE_ID); + sap->key.remote_node_id = atooid(remote_node_id_str); + Assert(OidIsValid(sap->key.remote_node_id)); + + sap->remote_commit_ts = 0; + sap->prev_remote_ts = 0; + if (!PQgetisnull(res, rno, COL_OFFSET + GP_REMOTE_COMMIT_TS)) + { + remote_commit_ts_str = PQgetvalue(res, rno, COL_OFFSET + GP_REMOTE_COMMIT_TS); + sap->remote_commit_ts = str_to_timestamptz(remote_commit_ts_str); + } + sap->prev_remote_ts = sap->remote_commit_ts; + + remote_commit_lsn_str = PQgetvalue(res, rno, COL_OFFSET + GP_REMOTE_COMMIT_LSN); + sap->remote_commit_lsn = str_to_lsn(remote_commit_lsn_str); + + remote_insert_lsn_str = PQgetvalue(res, rno, COL_OFFSET + GP_REMOTE_INSERT_LSN); + sap->remote_insert_lsn = str_to_lsn(remote_insert_lsn_str); + + sap->received_lsn = sap->remote_commit_lsn; + + sap->last_updated_ts = 0; + if (!PQgetisnull(res, rno, COL_OFFSET + GP_LAST_UPDATED_TS)) + { + last_updated_ts_str = PQgetvalue(res, rno, COL_OFFSET + GP_LAST_UPDATED_TS); + sap->last_updated_ts = str_to_timestamptz(last_updated_ts_str); + } + + sap->updated_by_decode = (PQgetvalue(res, rno, COL_OFFSET + GP_UPDATED_BY_DECODE)[0] == 't'); + + progress_list = 
lappend(progress_list, sap); + MemoryContextSwitchTo(oldctx); + + elog(LOG, "SPOCK cswp peer=%s->%d commit_lsn=%s insert_lsn=%s", + remote_node_id_str, MySubscription->target->id, + remote_commit_lsn_str, remote_insert_lsn_str); + } + + PQclear(res); + + /* + * Resume apply workers now that slot and progress are captured. + * The REPEATABLE READ transaction (and its snapshot) remain open + * for the COPY phase; the workers can resume safely. + */ + res = PQexec(conn, "SELECT spock.resume_apply_workers()"); + if (PQresultStatus(res) != PGRES_TUPLES_OK) + elog(WARNING, "could not resume apply workers on origin: %s", + PQerrorMessage(conn)); + PQclear(res); + + pfree(query.data); + + *progress_out = progress_list; + return snapshot; +} + +/* + * spock_release_slot_snapshot + * + * Rolls back the snapshot transaction opened by + * spock_create_slot_and_read_progress and closes the connection. + */ +static void +spock_release_slot_snapshot(PGconn *conn) +{ + PGresult *res = PQexec(conn, "ROLLBACK"); + + PQclear(res); + PQfinish(conn); +} + /* * Transaction management for COPY. @@ -998,7 +1224,8 @@ static List * copy_replication_sets_data(SpockSubscription *sub, const char *origin_dsn, const char *target_dsn, const char *origin_snapshot, - List *replication_sets, const char *origin_name) + List *replication_sets, const char *origin_name, + List **progress_out) { PGconn *origin_conn; PGconn *target_conn; @@ -1009,6 +1236,13 @@ copy_replication_sets_data(SpockSubscription *sub, const char *origin_dsn, origin_conn = spock_connect(origin_dsn, sub->name, "copy"); start_copy_origin_tx(origin_conn, origin_snapshot); + /* + * Read progress info within the same snapshot used for COPY so that + * the LSN values are consistent with the data we are about to copy. + */ + if (progress_out) + *progress_out = adjust_progress_info(origin_conn); + /* Get tables to copy from origin node. 
*/ tables = spock_get_remote_repset_tables(origin_conn, replication_sets); @@ -1204,30 +1438,60 @@ spock_sync_subscription(SpockSubscription *sub) PGconn *origin_conn; PGconn *origin_conn_repl; char *snapshot; - bool use_failover_slot; List *progress_entries_list = NIL; elog(INFO, "initializing subscriber %s", sub->name); origin_conn = spock_connect(sub->origin_if->dsn, sub->name, "snap"); - - /* 2QPG9.6 and 2QPG11 support failover slots */ - use_failover_slot = - spock_remote_function_exists(origin_conn, "pg_catalog", - "pg_create_logical_replication_slot", - -1, - "failover"); origin_conn_repl = spock_connect_replica(sub->origin_if->dsn, sub->name, "snap"); - progress_entries_list = adjust_progress_info(origin_conn); - snapshot = ensure_replication_slot_snapshot(origin_conn, - origin_conn_repl, - sub->slot_name, - use_failover_slot, &lsn); + /* + * Pause apply workers, create slot via replication protocol + * (returns snapshot consistent with slot's WAL position), + * read peer progress, and resume workers. + */ + PG_TRY(); + { + snapshot = spock_create_slot_and_read_progress( + origin_conn, + origin_conn_repl, + sub->slot_name, + MySubscription->origin->id, + MySubscription->target->id, + &lsn, + &progress_entries_list); + } + PG_CATCH(); + { + ErrorData *edata = CopyErrorData(); + + FlushErrorState(); + elog(LOG, "SPOCK cswp error sub=%s slot=%s: %s", + sub->name, sub->slot_name, + edata->message ? edata->message : ""); + + /* Best-effort resume of apply workers on the remote node. + * If the connection is broken this will fail silently — + * the workers' CV timeout will recover them. */ + if (origin_conn && PQstatus(origin_conn) == CONNECTION_OK) + { + PGresult *rres = PQexec(origin_conn, + "SELECT spock.resume_apply_workers()"); + if (rres) + PQclear(rres); + } + + FreeErrorData(edata); + PG_RE_THROW(); + } + PG_END_TRY(); - PQfinish(origin_conn); + /* origin_conn transaction remains open — snapshot held for COPY. 
+ * Keep origin_conn_repl open too — the exported snapshot file + * in pg_snapshots/ is tied to this connection and needed by + * pg_dump during structure sync. */ PG_ENSURE_ERROR_CLEANUP(spock_sync_worker_cleanup_error_cb, PointerGetDatum(sub)); @@ -1295,13 +1559,14 @@ spock_sync_subscription(SpockSubscription *sub) sub->target_if->dsn, snapshot, sub->replication_sets, - sub->slot_name); + sub->slot_name, + NULL); /* * Arrange replication status according to the just copied * data. */ - spock_group_progress_update_list(progress_entries_list); + spock_group_progress_force_set_list(progress_entries_list); /* Store info about all the synchronized tables. */ StartTransactionCommand(); @@ -1358,6 +1623,7 @@ spock_sync_subscription(SpockSubscription *sub) PointerGetDatum(sub)); PQfinish(origin_conn_repl); + spock_release_slot_snapshot(origin_conn); status = SYNC_STATUS_CATCHUP; StartTransactionCommand(); diff --git a/tests/tap/t/011_zodan_sync_third.pl b/tests/tap/t/011_zodan_sync_third.pl index 9f99c18e..0a7a8d47 100644 --- a/tests/tap/t/011_zodan_sync_third.pl +++ b/tests/tap/t/011_zodan_sync_third.pl @@ -1,6 +1,6 @@ use strict; use warnings; -use Test::More tests => 34; +use Test::More tests => 30; use IPC::Run; use lib '.'; use lib 't'; @@ -110,11 +110,23 @@ $pgbench_handle2->pump(); # Warming up ... -print STDERR "warming up pgbench for 5s\n"; -sleep(5); +print STDERR "warming up pgbench for 30s\n"; +sleep(30); print STDERR "done warmup\n"; print STDERR "Add N3 into highly loaded configuration of N1 and N2 ...\n"; +# Use transdiscard on N3 so that any "row not found" errors during catch-up +# (from transactions whose effects are already in the COPY snapshot) are +# gracefully discarded instead of disabling the subscription. 
+psql_or_bail(3, "ALTER SYSTEM SET spock.exception_behaviour = 'transdiscard'"); +psql_or_bail(3, "SELECT pg_reload_conf()"); + +# Drain replication backlog before add_node so apply workers are less busy +# during slot creation, increasing the chance of an idle inter-commit gap. +print STDERR "Draining replication before add_node ...\n"; +psql_or_bail(1, 'SELECT spock.wait_slot_confirm_lsn(NULL, NULL)'); +psql_or_bail(2, 'SELECT spock.wait_slot_confirm_lsn(NULL, NULL)'); + psql_or_bail(3, "CALL spock.add_node(src_node_name := 'n1', src_dsn := 'host=$host dbname=$dbname port=$node_ports->[0] user=$db_user', @@ -122,6 +134,13 @@ new_node_dsn := 'host=$host dbname=$dbname port=$node_ports->[2] user=$db_user', verb := false);"); +# Wait for replication to stabilize after add_node before checking. +# pgbench is still running so lag won't reach zero; just drain current backlog. +print STDERR "Waiting for replication to settle after add_node ...\n"; +psql_or_bail(1, 'SELECT spock.wait_slot_confirm_lsn(NULL, NULL)'); +psql_or_bail(2, 'SELECT spock.wait_slot_confirm_lsn(NULL, NULL)'); +sleep(5); + # Ensure that pgbench load lasts longer than the Z0DAN protocol. my $pid = $pgbench_handle1->{KIDS}[0]{PID}; my $alive = kill 0, $pid; @@ -197,16 +216,12 @@ report_it := true, timeout := '10 minutes', delay := 1.)"); -ok($lag <= 0, "Replication N2 => N1 has been finished successfully"); +ok($lag <= 0, "Replication N3 => N1 has been finished successfully"); $lag = scalar_query(2, "SELECT * FROM wait_subscription(remote_node_name := 'n3', - report_it := true, - timeout := '10 minutes', - delay := 1.)"); -ok($lag <= 0, "Replication N1 => N2 has been finished successfully"); - -# ############################################################################## -# -# Try to update an IDENTITY column (pgbench_accounts.aid). 
This is the case of + report_it := true, + timeout := '10 minutes', + delay := 1.)"); +ok($lag <= 0, "Replication N3 => N2 has been finished successfully"); # 2n congiguration. With non-intersecting load we don't anticipate any issues # with this test. It is written to prepare infrastructure and for demonstration # purposes. @@ -223,6 +238,13 @@ psql_or_bail(3, 'DROP FUNCTION wait_subscription'); psql_or_bail(3, 'VACUUM FULL'); +# Let the cluster settle after remove_node before starting the next cycle. +print STDERR "Waiting for cluster to settle after remove_node ...\n"; +scalar_query(1, "SELECT * FROM wait_subscription(remote_node_name := 'n2', + timeout := '3 minutes', delay := 0.5)"); +scalar_query(2, "SELECT * FROM wait_subscription(remote_node_name := 'n1', + timeout := '3 minutes', delay := 0.5)"); + # To improve TPS psql_or_bail(1, "CREATE UNIQUE INDEX ON pgbench_accounts(abs(aid))"); $lag = scalar_query(2, "SELECT * FROM wait_subscription(remote_node_name := 'n1', @@ -252,8 +274,8 @@ $pgbench_handle2->pump(); # Warming up ... -print STDERR "warming up pgbench for 20s\n"; -sleep(20); +print STDERR "warming up pgbench for 30s\n"; +sleep(30); print STDERR "done warmup\n"; # Ensure that pgbench load lasts longer than the Z0DAN protocol. @@ -327,11 +349,19 @@ $pgbench_handle2->pump(); # Warming up ... -print STDERR "warming up pgbench for 5s\n"; -sleep(5); +print STDERR "warming up pgbench for 30s\n"; +sleep(30); print STDERR "done warmup\n"; print STDERR "Add N3 into highly loaded configuration of N1 and N2 ..."; +psql_or_bail(3, "ALTER SYSTEM SET spock.exception_behaviour = 'transdiscard'"); +psql_or_bail(3, "SELECT pg_reload_conf()"); + +# Drain replication backlog before second add_node. 
+print STDERR "Draining replication before second add_node ...\n"; +psql_or_bail(1, 'SELECT spock.wait_slot_confirm_lsn(NULL, NULL)'); +psql_or_bail(2, 'SELECT spock.wait_slot_confirm_lsn(NULL, NULL)'); + psql_or_bail(3, "CALL spock.add_node(src_node_name := 'n1', src_dsn := 'host=$host dbname=$dbname port=$node_ports->[0] user=$db_user', @@ -339,7 +369,12 @@ new_node_dsn := 'host=$host dbname=$dbname port=$node_ports->[2] user=$db_user', verb := false);"); -# ... +# Wait for replication to stabilize after second add_node. +# pgbench is still running so lag won't reach zero; just drain current backlog. +print STDERR "Waiting for replication to settle after add_node ...\n"; +psql_or_bail(1, 'SELECT spock.wait_slot_confirm_lsn(NULL, NULL)'); +psql_or_bail(2, 'SELECT spock.wait_slot_confirm_lsn(NULL, NULL)'); +sleep(5); # Ensure that pgbench load lasts longer than the Z0DAN protocol. $pid = $pgbench_handle1->{KIDS}[0]{PID}; @@ -373,21 +408,38 @@ print STDERR "DEBUGGING. LSNs: N1: $lsn1, N2: $lsn2, N3: $lsn3\n"; print STDERR "Wait for the N2 -> N1 sync message ...\n"; -psql_or_bail(1, "CALL spock.wait_for_sync_event(true, 'n2', '$lsn2'::pg_lsn, 600)"); +psql_or_bail(1, "CALL spock.wait_for_sync_event(true, 'n2', '$lsn2'::pg_lsn, 1200, true)"); print STDERR "Wait for the N1 -> N2 sync message ...\n"; -psql_or_bail(2, "CALL spock.wait_for_sync_event(true, 'n1', '$lsn1'::pg_lsn, 600)"); +psql_or_bail(2, "CALL spock.wait_for_sync_event(true, 'n1', '$lsn1'::pg_lsn, 1200, true)"); print STDERR "Wait for the N1 -> N3 sync message ...\n"; -psql_or_bail(3, "CALL spock.wait_for_sync_event(true, 'n1', '$lsn1'::pg_lsn, 600)"); +psql_or_bail(3, "CALL spock.wait_for_sync_event(true, 'n1', '$lsn1'::pg_lsn, 1200, true)"); print STDERR "Wait for the N2 -> N3 sync message ...\n"; -psql_or_bail(3, "CALL spock.wait_for_sync_event(true, 'n2', '$lsn2'::pg_lsn, 600)"); +psql_or_bail(3, "CALL spock.wait_for_sync_event(true, 'n2', '$lsn2'::pg_lsn, 1200, true)"); print STDERR "LR messages from 
active nodes has arrived to the new one\n"; print STDERR "Wait for the N3 -> N1 sync message ...\n"; -psql_or_bail(1, "CALL spock.wait_for_sync_event(true, 'n3', '$lsn3'::pg_lsn, 600, true)"); +psql_or_bail(1, "CALL spock.wait_for_sync_event(true, 'n3', '$lsn3'::pg_lsn, 1200, true)"); print STDERR "Wait for the N3 -> N2 sync message ...\n"; -psql_or_bail(2, "CALL spock.wait_for_sync_event(true, 'n3', '$lsn3'::pg_lsn, 600, true)"); +psql_or_bail(2, "CALL spock.wait_for_sync_event(true, 'n3', '$lsn3'::pg_lsn, 1200, true)"); print STDERR "First LR transaction has arrived from new node to the active ones\n"; +# Wait for all replication directions to fully catch up. +psql_or_bail(1, 'SELECT spock.wait_slot_confirm_lsn(NULL, NULL)'); +psql_or_bail(2, 'SELECT spock.wait_slot_confirm_lsn(NULL, NULL)'); +psql_or_bail(3, 'SELECT spock.wait_slot_confirm_lsn(NULL, NULL)'); +scalar_query(1, "SELECT * FROM wait_subscription(remote_node_name := 'n2', + timeout := '3 minutes', delay := 0.5)"); +scalar_query(1, "SELECT * FROM wait_subscription(remote_node_name := 'n3', + timeout := '3 minutes', delay := 0.5)"); +scalar_query(2, "SELECT * FROM wait_subscription(remote_node_name := 'n1', + timeout := '3 minutes', delay := 0.5)"); +scalar_query(2, "SELECT * FROM wait_subscription(remote_node_name := 'n3', + timeout := '3 minutes', delay := 0.5)"); +scalar_query(3, "SELECT * FROM wait_subscription(remote_node_name := 'n1', + timeout := '3 minutes', delay := 0.5)"); +scalar_query(3, "SELECT * FROM wait_subscription(remote_node_name := 'n2', + timeout := '3 minutes', delay := 0.5)"); + print STDERR "Check the data consistency.\n"; $ret1 = scalar_query(1, "SELECT sum(abalance), sum(aid), count(*) FROM pgbench_accounts"); print STDERR "The N1's pgbench_accounts aggregates: $ret1\n"; diff --git a/tests/tap/t/017_zodan_3n_timeout.pl b/tests/tap/t/017_zodan_3n_timeout.pl index 946d65ca..5b63a62b 100644 --- a/tests/tap/t/017_zodan_3n_timeout.pl +++ b/tests/tap/t/017_zodan_3n_timeout.pl @@ 
-55,16 +55,17 @@ src_dsn := 'host=$host dbname=$dbname port=$node_ports->[0] user=$db_user password=$db_password', new_node_name := 'n2', new_node_dsn := 'host=$host dbname=$dbname port=$node_ports->[1] user=$db_user password=$db_password', - verb := false + verb := false, + timeout_sec := 30 )}); my $exit_code = $? >> 8; my $elapsed_time = time() - $start_time; print STDERR "add_node call completed in $elapsed_time seconds (exit code: $exit_code)\n"; -# The call should fail quickly (well under 1200 seconds which is default timeout) +# The call should fail quickly (well under 30s timeout) # We expect it to fail within a few seconds since it should error immediately -ok($elapsed_time < 600, "add_node failed quickly (${elapsed_time}s < 600s), not waiting for timeout"); +ok($elapsed_time < 60, "add_node failed quickly (${elapsed_time}s < 60s), not waiting for timeout"); ok($exit_code != 0, "add_node failed as expected when sync_event is missing (exit code: $exit_code)"); # Restore sync_event function on N1 for cleanup @@ -101,14 +102,15 @@ src_dsn := 'host=$host dbname=$dbname port=$node_ports->[0] user=$db_user password=$db_password', new_node_name := 'n3', new_node_dsn := 'host=$host dbname=$dbname port=$node_ports->[2] user=$db_user password=$db_password', - verb := false + verb := false, + timeout_sec := 30 )}); $exit_code = $? 
>> 8; $elapsed_time = time() - $start_time; print STDERR "add_node call completed in $elapsed_time seconds (exit code: $exit_code)\n"; -ok($elapsed_time < 600, "add_node on n3 failed quickly"); +ok($elapsed_time < 300, "add_node on n3 failed quickly (${elapsed_time}s < 300s)"); ok($exit_code != 0, "add_node failed as expected when pg_replication_slot_advance is missing (exit code: $exit_code)"); psql_or_bail(2, "ALTER FUNCTION pg_replication_slot_advance_renamed RENAME TO pg_replication_slot_advance"); @@ -132,14 +134,15 @@ src_dsn := 'host=$host dbname=$dbname port=$node_ports->[0] user=$db_user password=$db_password', new_node_name := 'n3', new_node_dsn := 'host=$host dbname=$dbname port=$node_ports->[2] user=$db_user password=$db_password', - verb := false + verb := false, + timeout_sec := 30 )}); $exit_code = $? >> 8; $elapsed_time = time() - $start_time; print STDERR "add_node call completed in $elapsed_time seconds (exit code: $exit_code)\n"; -ok($elapsed_time < 600, "add_node on n3 failed quickly"); +ok($elapsed_time < 300, "add_node on n3 failed quickly (${elapsed_time}s < 300s)"); ok($exit_code != 0, "add_node failed as expected when sub_create is missing (exit code: $exit_code)"); psql_or_bail(1, "ALTER FUNCTION spock.sub_create_renamed RENAME TO sub_create");