From 4360ca6f7994fb745071e1e7e90ab581ce6e7be5 Mon Sep 17 00:00:00 2001 From: "Andrei V. Lepikhov" Date: Tue, 17 Mar 2026 13:25:27 +0100 Subject: [PATCH 1/5] Add DISCARDFILE support for TRANSDISCARD exception handling Preparation commit. In TRANSDISCARD mode, instead of executing each DML in a subtransaction during replay (and logging to exception_log), dump records directly to a per-database binary file under pg_spock/. This follows the Oracle GoldenGate approach: on error, skip re-execution and just record what was discarded. - Add discardfile_write() to serialize records as length-prefixed JSON to pg_spock/discard_<dboid>.log, using a dedicated LWLock for concurrent append safety. - Add spock.discard_read() SQL function to read the file back as a table. - Wire TRANSDISCARD/SUB_DISABLE paths in handle_insert() to use the discardfile instead of subtransaction replay. Further commits should add this behaviour to other handle_* routines. --- include/spock_exception_handler.h | 6 + include/spock_worker.h | 3 + sql/spock--6.0.0-devel.sql | 3 + src/spock_apply.c | 50 ++-- src/spock_exception_handler.c | 270 +++++++++++++++++++++ src/spock_shmem.c | 7 + tests/regress/expected/replication_set.out | 40 ++- tests/regress/sql/replication_set.sql | 14 ++ 8 files changed, 367 insertions(+), 26 deletions(-) diff --git a/include/spock_exception_handler.h b/include/spock_exception_handler.h index 7997880d..7d940b11 100644 --- a/include/spock_exception_handler.h +++ b/include/spock_exception_handler.h @@ -99,4 +99,10 @@ extern void spock_disable_subscription(SpockSubscription *sub, XLogRecPtr lsn, TimestampTz ts); +extern bool discardfile_write(const char *node_name, SpockRelation *rel, + Oid remote_origin, Oid local_origin, + const char *operation, SpockTupleData *oldtup, + SpockTupleData *newtup, + TransactionId remote_xid); + #endif /* SPOCK_EXCEPTION_HANDLER_H */ diff --git a/include/spock_worker.h b/include/spock_worker.h index f3a17d8d..36365a08 
100644 --- a/include/spock_worker.h +++ b/include/spock_worker.h @@ -106,6 +106,9 @@ typedef struct SpockContext /* Manages access to SpockGroupHash */ LWLock *apply_group_master_lock; + /* DISCARDFILE: lock for concurrent append by apply workers */ + LWLock *discard_file_lock; + /* Background workers. */ int total_workers; SpockWorker workers[FLEXIBLE_ARRAY_MEMBER]; diff --git a/sql/spock--6.0.0-devel.sql b/sql/spock--6.0.0-devel.sql index 9cf0088d..ef4c891b 100644 --- a/sql/spock--6.0.0-devel.sql +++ b/sql/spock--6.0.0-devel.sql @@ -98,6 +98,9 @@ CREATE TABLE spock.exception_status_detail ( REFERENCES spock.exception_status ) WITH (user_catalog_table=true); +CREATE OR REPLACE FUNCTION spock.discard_read() +RETURNS SETOF jsonb VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'spock_discard_read'; + CREATE FUNCTION spock.apply_group_progress ( OUT dbid oid, OUT node_id oid, diff --git a/src/spock_apply.c b/src/spock_apply.c index 46d7d96d..dae40f2f 100644 --- a/src/spock_apply.c +++ b/src/spock_apply.c @@ -1308,27 +1308,43 @@ handle_insert(StringInfo s) /* TODO: Handle multiple inserts */ if (MyApplyWorker->use_try_block) { - PG_TRY(); + if (exception_behaviour == TRANSDISCARD || + exception_behaviour == SUB_DISABLE) { - exception_command_counter++; - BeginInternalSubTransaction(NULL); - spock_apply_heap_insert(rel, &newtup); - } - PG_CATCH(); - { - failed = true; - RollbackAndReleaseCurrentSubTransaction(); - edata = CopyErrorData(); - xact_had_exception = true; - } - PG_END_TRY(); + SpockLocalNode *local_node = get_local_node(false, false); - if (!failed) + /* + * TRANSDISCARD and SUB_DISABLE needs only registering current + * operation. If an ERROR happens during the logging process - it is + * a FATAL error: apply worker should follow the exception behaviour + * logic related to such kind of problem. + * + * TODO: process returning value and react correspondingly. 
+ */ + discardfile_write(local_node->node->name, rel, remote_origin_id, + local_node->node->id, "INSERT", NULL, + &newtup, remote_xid); + failed = false; + } + else { - if (exception_behaviour == TRANSDISCARD || - exception_behaviour == SUB_DISABLE) + /* DISCARD MODE needs hard way - try block and subtransactions */ + PG_TRY(); + { + exception_command_counter++; + BeginInternalSubTransaction(NULL); + spock_apply_heap_insert(rel, &newtup); + } + PG_CATCH(); + { + failed = true; RollbackAndReleaseCurrentSubTransaction(); - else + edata = CopyErrorData(); + xact_had_exception = true; + } + PG_END_TRY(); + + if (!failed) ReleaseCurrentSubTransaction(); } diff --git a/src/spock_exception_handler.c b/src/spock_exception_handler.c index 09cd2cd0..11b4425a 100644 --- a/src/spock_exception_handler.c +++ b/src/spock_exception_handler.c @@ -46,6 +46,10 @@ #include "pgstat.h" +#include "funcapi.h" +#include "storage/fd.h" +#include "utils/json.h" + #include "spock_sync.h" #include "spock_worker.h" #include "spock_conflict.h" @@ -74,6 +78,24 @@ #define CATALOG_EXCEPTION_LOG "exception_log" +#define DISCARDFILE_DIR "pg_spock" +#define DISCARDFILE_FMT DISCARDFILE_DIR "/discard_%u.log" + +/* File format version — written as a 4-byte header, checked on read */ +#define DISCARDFILE_VERSION 1 + +/* JSON field names — keep in sync between discardfile_write and discard_read */ +#define DF_XID "xid" +#define DF_NODE "node_name" +#define DF_LOG_TIME "log_time" +#define DF_RELNAME "relname" +#define DF_LOCAL_ORIGIN "local_origin" +#define DF_REMOTE_ORIGIN "remote_origin" +#define DF_OPERATION "operation" +#define DF_OLD_TUPLE "old_tuple" +#define DF_REMOTE_TUPLE "remote_tuple" +#define DF_REMOTE_XID "remote_xid" + SpockExceptionLog *exception_log_ptr = NULL; int exception_behaviour = TRANSDISCARD; int exception_logging = LOG_ALL; @@ -269,3 +291,251 @@ spock_disable_subscription(SpockSubscription *sub, if (started_tx) CommitTransactionCommand(); } + +/* + * Get the path to the DISCARDFILE 
for the current database. + * The caller must provide a buffer of at least MAXPGPATH bytes. + */ +static void +discardfile_path(char *path) +{ + snprintf(path, MAXPGPATH, DISCARDFILE_FMT, MyDatabaseId); +} + +/* + * Ensure the pg_spock directory exists under PGDATA. + */ +static void +discardfile_ensure_dir(void) +{ + char dirpath[MAXPGPATH]; + + snprintf(dirpath, MAXPGPATH, "%s", DISCARDFILE_DIR); + (void) MakePGDirectory(dirpath); +} + +/* + * Append a single record to the DISCARDFILE. + * + * This function is safe to call outside a transaction — it does not access + * catalog tables. Each call writes one binary length-prefixed record: a + * 32-bit native-endian length header (the StringInfoData.len field, which is + * a signed int) followed by exactly that many bytes of JSON payload. Readers + * must parse the length header first to determine where each record ends. + * + * Locking: acquires SpockCtx->discard_file_lock in exclusive mode to + * serialize concurrent writes from different apply workers. + * + * Memory leaking. Being executed on a per-row basis it should be executed + * inside a short living memory context - consider multiple potential memory + * allocations inside a JSON code. + * + * Returns true on success, false if the record could not be written + * (a WARNING is emitted in that case). 
+ */ +bool +discardfile_write(const char *node_name, SpockRelation *rel, Oid remote_origin, + Oid local_origin, const char *operation, + SpockTupleData *oldtup, SpockTupleData *newtup, + TransactionId remote_xid) +{ + char path[MAXPGPATH]; + StringInfoData buf; + char *old_json = NULL; + char *new_json = NULL; + int fd; + TupleDesc tupdesc = RelationGetDescr(rel->rel); + + Assert(SpockCtx != NULL); + + /* Serialize tuples to JSON before taking the lock */ + if (oldtup != NULL && tupdesc != NULL) + old_json = spock_tuple_to_json_cstring(oldtup, tupdesc); + if (newtup != NULL && tupdesc != NULL) + new_json = spock_tuple_to_json_cstring(newtup, tupdesc); + + /* Build the JSON record. Field names use DF_* defines from above. */ + initStringInfo(&buf); + appendStringInfo(&buf, "{\"" DF_XID "\": %u", remote_xid); + + appendStringInfoString(&buf, ", \"" DF_NODE "\": "); + escape_json(&buf, node_name ? node_name : ""); + + appendStringInfoString(&buf, ", \"" DF_LOG_TIME "\": "); + escape_json(&buf, timestamptz_to_str(GetCurrentTimestamp())); + + appendStringInfoString(&buf, ", \"" DF_RELNAME "\": "); + escape_json(&buf, quote_qualified_identifier(rel->nspname, rel->relname)); + + appendStringInfo(&buf, ", \"" DF_LOCAL_ORIGIN "\": %u", local_origin); + + appendStringInfo(&buf, ", \"" DF_REMOTE_ORIGIN "\": %u", remote_origin); + + appendStringInfoString(&buf, ", \"" DF_OPERATION "\": "); + escape_json(&buf, operation ? 
operation : ""); + + if (old_json != NULL) + appendStringInfo(&buf, ", \"" DF_OLD_TUPLE "\": %s", old_json); + + if (new_json != NULL) + appendStringInfo(&buf, ", \"" DF_REMOTE_TUPLE "\": %s", new_json); + + appendStringInfo(&buf, ", \"" DF_REMOTE_XID "\": %u}", remote_xid); + + /* Write under lock: [uint32 length][json data] */ + LWLockAcquire(SpockCtx->discard_file_lock, LW_EXCLUSIVE); + + discardfile_ensure_dir(); + discardfile_path(path); + + fd = OpenTransientFile(path, O_WRONLY | O_CREAT | O_APPEND | PG_BINARY); + if (fd < 0) + { + LWLockRelease(SpockCtx->discard_file_lock); + ereport(WARNING, + (errcode_for_file_access(), + errmsg("could not open discard file \"%s\": %m", path))); + pfree(buf.data); + return false; + } + + /* Write version header if this is a new file */ + if (lseek(fd, 0, SEEK_END) == 0) + { + uint32 version = DISCARDFILE_VERSION; + + if (write(fd, &version, sizeof(version)) != sizeof(version)) + { + CloseTransientFile(fd); + LWLockRelease(SpockCtx->discard_file_lock); + ereport(WARNING, + (errcode_for_file_access(), + errmsg("could not write discard file header \"%s\": %m", + path))); + pfree(buf.data); + return false; + } + } + + if (write(fd, &buf.len, sizeof(buf.len)) != sizeof(buf.len) || + write(fd, buf.data, buf.len) != buf.len) + { + CloseTransientFile(fd); + LWLockRelease(SpockCtx->discard_file_lock); + ereport(WARNING, + (errcode_for_file_access(), + errmsg("could not write to discard file \"%s\": %m", + path))); + pfree(buf.data); + return false; + } + + CloseTransientFile(fd); + LWLockRelease(SpockCtx->discard_file_lock); + + return true; +} + +/* + * SQL-callable function: spock.discard_read() + * + * Returns the contents of the current database's DISCARDFILE as a set of + * single-column jsonb records. 
Users extract fields in SQL, e.g.: + * + * SELECT rec->>'node_name', rec->>'operation', rec->'remote_tuple' + * FROM spock.discard_read() AS rec; + * + * TODO: pass through the code scrupulously and decide on safe reading and error + * processing. Too much for a single commit, though. + */ +PG_FUNCTION_INFO_V1(spock_discard_read); +Datum +spock_discard_read(PG_FUNCTION_ARGS) +{ + ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; + char path[MAXPGPATH]; + int fd; + int reclen; + + InitMaterializedSRF(fcinfo, MAT_SRF_USE_EXPECTED_DESC | MAT_SRF_BLESS); + + discardfile_path(path); + + if (SpockCtx == NULL) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("spock shared memory context is not initialized"))); + + /* + * Acquire the discard-file lock in shared mode so that concurrent + * writers (which take LW_EXCLUSIVE) cannot produce a partial record + * while we are reading. + */ + LWLockAcquire(SpockCtx->discard_file_lock, LW_SHARED); + + fd = OpenTransientFile(path, O_RDONLY | PG_BINARY); + if (fd < 0) + { + LWLockRelease(SpockCtx->discard_file_lock); + if (errno == ENOENT) + PG_RETURN_VOID(); + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not open discard file \"%s\": %m", path))); + } + + /* Validate file format version */ + { + uint32 version; + + if (read(fd, &version, sizeof(version)) != sizeof(version)) + { + CloseTransientFile(fd); + LWLockRelease(SpockCtx->discard_file_lock); + PG_RETURN_VOID(); /* empty file */ + } + if (version != DISCARDFILE_VERSION) + { + CloseTransientFile(fd); + LWLockRelease(SpockCtx->discard_file_lock); + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("unsupported discard file version %u (expected %u)", + version, DISCARDFILE_VERSION))); + } + } + + while (read(fd, &reclen, sizeof(reclen)) == sizeof(reclen)) + { + Datum value; + bool null = false; + char *rec; + + rec = palloc(reclen + 1); + if (read(fd, rec, reclen) != reclen) + { + pfree(rec); + /* + * In case 
of crash and semi-written file this option allows us to + * use all the records have written before the failed operation. + */ + ereport(WARNING, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("truncated record in discard file \"%s\"", + path))); + break; + } + rec[reclen] = '\0'; + + value = DirectFunctionCall1(jsonb_in, CStringGetDatum(rec)); + + tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, + &value, &null); + pfree(rec); + } + + CloseTransientFile(fd); + LWLockRelease(SpockCtx->discard_file_lock); + + PG_RETURN_VOID(); +} diff --git a/src/spock_shmem.c b/src/spock_shmem.c index 389dcc00..1667e692 100644 --- a/src/spock_shmem.c +++ b/src/spock_shmem.c @@ -118,6 +118,9 @@ spock_shmem_request(void) /* For SpockCtx->lock */ RequestNamedLWLockTranche("spock context lock", 1); + + /* For SpockCtx->discard_file_lock */ + RequestNamedLWLockTranche("spock discard file lock", 1); } /* @@ -165,6 +168,10 @@ spock_shmem_startup(void) SpockCtx->total_workers = nworkers; memset(SpockCtx->workers, 0, sizeof(SpockWorker) * SpockCtx->total_workers); + + /* Initialize DISCARDFILE support */ + SpockCtx->discard_file_lock = + &((GetNamedLWLockTranche("spock discard file lock")[0]).lock); } /* Initialize worker subsystem shared memory structures */ diff --git a/tests/regress/expected/replication_set.out b/tests/regress/expected/replication_set.out index 398ea9ac..c5a90769 100644 --- a/tests/regress/expected/replication_set.out +++ b/tests/regress/expected/replication_set.out @@ -459,12 +459,23 @@ ORDER BY command_counter; -----------------+--------------+------------+-----------+----------------------------------------------------+------------------------------------------ 1 | | | INSERT | | Spock can't find relation 2 | | | INSERT | | Spock can't find relation - 3 | public | spoc_102g | INSERT | [{"value": -4, "attname": "x", "atttype": "int4"}] | Spock can't find relation with oid + 2 | public | spoc_102g | INSERT | [{"value": -4, "attname": "x", "atttype": "int4"}] | 
Spock can't find relation with oid + 3 | | | INSERT | | Spock can't find relation 4 | | | INSERT | | Spock can't find relation - 5 | | | INSERT | | Spock can't find relation - 6 | public | spoc_102g | INSERT | [{"value": -4, "attname": "x", "atttype": "int4"}] | Spock can't find relation with oid + 5 | public | spoc_102g | INSERT | [{"value": -4, "attname": "x", "atttype": "int4"}] | Spock can't find relation with oid (6 rows) +-- Check discard file contents (TRANSDISCARD records go here) +SELECT rec->>'id' AS id, rec->>'node_name' AS node_name, + rec->>'relname' AS relname, rec->>'operation' AS operation, + rec->'remote_tuple' AS remote_tuple +FROM spock.discard_read() AS rec +ORDER BY (rec->>'id')::bigint; + id | node_name | relname | operation | remote_tuple +----+-----------------+------------------+-----------+---------------------------------------------------- + | test_subscriber | public.spoc_102g | INSERT | [{"value": -4, "attname": "x", "atttype": "int4"}] +(1 row) + \c :provider_dsn SELECT spock.replicate_ddl('DROP TABLE IF EXISTS spoc_102g,spoc_102l CASCADE'); NOTICE: drop cascades to table spoc_102g membership in replication set default @@ -576,16 +587,27 @@ ORDER BY command_counter; -----------------+--------------+-------------+-----------+----------------------------------------------------+-------------------------------------------------------------------------------------------------------- 1 | | | INSERT | | Spock can't find relation 2 | | | INSERT | | Spock can't find relation - 3 | public | spoc_102g | INSERT | [{"value": -4, "attname": "x", "atttype": "int4"}] | Spock can't find relation with oid + 2 | public | spoc_102g | INSERT | [{"value": -4, "attname": "x", "atttype": "int4"}] | Spock can't find relation with oid + 3 | | | INSERT | | Spock can't find relation 4 | | | INSERT | | Spock can't find relation - 5 | | | INSERT | | Spock can't find relation - 6 | public | spoc_102g | INSERT | [{"value": -4, "attname": "x", "atttype": "int4"}] | 
Spock can't find relation with oid + 5 | public | spoc_102g | INSERT | [{"value": -4, "attname": "x", "atttype": "int4"}] | Spock can't find relation with oid + 6 | | | UPDATE | | Spock can't find relation 7 | | | UPDATE | | Spock can't find relation - 8 | | | UPDATE | | Spock can't find relation - 9 | public | spoc_102g_u | UPDATE | [{"value": -3, "attname": "x", "atttype": "int4"}] | Spock can't find relation with oid - 10 | public | spoc_102l_u | UPDATE | [{"value": -3, "attname": "x", "atttype": "int4"}] | logical replication did not find row to be updated in replication target relation (public.spoc_102l_u) + 8 | public | spoc_102g_u | UPDATE | [{"value": -3, "attname": "x", "atttype": "int4"}] | Spock can't find relation with oid + 9 | public | spoc_102l_u | UPDATE | [{"value": -3, "attname": "x", "atttype": "int4"}] | logical replication did not find row to be updated in replication target relation (public.spoc_102l_u) (10 rows) +-- Check discard file contents for UPDATE tests +SELECT rec->>'id' AS id, rec->>'node_name' AS node_name, + rec->>'relname' AS relname, rec->>'operation' AS operation, + rec->'remote_tuple' AS remote_tuple +FROM spock.discard_read() AS rec +ORDER BY (rec->>'id')::bigint; + id | node_name | relname | operation | remote_tuple +----+-----------------+------------------+-----------+---------------------------------------------------- + | test_subscriber | public.spoc_102g | INSERT | [{"value": -4, "attname": "x", "atttype": "int4"}] +(1 row) + \c :provider_dsn SELECT spock.replicate_ddl('DROP TABLE IF EXISTS spoc_102g_u,spoc_102l_u CASCADE'); NOTICE: drop cascades to table spoc_102g_u membership in replication set default diff --git a/tests/regress/sql/replication_set.sql b/tests/regress/sql/replication_set.sql index b9132a73..ebde188d 100644 --- a/tests/regress/sql/replication_set.sql +++ b/tests/regress/sql/replication_set.sql @@ -214,6 +214,13 @@ SELECT FROM spock.exception_log ORDER BY command_counter; +-- Check discard file contents 
(TRANSDISCARD records go here) +SELECT rec->>'id' AS id, rec->>'node_name' AS node_name, + rec->>'relname' AS relname, rec->>'operation' AS operation, + rec->'remote_tuple' AS remote_tuple +FROM spock.discard_read() AS rec +ORDER BY (rec->>'id')::bigint; + \c :provider_dsn SELECT spock.replicate_ddl('DROP TABLE IF EXISTS spoc_102g,spoc_102l CASCADE'); @@ -268,6 +275,13 @@ SELECT FROM spock.exception_log ORDER BY command_counter; +-- Check discard file contents for UPDATE tests +SELECT rec->>'id' AS id, rec->>'node_name' AS node_name, + rec->>'relname' AS relname, rec->>'operation' AS operation, + rec->'remote_tuple' AS remote_tuple +FROM spock.discard_read() AS rec +ORDER BY (rec->>'id')::bigint; + \c :provider_dsn SELECT spock.replicate_ddl('DROP TABLE IF EXISTS spoc_102g_u,spoc_102l_u CASCADE'); From bb07a26c7847d0db249afb287b9427b351b82c32 Mon Sep 17 00:00:00 2001 From: "Andrei V. Lepikhov" Date: Thu, 19 Mar 2026 10:55:30 +0100 Subject: [PATCH 2/5] Skip per-row exception logging in handle_insert In TRANSDISCARD and SUB_DISABLE modes, handle_commit already logs a single exception entry for the entire discarded transaction. The per-row log_insert_exception call in handle_insert was producing duplicate entries. Move the call inside the DISCARD-mode branch so only DISCARD mode logs per-row exceptions; TRANSDISCARD/SUB_DISABLE rely on the discard file for per-row detail and handle_commit for the audit-trail entry. Co-Authored-By: Claude Opus 4.6 (1M context) --- src/spock_apply.c | 22 +++++++++++----------- tests/regress/expected/replication_set.out | 6 ++---- 2 files changed, 13 insertions(+), 15 deletions(-) diff --git a/src/spock_apply.c b/src/spock_apply.c index dae40f2f..389fad7d 100644 --- a/src/spock_apply.c +++ b/src/spock_apply.c @@ -1346,19 +1346,19 @@ handle_insert(StringInfo s) if (!failed) ReleaseCurrentSubTransaction(); - } - /* - * Log the exception. 
If this operation succeeded but we have an - * initial error message (from a previous attempt), use that instead - * of NULL to provide context for why we're logging this. - */ - { - char *error_msg = edata ? edata->message : - (exception_log_ptr[my_exception_log_index].initial_error_message[0] ? - exception_log_ptr[my_exception_log_index].initial_error_message : NULL); + /* + * Log the exception for DISCARD mode. TRANSDISCARD and + * SUB_DISABLE log a single entry per transaction in + * handle_commit; per-row data goes to the discard file. + */ + { + char *error_msg = edata ? edata->message : + (exception_log_ptr[my_exception_log_index].initial_error_message[0] ? + exception_log_ptr[my_exception_log_index].initial_error_message : NULL); - log_insert_exception(failed, error_msg, rel, NULL, &newtup, "INSERT"); + log_insert_exception(failed, error_msg, rel, NULL, &newtup, "INSERT"); + } } } else diff --git a/tests/regress/expected/replication_set.out b/tests/regress/expected/replication_set.out index c5a90769..b7a362bd 100644 --- a/tests/regress/expected/replication_set.out +++ b/tests/regress/expected/replication_set.out @@ -459,11 +459,10 @@ ORDER BY command_counter; -----------------+--------------+------------+-----------+----------------------------------------------------+------------------------------------------ 1 | | | INSERT | | Spock can't find relation 2 | | | INSERT | | Spock can't find relation - 2 | public | spoc_102g | INSERT | [{"value": -4, "attname": "x", "atttype": "int4"}] | Spock can't find relation with oid 3 | | | INSERT | | Spock can't find relation 4 | | | INSERT | | Spock can't find relation 5 | public | spoc_102g | INSERT | [{"value": -4, "attname": "x", "atttype": "int4"}] | Spock can't find relation with oid -(6 rows) +(5 rows) -- Check discard file contents (TRANSDISCARD records go here) SELECT rec->>'id' AS id, rec->>'node_name' AS node_name, @@ -587,7 +586,6 @@ ORDER BY command_counter; 
-----------------+--------------+-------------+-----------+----------------------------------------------------+-------------------------------------------------------------------------------------------------------- 1 | | | INSERT | | Spock can't find relation 2 | | | INSERT | | Spock can't find relation - 2 | public | spoc_102g | INSERT | [{"value": -4, "attname": "x", "atttype": "int4"}] | Spock can't find relation with oid 3 | | | INSERT | | Spock can't find relation 4 | | | INSERT | | Spock can't find relation 5 | public | spoc_102g | INSERT | [{"value": -4, "attname": "x", "atttype": "int4"}] | Spock can't find relation with oid @@ -595,7 +593,7 @@ ORDER BY command_counter; 7 | | | UPDATE | | Spock can't find relation 8 | public | spoc_102g_u | UPDATE | [{"value": -3, "attname": "x", "atttype": "int4"}] | Spock can't find relation with oid 9 | public | spoc_102l_u | UPDATE | [{"value": -3, "attname": "x", "atttype": "int4"}] | logical replication did not find row to be updated in replication target relation (public.spoc_102l_u) -(10 rows) +(9 rows) -- Check discard file contents for UPDATE tests SELECT rec->>'id' AS id, rec->>'node_name' AS node_name, From 51b832fb138f731340e456fad8f4351dce64a51b Mon Sep 17 00:00:00 2001 From: "Andrei V. Lepikhov" Date: Wed, 18 Mar 2026 16:15:13 +0100 Subject: [PATCH 3/5] Extend DISCARDFILE to cover UPDATE, DELETE, and SQL operations In TRANSDISCARD and SUB_DISABLE modes, only INSERT was being written to the discard file; UPDATE, DELETE, and SQL/DDL operations went through the subtransaction path and were silently discarded. Add the same early-exit discardfile_write() path for all four operation types so that every discarded change is recorded. 
To support SQL/DDL operations (which have no SpockRelation), teach discardfile_write() to accept rel=NULL: when NULL, the relname field is written as an empty string, tuple serialization is skipped, and two new optional JSON fields (ddl_statement, ddl_user) carry the SQL text and executing role. --- include/spock_exception_handler.h | 3 +- src/spock_apply.c | 158 +++++++++++++++++++----------- src/spock_exception_handler.c | 30 +++++- 3 files changed, 132 insertions(+), 59 deletions(-) diff --git a/include/spock_exception_handler.h b/include/spock_exception_handler.h index 7d940b11..228675d7 100644 --- a/include/spock_exception_handler.h +++ b/include/spock_exception_handler.h @@ -103,6 +103,7 @@ extern bool discardfile_write(const char *node_name, SpockRelation *rel, Oid remote_origin, Oid local_origin, const char *operation, SpockTupleData *oldtup, SpockTupleData *newtup, - TransactionId remote_xid); + TransactionId remote_xid, + const char *ddl_sql, const char *ddl_user); #endif /* SPOCK_EXCEPTION_HANDLER_H */ diff --git a/src/spock_apply.c b/src/spock_apply.c index 389fad7d..b768f576 100644 --- a/src/spock_apply.c +++ b/src/spock_apply.c @@ -1323,7 +1323,7 @@ handle_insert(StringInfo s) */ discardfile_write(local_node->node->name, rel, remote_origin_id, local_node->node->id, "INSERT", NULL, - &newtup, remote_xid); + &newtup, remote_xid, NULL, NULL); failed = false; } else @@ -1493,27 +1493,44 @@ handle_update(StringInfo s) if (MyApplyWorker->use_try_block == true) { - PG_TRY(); - { - exception_command_counter++; - BeginInternalSubTransaction(NULL); - spock_apply_heap_update(rel, hasoldtup ? 
&oldtup : &newtup, &newtup); - } - PG_CATCH(); + if (exception_behaviour == TRANSDISCARD || + exception_behaviour == SUB_DISABLE) { - failed = true; - RollbackAndReleaseCurrentSubTransaction(); - edata = CopyErrorData(); - xact_had_exception = true; - } - PG_END_TRY(); + SpockLocalNode *local_node = get_local_node(false, false); - if (!failed) + /* + * TRANSDISCARD and SUB_DISABLE needs only registering current + * operation. If an ERROR happens during the logging process - it is + * a FATAL error: apply worker should follow the exception behaviour + * logic related to such kind of problem. + * + * TODO: process returning value and react correspondingly. + */ + discardfile_write(local_node->node->name, rel, remote_origin_id, + local_node->node->id, "UPDATE", + hasoldtup ? &oldtup : NULL, &newtup, + remote_xid, NULL, NULL); + failed = false; + } + else { - if (exception_behaviour == TRANSDISCARD || - exception_behaviour == SUB_DISABLE) + /* DISCARD MODE needs hard way - try block and subtransactions */ + PG_TRY(); + { + exception_command_counter++; + BeginInternalSubTransaction(NULL); + spock_apply_heap_update(rel, hasoldtup ? 
&oldtup : &newtup, &newtup); + } + PG_CATCH(); + { + failed = true; RollbackAndReleaseCurrentSubTransaction(); - else + edata = CopyErrorData(); + xact_had_exception = true; + } + PG_END_TRY(); + + if (!failed) ReleaseCurrentSubTransaction(); } @@ -1600,27 +1617,43 @@ handle_delete(StringInfo s) if (MyApplyWorker->use_try_block) { - PG_TRY(); - { - exception_command_counter++; - BeginInternalSubTransaction(NULL); - spock_apply_heap_delete(rel, &oldtup); - } - PG_CATCH(); + if (exception_behaviour == TRANSDISCARD || + exception_behaviour == SUB_DISABLE) { - failed = true; - RollbackAndReleaseCurrentSubTransaction(); - edata = CopyErrorData(); - xact_had_exception = true; - } - PG_END_TRY(); + SpockLocalNode *local_node = get_local_node(false, false); - if (!failed) + /* + * TRANSDISCARD and SUB_DISABLE needs only registering current + * operation. If an ERROR happens during the logging process - it is + * a FATAL error: apply worker should follow the exception behaviour + * logic related to such kind of problem. + * + * TODO: process returning value and react correspondingly. 
+ */ + discardfile_write(local_node->node->name, rel, remote_origin_id, + local_node->node->id, "DELETE", &oldtup, + NULL, remote_xid, NULL, NULL); + failed = false; + } + else { - if (exception_behaviour == TRANSDISCARD || - exception_behaviour == SUB_DISABLE) + /* DISCARD MODE needs hard way - try block and subtransactions */ + PG_TRY(); + { + exception_command_counter++; + BeginInternalSubTransaction(NULL); + spock_apply_heap_delete(rel, &oldtup); + } + PG_CATCH(); + { + failed = true; RollbackAndReleaseCurrentSubTransaction(); - else + edata = CopyErrorData(); + xact_had_exception = true; + } + PG_END_TRY(); + + if (!failed) ReleaseCurrentSubTransaction(); } @@ -2267,31 +2300,46 @@ handle_sql_or_exception(QueuedMessage *queued_message, bool tx_just_started) if (MyApplyWorker->use_try_block) { - PG_TRY(); - { - exception_command_counter++; - BeginInternalSubTransaction(NULL); - handle_sql(queued_message, tx_just_started, &sql); - } - PG_CATCH(); + if (exception_behaviour == TRANSDISCARD || + exception_behaviour == SUB_DISABLE) { - failed = true; - RollbackAndReleaseCurrentSubTransaction(); - edata = CopyErrorData(); - xact_had_exception = true; - } - PG_END_TRY(); + SpockLocalNode *local_node = get_local_node(false, false); - if (!failed) - { /* - * Follow spock.exception_behavior GUC instead of restarting - * worker + * TRANSDISCARD and SUB_DISABLE needs only registering current + * operation. If an ERROR happens during the logging process - it is + * a FATAL error: apply worker should follow the exception behaviour + * logic related to such kind of problem. + * + * TODO: process returning value and react correspondingly. 
*/ - if (exception_behaviour == TRANSDISCARD || - exception_behaviour == SUB_DISABLE) + sql = JsonbToCString(NULL, + &queued_message->message->root, 0); + discardfile_write(local_node->node->name, NULL, + remote_origin_id, local_node->node->id, + "SQL", NULL, NULL, remote_xid, + sql, queued_message->role); + failed = false; + } + else + { + /* DISCARD MODE needs hard way - try block and subtransactions */ + PG_TRY(); + { + exception_command_counter++; + BeginInternalSubTransaction(NULL); + handle_sql(queued_message, tx_just_started, &sql); + } + PG_CATCH(); + { + failed = true; RollbackAndReleaseCurrentSubTransaction(); - else + edata = CopyErrorData(); + xact_had_exception = true; + } + PG_END_TRY(); + + if (!failed) ReleaseCurrentSubTransaction(); } diff --git a/src/spock_exception_handler.c b/src/spock_exception_handler.c index 11b4425a..63c205e6 100644 --- a/src/spock_exception_handler.c +++ b/src/spock_exception_handler.c @@ -95,6 +95,8 @@ #define DF_OLD_TUPLE "old_tuple" #define DF_REMOTE_TUPLE "remote_tuple" #define DF_REMOTE_XID "remote_xid" +#define DF_DDL_SQL "ddl_statement" +#define DF_DDL_USER "ddl_user" SpockExceptionLog *exception_log_ptr = NULL; int exception_behaviour = TRANSDISCARD; @@ -323,6 +325,11 @@ discardfile_ensure_dir(void) * a signed int) followed by exactly that many bytes of JSON payload. Readers * must parse the length header first to determine where each record ends. * + * When rel is NULL (e.g. for DDL/SQL operations that have no target relation), + * the relname JSON field is set to an empty string and tuple serialization is + * skipped. In that case the caller may pass ddl_sql / ddl_user to record the + * SQL statement and the role that executed it. + * * Locking: acquires SpockCtx->discard_file_lock in exclusive mode to * serialize concurrent writes from different apply workers. 
* @@ -337,14 +344,15 @@ bool discardfile_write(const char *node_name, SpockRelation *rel, Oid remote_origin, Oid local_origin, const char *operation, SpockTupleData *oldtup, SpockTupleData *newtup, - TransactionId remote_xid) + TransactionId remote_xid, + const char *ddl_sql, const char *ddl_user) { char path[MAXPGPATH]; StringInfoData buf; char *old_json = NULL; char *new_json = NULL; int fd; - TupleDesc tupdesc = RelationGetDescr(rel->rel); + TupleDesc tupdesc = rel ? RelationGetDescr(rel->rel) : NULL; Assert(SpockCtx != NULL); @@ -365,7 +373,11 @@ discardfile_write(const char *node_name, SpockRelation *rel, Oid remote_origin, escape_json(&buf, timestamptz_to_str(GetCurrentTimestamp())); appendStringInfoString(&buf, ", \"" DF_RELNAME "\": "); - escape_json(&buf, quote_qualified_identifier(rel->nspname, rel->relname)); + if (rel != NULL) + escape_json(&buf, quote_qualified_identifier(rel->nspname, + rel->relname)); + else + escape_json(&buf, ""); appendStringInfo(&buf, ", \"" DF_LOCAL_ORIGIN "\": %u", local_origin); @@ -380,6 +392,18 @@ discardfile_write(const char *node_name, SpockRelation *rel, Oid remote_origin, if (new_json != NULL) appendStringInfo(&buf, ", \"" DF_REMOTE_TUPLE "\": %s", new_json); + if (ddl_sql != NULL) + { + appendStringInfoString(&buf, ", \"" DF_DDL_SQL "\": "); + escape_json(&buf, ddl_sql); + } + + if (ddl_user != NULL) + { + appendStringInfoString(&buf, ", \"" DF_DDL_USER "\": "); + escape_json(&buf, ddl_user); + } + appendStringInfo(&buf, ", \"" DF_REMOTE_XID "\": %u}", remote_xid); /* Write under lock: [uint32 length][json data] */ From c44653709af838b90e11d71852615d0723000b2a Mon Sep 17 00:00:00 2001 From: "Andrei V. 
Lepikhov" Date: Thu, 19 Mar 2026 16:52:59 +0100 Subject: [PATCH 4/5] SUB_DISABLE: allow transaction re-apply after subscription re-enable Previously, in SUB_DISABLE mode, the replication origin LSN was advanced even when a retried transaction succeeded, causing the transaction to be permanently skipped after the subscription was re-enabled. Now the origin LSN is not advanced for SUB_DISABLE retries, so the transaction will be delivered again and applied normally after the user fixes the root cause. Additionally, log full DML details (relation, tuples) to the exception log table for TRANSDISCARD/SUB_DISABLE modes. The LSN of the record that caused the initial error is saved in shared memory, and during the retry the DML handler identifies the failed record by LSN match and calls log_insert_exception with complete tuple data, matching the level of detail previously available only in DISCARD mode. --- include/spock_exception_handler.h | 1 + src/spock_apply.c | 67 +++++++++++++++++++------------ 2 files changed, 43 insertions(+), 25 deletions(-) diff --git a/include/spock_exception_handler.h b/include/spock_exception_handler.h index 228675d7..01e85d06 100644 --- a/include/spock_exception_handler.h +++ b/include/spock_exception_handler.h @@ -60,6 +60,7 @@ typedef struct SpockExceptionLog HeapTuple local_tuple; char initial_error_message[1024]; char initial_operation[16]; + XLogRecPtr failed_lsn; } SpockExceptionLog; typedef enum SpockExceptionBehaviour diff --git a/src/spock_apply.c b/src/spock_apply.c index b768f576..a1da8054 100644 --- a/src/spock_apply.c +++ b/src/spock_apply.c @@ -90,6 +90,7 @@ PGDLLEXPORT void spock_apply_main(Datum main_arg); static bool in_remote_transaction = false; static bool first_begin_at_startup = true; static XLogRecPtr remote_origin_lsn = InvalidXLogRecPtr; +static XLogRecPtr current_record_lsn = InvalidXLogRecPtr; static RepOriginId remote_origin_id = InvalidRepOriginId; static char *remote_origin_name = NULL; static TimeOffset 
apply_delay = 0; @@ -769,8 +770,15 @@ handle_commit(StringInfo s) * be skipped and made it unavailable when re-enabling the * subscription. Skipping such transactions should be an explicit user * action via spock.sub_alter_skiplsn. + * + * For SUB_DISABLE mode during a retry (use_try_block), do not advance + * the LSN even if the replay succeeded. This allows the transaction + * to be re-applied after the user fixes the root cause and re-enables + * the subscription. */ - if (!xact_had_exception || + if ((!xact_had_exception && + !(MyApplyWorker->use_try_block && + exception_behaviour == SUB_DISABLE)) || exception_behaviour == DISCARD || exception_behaviour == TRANSDISCARD) { @@ -1324,7 +1332,9 @@ handle_insert(StringInfo s) discardfile_write(local_node->node->name, rel, remote_origin_id, local_node->node->id, "INSERT", NULL, &newtup, remote_xid, NULL, NULL); - failed = false; + + /* No DML was attempted, so clear any stale local_tuple pointer. */ + exception_log_ptr[my_exception_log_index].local_tuple = NULL; } else { @@ -1346,19 +1356,17 @@ handle_insert(StringInfo s) if (!failed) ReleaseCurrentSubTransaction(); + } - /* - * Log the exception for DISCARD mode. TRANSDISCARD and - * SUB_DISABLE log a single entry per transaction in - * handle_commit; per-row data goes to the discard file. - */ - { - char *error_msg = edata ? edata->message : - (exception_log_ptr[my_exception_log_index].initial_error_message[0] ? - exception_log_ptr[my_exception_log_index].initial_error_message : NULL); + if (failed || + current_record_lsn == + exception_log_ptr[my_exception_log_index].failed_lsn) + { + char *error_msg = edata ? edata->message : + (exception_log_ptr[my_exception_log_index].initial_error_message[0] ? 
+ exception_log_ptr[my_exception_log_index].initial_error_message : NULL); - log_insert_exception(failed, error_msg, rel, NULL, &newtup, "INSERT"); - } + log_insert_exception(failed, error_msg, rel, NULL, &newtup, "INSERT"); } } else @@ -1510,7 +1518,9 @@ handle_update(StringInfo s) local_node->node->id, "UPDATE", hasoldtup ? &oldtup : NULL, &newtup, remote_xid, NULL, NULL); - failed = false; + + /* No DML was attempted, so clear any stale local_tuple pointer. */ + exception_log_ptr[my_exception_log_index].local_tuple = NULL; } else { @@ -1534,11 +1544,9 @@ handle_update(StringInfo s) ReleaseCurrentSubTransaction(); } - /* - * Log the exception. If this operation succeeded but we have an - * initial error message (from a previous attempt), use that instead - * of NULL to provide context for why we're logging this. - */ + if (failed || + current_record_lsn == + exception_log_ptr[my_exception_log_index].failed_lsn) { char *error_msg = edata ? edata->message : (exception_log_ptr[my_exception_log_index].initial_error_message[0] ? @@ -1633,7 +1641,9 @@ handle_delete(StringInfo s) discardfile_write(local_node->node->name, rel, remote_origin_id, local_node->node->id, "DELETE", &oldtup, NULL, remote_xid, NULL, NULL); - failed = false; + + /* No DML was attempted, so clear any stale local_tuple pointer. */ + exception_log_ptr[my_exception_log_index].local_tuple = NULL; } else { @@ -1657,11 +1667,9 @@ handle_delete(StringInfo s) ReleaseCurrentSubTransaction(); } - /* - * Log the exception. If this operation succeeded but we have an - * initial error message (from a previous attempt), use that instead - * of NULL to provide context for why we're logging this. - */ + if (failed || + current_record_lsn == + exception_log_ptr[my_exception_log_index].failed_lsn) { char *error_msg = edata ? edata->message : (exception_log_ptr[my_exception_log_index].initial_error_message[0] ? 
@@ -3071,6 +3079,7 @@ apply_work(PGconn *streamConn) last_inserted = last_received; UpdateWorkerStats(last_received, last_inserted); + current_record_lsn = last_received; replication_handler(msg); /* @@ -3287,6 +3296,14 @@ apply_work(PGconn *streamConn) sizeof(exception_log_ptr[my_exception_log_index].initial_operation), "%s", errcallback_arg.action_name ? errcallback_arg.action_name : "UNKNOWN"); + + /* + * Remember the LSN of the record that triggered the error. + * During the retry, the DML handler will call + * log_insert_exception when this LSN is reached. + */ + exception_log_ptr[my_exception_log_index].failed_lsn = + last_received; } FlushErrorState(); From 2dc8985894667d84fbe282cbaebf29147f8d60fd Mon Sep 17 00:00:00 2001 From: "Andrei V. Lepikhov" Date: Fri, 20 Mar 2026 11:21:50 +0100 Subject: [PATCH 5/5] TRANSDISCARD/SUB_DISABLE: use read-only replay instead of abort/restart MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace the AbortCurrentTransaction + StartTransactionCommand pattern in the TRANSDISCARD/SUB_DISABLE retry path with a read-only transaction. Setting transaction_read_only at the start of the replay prevents any actual DML while still allowing direct catalog writes for exception_log entries via log_insert_exception. This eliminates the need to abort the replayed transaction and start a new one just for logging — the DML-level exception_log entries (with full relation/tuple data) are committed as part of the replay transaction itself. The transdiscard_skip_commit goto is no longer needed. For SUB_DISABLE, the subscription disable logic is consolidated into the post-commit section alongside the existing xact_had_exception handling. 
--- src/spock_apply.c | 122 ++++++++------------- tests/regress/expected/replication_set.out | 7 +- 2 files changed, 49 insertions(+), 80 deletions(-) diff --git a/src/spock_apply.c b/src/spock_apply.c index a1da8054..33a2ad96 100644 --- a/src/spock_apply.c +++ b/src/spock_apply.c @@ -436,6 +436,23 @@ begin_replication_step(void) { StartTransactionCommand(); spock_apply_heap_begin(); + + /* + * In TRANSDISCARD/SUB_DISABLE mode, set the transaction + * read-only to prevent any actual DML from being applied. + * Direct catalog writes (exception_log entries) are still + * allowed. + */ + + if (MyApplyWorker->use_try_block && + (exception_behaviour == TRANSDISCARD || + exception_behaviour == SUB_DISABLE)) + { + set_config_option("transaction_read_only", "on", + PGC_USERSET, PGC_S_SESSION, + GUC_ACTION_LOCAL, true, 0, false); + } + result = true; } @@ -800,48 +817,25 @@ handle_commit(StringInfo s) exception_behaviour == SUB_DISABLE)) { SpockExceptionLog *exception_log; - char errmsg[512]; exception_log = &exception_log_ptr[my_exception_log_index]; /* - * All operations were already rolled back in subtransactions (by - * RollbackAndReleaseCurrentSubTransaction in handle_insert/ - * update/delete). Abort the parent transaction to discard it - * entirely. - */ - AbortCurrentTransaction(); - - /* - * Start a new transaction to log the discard and update progress. + * In TRANSDISCARD/SUB_DISABLE mode, DML operations were never + * attempted — they were skipped and logged to the discardfile. + * The only writes in the current transaction are exception_log + * entries from log_insert_exception (with full relation/tuple + * data). Let the transaction commit normally so those entries + * are preserved. */ - StartTransactionCommand(); - PushActiveSnapshot(GetTransactionSnapshot()); - - /* - * Log this transaction as discarded to the exception_log so - * there's an audit trail. Include the original error message if - * we have it. 
- */ - snprintf(errmsg, sizeof(errmsg), - "%s at LSN %X/%X%s%s", - (exception_behaviour == TRANSDISCARD) - ? "Transaction discarded in TRANSDISCARD mode" - : "Transaction failed, subscription will be disabled", - LSN_FORMAT_ARGS(end_lsn), - exception_log->initial_error_message[0] != '\0' ? ". Initial error: " : "", - exception_log->initial_error_message[0] != '\0' ? exception_log->initial_error_message : ""); - - add_entry_to_exception_log(remote_origin_id, - commit_time, - remote_xid, - 0, 0, - NULL, NULL, NULL, NULL, - NULL, NULL, - exception_log->initial_operation, - errmsg); - - elog(LOG, "SPOCK %s: %s", MySubscription->name, errmsg); + elog(LOG, "SPOCK %s: %s at LSN %X/%X%s%s", + MySubscription->name, + (exception_behaviour == TRANSDISCARD) + ? "Transaction discarded in TRANSDISCARD mode" + : "Transaction failed, subscription will be disabled", + LSN_FORMAT_ARGS(end_lsn), + exception_log->initial_error_message[0] != '\0' ? ". Initial error: " : "", + exception_log->initial_error_message[0] != '\0' ? exception_log->initial_error_message : ""); /* * Clear the exception state so we don't enter exception handling @@ -850,31 +844,9 @@ handle_commit(StringInfo s) exception_log->commit_lsn = InvalidXLogRecPtr; exception_log->initial_error_message[0] = '\0'; MySpockWorker->restart_delay = 0; - PopActiveSnapshot(); - CommitTransactionCommand(); - /* - * For SUB_DISABLE mode, throw an error to trigger subscription - * disable in the parent PG_CATCH block. The transaction failure - * is already logged above. - */ - if (exception_behaviour == SUB_DISABLE) - { - elog(ERROR, "SPOCK %s: disabling subscription due to exception in SUB_DISABLE mode", - MySubscription->name); - } - - /* - * Switch to MessageContext before continuing. The progress - * tracking code at transdiscard_skip_commit expects - * MessageContext. - */ - MemoryContextSwitchTo(MessageContext); - - /* - * Skip the normal commit path - jump to progress tracking. 
- */ - goto transdiscard_skip_commit; + /* Defensive check */ + Assert(XactReadOnly); } /* Have the commit code adjust our logical clock if needed */ @@ -889,24 +861,24 @@ handle_commit(StringInfo s) MemoryContextSwitchTo(TopMemoryContext); - if (xact_had_exception) + if (exception_behaviour == SUB_DISABLE && + (xact_had_exception || MyApplyWorker->use_try_block)) { /* - * If we had exception(s) and are in SUB_DISABLE mode then the - * subscription got disabled earlier in the code path. We need to - * exit here to disconnect. + * SUB_DISABLE: after committing exception_log entries, throw + * an ERROR to trigger subscription disable in the PG_CATCH + * block. This covers both the case where DML actually failed + * (xact_had_exception) and the retry path where all DML was + * skipped but the original error was logged (use_try_block). */ - if (exception_behaviour == SUB_DISABLE) - { - SpockExceptionLog *exception_log; + SpockExceptionLog *exception_log; - exception_log = &exception_log_ptr[my_exception_log_index]; - exception_log->commit_lsn = InvalidXLogRecPtr; - MySpockWorker->restart_delay = 0; + exception_log = &exception_log_ptr[my_exception_log_index]; + exception_log->commit_lsn = InvalidXLogRecPtr; + MySpockWorker->restart_delay = 0; - elog(ERROR, "SPOCK %s: exiting because subscription disabled", - MySubscription->name); - } + elog(ERROR, "SPOCK %s: disabling subscription due to exception in SUB_DISABLE mode", + MySubscription->name); } else if (MyApplyWorker->use_try_block && exception_log_ptr[my_exception_log_index].initial_error_message[0] != '\0') @@ -953,7 +925,6 @@ handle_commit(StringInfo s) */ maybe_advance_forwarded_origin(end_lsn, xact_had_exception); -transdiscard_skip_commit: /* Update the entry in the progress table. 
*/ elog(DEBUG1, "SPOCK %s: updating progress table for node_id %d" \ " and remote node id %d with remote commit ts" \ @@ -3332,7 +3303,6 @@ apply_work(PGconn *streamConn) if (need_replay) { MyApplyWorker->use_try_block = true; - goto stream_replay; } diff --git a/tests/regress/expected/replication_set.out b/tests/regress/expected/replication_set.out index b7a362bd..f77c719b 100644 --- a/tests/regress/expected/replication_set.out +++ b/tests/regress/expected/replication_set.out @@ -459,9 +459,9 @@ ORDER BY command_counter; -----------------+--------------+------------+-----------+----------------------------------------------------+------------------------------------------ 1 | | | INSERT | | Spock can't find relation 2 | | | INSERT | | Spock can't find relation + 2 | public | spoc_102g | INSERT | [{"value": -4, "attname": "x", "atttype": "int4"}] | Spock can't find relation with oid 3 | | | INSERT | | Spock can't find relation 4 | | | INSERT | | Spock can't find relation - 5 | public | spoc_102g | INSERT | [{"value": -4, "attname": "x", "atttype": "int4"}] | Spock can't find relation with oid (5 rows) -- Check discard file contents (TRANSDISCARD records go here) @@ -586,14 +586,13 @@ ORDER BY command_counter; -----------------+--------------+-------------+-----------+----------------------------------------------------+-------------------------------------------------------------------------------------------------------- 1 | | | INSERT | | Spock can't find relation 2 | | | INSERT | | Spock can't find relation + 2 | public | spoc_102g | INSERT | [{"value": -4, "attname": "x", "atttype": "int4"}] | Spock can't find relation with oid 3 | | | INSERT | | Spock can't find relation 4 | | | INSERT | | Spock can't find relation - 5 | public | spoc_102g | INSERT | [{"value": -4, "attname": "x", "atttype": "int4"}] | Spock can't find relation with oid 6 | | | UPDATE | | Spock can't find relation 7 | | | UPDATE | | Spock can't find relation - 8 | public | spoc_102g_u | UPDATE 
| [{"value": -3, "attname": "x", "atttype": "int4"}] | Spock can't find relation with oid 9 | public | spoc_102l_u | UPDATE | [{"value": -3, "attname": "x", "atttype": "int4"}] | logical replication did not find row to be updated in replication target relation (public.spoc_102l_u) -(9 rows) +(8 rows) -- Check discard file contents for UPDATE tests SELECT rec->>'id' AS id, rec->>'node_name' AS node_name,