@@ -1346,9 +1346,79 @@ BEGIN
13461346 RAISE EXCEPTION ' ✗ %' , rpad(' Creating replication slot ' || slot_name || ' on node ' || rec .node_name || ' (error: ' || SQLERRM || ' )' , 120 , ' ' );
13471347 END;
13481348
-- Wait until the source node has committed every change that originated on
-- this "other" node up to the replication slot's LSN (_commit_lsn). This
-- ensures P_snap >= L_slot when Phase 5 takes the snapshot, preventing data
-- loss in the [P_snap, L_slot) gap.
--
-- Best effort: the source node's spock.progress is polled over dblink, with a
-- 0.5s sleep between polls, for at most max_wait_iters polls. A timeout or
-- any error is reported as a WARNING and processing continues. The "OK"
-- notice is emitted only when the catch-up was actually observed.
BEGIN
    DECLARE
        src_progress_lsn pg_lsn;                 -- last LSN of this node committed on the source
        wait_iters       integer := 0;
        max_wait_iters   integer := 2400;        -- 2400 polls x 0.5s sleep = 20 minutes of sleeping
        caught_up        boolean := false;       -- true only once progress >= _commit_lsn was seen
        progress_sql     text;
    BEGIN
        IF _commit_lsn IS NULL THEN
            -- Nothing to compare against: the exit test below could never
            -- become true, so skip the wait instead of spinning until timeout.
            RAISE WARNING ' No slot LSN available for %; skipping source commit catchup wait', rec.node_name;
        ELSE
            -- Executed on the source node: its own progress row for this
            -- "other" node (matched by node name).
            progress_sql := format(
                ' SELECT p.remote_commit_lsn '
                ' FROM spock.progress p '
                ' JOIN spock.node n ON n.node_id = p.remote_node_id '
                ' WHERE p.node_id = (SELECT node_id FROM spock.node_info()) '
                ' AND n.node_name = %L',
                rec.node_name);

            RAISE NOTICE ' - Waiting for source node % to commit % changes up to slot LSN %...',
                src_node_name, rec.node_name, _commit_lsn;

            LOOP
                -- No matching row leaves src_progress_lsn NULL, which counts
                -- as "not caught up yet".
                SELECT t.lsn
                  INTO src_progress_lsn
                  FROM dblink(src_dsn, progress_sql) AS t(lsn pg_lsn);

                caught_up := src_progress_lsn IS NOT NULL
                             AND src_progress_lsn >= _commit_lsn;
                EXIT WHEN caught_up;

                IF wait_iters >= max_wait_iters THEN
                    RAISE WARNING ' Timeout waiting for source node commit catchup (last seen: %)', src_progress_lsn;
                    EXIT;
                END IF;

                PERFORM pg_sleep(0.5);
                wait_iters := wait_iters + 1;
            END LOOP;

            -- Report success only when it was observed; the timeout path has
            -- already emitted its WARNING above.
            IF caught_up THEN
                RAISE NOTICE ' OK: %', rpad(
                    ' Source node ' || src_node_name || ' committed ' || rec.node_name
                    || ' changes up to ' || COALESCE(src_progress_lsn::text, ' unknown')
                    || ' (needed >= ' || _commit_lsn || ' )', 120, ' ');
            END IF;
        END IF;
    END;
EXCEPTION
    WHEN OTHERS THEN
        -- Deliberately non-fatal: a failed check must not abort the add-node run.
        RAISE WARNING ' Could not verify source commit catchup for %: %', rec.node_name, SQLERRM;
END;

1395+
13491396 -- Create disabled subscription on new node from "other" node
13501397 BEGIN
13511398 sub_name := ' sub_' || rec .node_name || ' _' || new_node_name;
1399+ -- Drop stale replication origin from a previous add_node cycle
1400+ -- so create_sub starts fresh at 0/0 (avoids stale-LSN data loss).
1401+ BEGIN
1402+ PERFORM dblink_exec(
1403+ new_node_dsn,
1404+ format($dsql$
1405+ DO $x$
1406+ BEGIN
1407+ IF EXISTS (SELECT 1 FROM pg_replication_origin
1408+ WHERE roname = %L) THEN
1409+ PERFORM pg_replication_origin_drop(%L);
1410+ END IF;
1411+ END $x$
1412+ $dsql$,
1413+ slot_name, slot_name)
1414+ );
1415+ RAISE NOTICE ' OK: Dropped stale origin % on new node (if existed)' ,
1416+ slot_name;
1417+ EXCEPTION
1418+ WHEN OTHERS THEN
1419+ RAISE WARNING ' Could not drop stale origin % on new node: %' ,
1420+ slot_name, SQLERRM;
1421+ END;
13521422 CALL spock .create_sub (
13531423 new_node_dsn, -- Create on new node
13531423 sub_name, -- sub_<other_node>_<new_node>
@@ -1362,12 +1432,12 @@ BEGIN
13621432 false, -- force_text_transfer
13631433 verb -- verbose
13641434 );
1365- RAISE NOTICE ' ✓ %' , rpad(' Creating initial subscription ' || sub_name || ' on node ' || rec .node_name , 120 , ' ' );
1435+ RAISE NOTICE ' ✓ %' , rpad(' Creating initial subscription ' || sub_name || ' on new node ' || new_node_name || ' (provider: ' || rec .node_name || ' ) ' , 120 , ' ' );
13661436 PERFORM pg_sleep(5 );
13671437 subscription_count := subscription_count + 1 ;
13681438 EXCEPTION
13691439 WHEN OTHERS THEN
1370- RAISE NOTICE ' ✗ %' , rpad(' Creating initial subscription ' || sub_name || ' on node ' || rec .node_name || ' (error: ' || SQLERRM || ' )' , 120 , ' ' );
1440+ RAISE NOTICE ' ✗ %' , rpad(' Creating initial subscription ' || sub_name || ' on new node ' || new_node_name || ' (provider: ' || rec .node_name || ' ) (error: ' || SQLERRM || ' )' , 120 , ' ' );
13711441 END;
13721442 END LOOP;
13731443
@@ -1429,7 +1499,7 @@ BEGIN
14291499
14301500 -- Wait for this sync event on the new node where the subscription exists
14311501 PERFORM * FROM dblink(new_node_dsn,
1432- format(' CALL spock.wait_for_sync_event(true, %L, %L::pg_lsn, %s)' ,
1502+ format(' CALL spock.wait_for_sync_event(true, %L, %L::pg_lsn, %s, true )' ,
14331503 src_node_name, sync_lsn, timeout_ms)) AS t(result text );
14341504
14351505 IF verb THEN
@@ -1450,7 +1520,8 @@ BEGIN
14501520 CALL spock .verify_subscription_replicating (
14511521 new_node_dsn,
14521522 ' sub_' || src_node_name || ' _' || new_node_name,
1453- verb
1523+ verb,
1524+ 1200
14541525 );
14551526
14561527 RAISE NOTICE ' ✓ %' , rpad(' Enabling subscription ' || sub_name || ' ...' , 120 , ' ' );
@@ -1497,7 +1568,7 @@ BEGIN
14971568
14981569 -- Wait for this sync event on the new node where the subscription exists
14991570 PERFORM * FROM dblink(new_node_dsn,
1500- format(' CALL spock.wait_for_sync_event(true, %L, %L::pg_lsn, %s)' ,
1571+ format(' CALL spock.wait_for_sync_event(true, %L, %L::pg_lsn, %s, true )' ,
15011572 rec .node_name , sync_lsn, timeout_ms)) AS t(result text );
15021573
15031574 IF verb THEN
@@ -1512,7 +1583,8 @@ BEGIN
15121583 CALL spock .verify_subscription_replicating (
15131584 new_node_dsn,
15141585 ' sub_' || rec .node_name || ' _' || new_node_name,
1515- verb
1586+ verb,
1587+ 1200
15161588 );
15171589
15181590 RAISE NOTICE ' ✓ %' , rpad(' Enabling subscription ' || sub_name || ' ...' , 120 , ' ' );
@@ -1694,7 +1766,7 @@ BEGIN
16941766
16951767 -- Wait for sync event on source node
16961768 BEGIN
1697- remotesql := format(' CALL spock.wait_for_sync_event(true, %L, %L::pg_lsn, %s);' ,
1769+ remotesql := format(' CALL spock.wait_for_sync_event(true, %L, %L::pg_lsn, %s, true );' ,
16981770 rec .node_name , sync_lsn, timeout_ms);
16991771 IF verb THEN
17001772 RAISE NOTICE ' Remote SQL for waiting sync event: %' , remotesql;
@@ -1726,12 +1798,101 @@ DECLARE
17261798 slot_name text ;
17271799 dbname text ;
17281800 remotesql text ;
1801+ src_rec RECORD;
1802+ src_commit_lsn pg_lsn;
1803+ src_slot_name text ;
1804+ src_dbname text ;
1805+ current_lsn pg_lsn;
1806+ target_lsn pg_lsn;
1807+ v_sub_name name;
17291808BEGIN
17301809 RAISE NOTICE ' Phase 7: Checking commit timestamp and advancing replication slot' ;
17311810
1811+ -- Wait for src->new COPY to complete so P_snap is written to spock.progress.
1812+ v_sub_name := (' sub_' || src_node_name || ' _' || new_node_name)::name;
1813+ RAISE NOTICE ' - Waiting for subscription % to reach READY...' , v_sub_name;
1814+ BEGIN
1815+ PERFORM spock .sub_wait_for_sync (v_sub_name);
1816+ RAISE NOTICE ' - Subscription % is READY' , v_sub_name;
1817+ EXCEPTION
1818+ WHEN OTHERS THEN
1819+ RAISE WARNING ' - sub_wait_for_sync(%) failed: %; proceeding anyway' , v_sub_name, SQLERRM;
1820+ END;
1821+
1822+ -- Check src->new slot; only advance if it is NOT active (defensive).
1823+ BEGIN
1824+ RAISE NOTICE ' - Checking source-to-new subscription slot...' ;
1825+
1826+ -- Get source node info and extract dbname
1827+ FOR src_rec IN SELECT * FROM temp_spock_nodes WHERE node_name = src_node_name LOOP
1828+ SELECT spock .extract_dbname_from_dsn (src_rec .dsn ) INTO src_dbname;
1829+ IF src_dbname IS NOT NULL THEN
1830+ src_dbname := TRIM (BOTH ' ' ' ' FROM src_dbname);
1831+ END IF;
1832+ IF src_dbname IS NULL THEN
1833+ src_dbname := ' pgedge' ;
1834+ END IF;
1835+
1836+ -- Generate slot name: spk_<dbname>_<src>_sub_<src>_<new>
1837+ src_slot_name := spock .spock_gen_slot_name (
1838+ src_dbname, src_node_name,
1839+ ' sub_' || src_node_name || ' _' || new_node_name);
1840+
1841+ RAISE NOTICE ' Looking for slot % on source node' , src_slot_name;
1842+
1843+ -- Check if slot exists on source node and whether it is active
1844+ DECLARE
1845+ v_slot_active boolean ;
1846+ BEGIN
1847+ remotesql := format(
1848+ ' SELECT restart_lsn, active FROM pg_replication_slots WHERE slot_name = %L' ,
1849+ src_slot_name);
1850+ SELECT * FROM dblink(src_dsn, remotesql)
1851+ AS t(lsn pg_lsn, active boolean )
1852+ INTO current_lsn, v_slot_active;
1853+
1854+ IF current_lsn IS NULL THEN
1855+ RAISE NOTICE ' Slot % not found on source node' , src_slot_name;
1856+ ELSIF v_slot_active THEN
1857+ -- Subscription is running; slot and origin managed by the apply worker
1858+ RAISE NOTICE ' Slot % is active (subscription running) — no advance needed' , src_slot_name;
1859+ ELSE
1860+ -- Slot exists but is not active (unusual). Advance defensively.
1861+ RAISE NOTICE ' Slot % found at LSN % (inactive)' , src_slot_name, current_lsn;
1862+
1863+ SELECT p .remote_commit_lsn INTO target_lsn
1864+ FROM spock .progress p
1865+ JOIN spock .node n ON n .node_id = p .remote_node_id
1866+ WHERE n .node_name = src_node_name;
1867+
1868+ IF target_lsn IS NOT NULL AND target_lsn > current_lsn THEN
1869+ RAISE NOTICE ' Snapshot LSN for %: %' , src_node_name, target_lsn;
1870+ remotesql := format(' SELECT pg_replication_slot_advance(%L, %L::pg_lsn)' , src_slot_name, target_lsn);
1871+ PERFORM * FROM dblink(src_dsn, remotesql) AS t(result text );
1872+ RAISE NOTICE ' OK: Advanced slot % on source node from % to %' , src_slot_name, current_lsn, target_lsn;
1873+
1874+ IF NOT EXISTS (
1875+ SELECT 1 FROM pg_replication_origin WHERE roname = src_slot_name
1876+ ) THEN
1877+ RAISE WARNING ' Origin % not found on new node; creating it now' , src_slot_name;
1878+ PERFORM pg_replication_origin_create(src_slot_name);
1879+ END IF;
1880+ PERFORM pg_replication_origin_advance(src_slot_name, target_lsn);
1881+ RAISE NOTICE ' OK: Advanced replication origin % on new node to %' , src_slot_name, target_lsn;
1882+ ELSE
1883+ RAISE NOTICE ' Slot % already at or beyond snapshot LSN' , src_slot_name;
1884+ END IF;
1885+ END IF;
1886+ END;
1887+ END LOOP;
1888+ EXCEPTION
1889+ WHEN OTHERS THEN
1890+ RAISE WARNING ' Could not check source-to-new slot: %' , SQLERRM;
1891+ END;
1892+
17321893 -- Check if this is a 2-node scenario (only source and new node)
17331894 IF (SELECT count (* ) FROM temp_spock_nodes WHERE node_name != src_node_name AND node_name != new_node_name) = 0 THEN
1734- RAISE NOTICE ' - No other nodes exist, skipping commit timestamp check ' ;
1895+ RAISE NOTICE ' - No other nodes exist, skipping additional commitment checks ' ;
17351896 RETURN;
17361897 END IF;
17371898
@@ -1793,9 +1954,21 @@ BEGIN
17931954 CONTINUE;
17941955 END IF;
17951956
1796- target_lsn := commit_lsn;
1957+ -- Advance the slot to P_snap: the last commit from this node
1958+ -- that the source node had applied at snapshot time (stored in the new node's spock.progress).
1959+ SELECT p .remote_commit_lsn INTO target_lsn
1960+ FROM spock .progress p
1961+ JOIN spock .node n ON n .node_id = p .remote_node_id
1962+ WHERE n .node_name = rec .node_name ;
1963+
1964+ IF target_lsn IS NULL THEN
1965+ RAISE NOTICE ' WARNING: No spock.progress entry for %, falling back to pg_current_wal_lsn()' , rec .node_name ;
1966+ remotesql := ' SELECT pg_current_wal_lsn()' ;
1967+ SELECT * FROM dblink(rec .dsn , remotesql) AS t(lsn pg_lsn) INTO target_lsn;
1968+ END IF;
1969+
17971970 IF target_lsn IS NULL OR target_lsn <= current_lsn THEN
1798- RAISE NOTICE ' - Slot % already at or beyond target LSN (current: %, target: %)' , slot_name, current_lsn, target_lsn;
1971+ RAISE NOTICE ' - Slot % already at or beyond P_snap LSN (current: %, target: %)' , slot_name, current_lsn, target_lsn;
17991972 CONTINUE;
18001973 END IF;
18011974
@@ -1807,6 +1980,17 @@ BEGIN
18071980
18081981 PERFORM * FROM dblink(rec .dsn , remotesql) AS t(result text );
18091982 RAISE NOTICE ' OK: %' , rpad(' Advanced slot ' || slot_name || ' from ' || current_lsn || ' to ' || target_lsn, 120 , ' ' );
1983+
1984+ -- Advance the replication origin on the new node (subscriber side)
1985+ -- directly, not via dblink; the origin is local to the new node.
1986+ IF NOT EXISTS (
1987+ SELECT 1 FROM pg_replication_origin WHERE roname = slot_name
1988+ ) THEN
1989+ RAISE WARNING ' Origin % not found on new node; creating it now (was it created in Phase 3?)' , slot_name;
1990+ PERFORM pg_replication_origin_create(slot_name);
1991+ END IF;
1992+ PERFORM pg_replication_origin_advance(slot_name, target_lsn);
1993+ RAISE NOTICE ' OK: %' , rpad(' Advanced replication origin ' || slot_name || ' on new node to ' || target_lsn, 120 , ' ' );
18101994 END;
18111995 EXCEPTION
18121996 WHEN OTHERS THEN
@@ -1851,7 +2035,7 @@ BEGIN
18512035
18522036 -- Wait for sync event on new node
18532037 BEGIN
1854- remotesql := format(' CALL spock.wait_for_sync_event(true, %L, %L::pg_lsn, %s);' , src_node_name, sync_lsn, timeout_ms);
2038+ remotesql := format(' CALL spock.wait_for_sync_event(true, %L, %L::pg_lsn, %s, true );' , src_node_name, sync_lsn, timeout_ms);
18552039 IF verb THEN
18562040 RAISE NOTICE ' Remote SQL for wait_for_sync_event on new node %: %' , new_node_name, remotesql;
18572041 END IF;
@@ -1877,7 +2061,7 @@ DECLARE
18772061 sub_rec RECORD;
18782062 rec RECORD;
18792063 wait_count integer := 0 ;
1880- max_wait_count integer := 300 ; -- Wait up to 300 seconds
2064+ max_wait_count integer := 1200 ; -- Wait up to 1200 seconds
18812065BEGIN
18822066 -- Let remote subscriptions update their subscription's state.
18832067 COMMIT ;
0 commit comments