diff --git a/Makefile b/Makefile index 772daa4b..96fe47be 100644 --- a/Makefile +++ b/Makefile @@ -56,7 +56,8 @@ REGRESS = preseed infofuncs init_fail init preseed_check basic conflict_secondar interfaces foreign_key copy sequence triggers parallel functions row_filter \ row_filter_sampling att_list column_filter apply_delay \ extended node_origin_cascade multiple_upstreams tuple_origin autoddl \ - sync_event sync_table generated_columns spill_transaction read_only drop + sync_event sync_table generated_columns spill_transaction read_only \ + resolutions_retention drop # The following test cases are disabled while developing. # diff --git a/docs/configuring.md b/docs/configuring.md index de7240a3..5d94c5f1 100644 --- a/docs/configuring.md +++ b/docs/configuring.md @@ -258,6 +258,22 @@ The following configuration values are possible: logs all conflict resolutions to the `spock.resolutions` table. This option can only be set when the postmaster starts. +### `spock.resolutions_retention_days` + +`spock.resolutions_retention_days` controls how long rows are kept in the +`spock.resolutions` table. Rows with a `log_time` older than this many days +are deleted automatically by the apply worker, which runs the cleanup at most +once per day. The default is `100` days. Set to `0` to disable automatic +cleanup entirely. + +This GUC has no effect when `spock.save_resolutions` is `off`. + +Cleanup can also be triggered manually at any time by a superuser: + +```sql +SELECT spock.cleanup_resolutions(); +``` + ### `spock.stats_max_entries` `spock.stats_max_entries` specifies the maximum number of entries that can diff --git a/docs/spock_functions/functions/spock_cleanup_resolutions.md b/docs/spock_functions/functions/spock_cleanup_resolutions.md new file mode 100644 index 00000000..9e44560c --- /dev/null +++ b/docs/spock_functions/functions/spock_cleanup_resolutions.md @@ -0,0 +1,41 @@ +## NAME + +spock.cleanup_resolutions() + +### SYNOPSIS + +spock.cleanup_resolutions() + +### RETURNS + +bigint — the number of rows deleted from `spock.resolutions`. + +### DESCRIPTION + +Deletes rows from `spock.resolutions` whose `log_time` is older than the +value configured by `spock.resolutions_retention_days`. Returns the number +of rows deleted. + +This function is a superuser-only manual trigger for the same cleanup that +the apply worker runs automatically once per day. It is useful for +immediate cleanup via `pg_cron` or when the apply worker has not been +running. + +The function respects both `spock.save_resolutions` and +`spock.resolutions_retention_days`. If either setting disables cleanup +(`save_resolutions = off` or `resolutions_retention_days = 0`), the +function returns `0` without deleting anything. + +### ARGUMENTS + +None. + +### EXAMPLE + +Delete conflict history rows older than the configured retention window: + + SELECT spock.cleanup_resolutions(); + +### SEE ALSO + +`spock.save_resolutions`, `spock.resolutions_retention_days` diff --git a/include/spock_conflict.h b/include/spock_conflict.h index ba196579..a6fe9b6c 100644 --- a/include/spock_conflict.h +++ b/include/spock_conflict.h @@ -42,6 +42,7 @@ typedef enum extern int spock_conflict_resolver; extern int spock_conflict_log_level; extern bool spock_save_resolutions; +extern int spock_resolutions_retention_days; /* * We want to eventually match native PostgreSQL conflict types, @@ -161,5 +162,6 @@ extern bool spock_conflict_resolver_check_hook(int *newval, void **extra, extern void tuple_to_stringinfo(StringInfo s, TupleDesc tupdesc, HeapTuple tuple); +extern uint64 spock_cleanup_resolutions(void); #endif /* SPOCK_CONFLICT_H */ diff --git a/sql/spock--5.0.6--6.0.0-devel.sql b/sql/spock--5.0.6--6.0.0-devel.sql index 23492975..2ec45d2d 100644 --- a/sql/spock--5.0.6--6.0.0-devel.sql +++ b/sql/spock--5.0.6--6.0.0-devel.sql @@ -177,6 +177,15 @@ SET conflict_type = CASE conflict_type ELSE conflict_type END; +-- Add index on log_time to support efficient TTL-based cleanup +CREATE INDEX ON spock.resolutions (log_time); + +-- Manual cleanup function for the resolutions table +CREATE FUNCTION spock.cleanup_resolutions() +RETURNS bigint VOLATILE +LANGUAGE c AS 'MODULE_PATHNAME', 'spock_cleanup_resolutions_sql'; +REVOKE ALL ON FUNCTION spock.cleanup_resolutions() FROM PUBLIC; + -- ---- -- Subscription conflict statistics -- ---- diff --git a/sql/spock--6.0.0-devel.sql b/sql/spock--6.0.0-devel.sql index 882082e2..89d8ec9b 100644 --- a/sql/spock--6.0.0-devel.sql +++ b/sql/spock--6.0.0-devel.sql @@ -351,6 +351,12 @@ CREATE TABLE spock.resolutions ( PRIMARY KEY(id, node_name) ) WITH (user_catalog_table=true); +CREATE INDEX ON spock.resolutions (log_time); + +CREATE FUNCTION spock.cleanup_resolutions() +RETURNS bigint VOLATILE +LANGUAGE c AS 'MODULE_PATHNAME', 'spock_cleanup_resolutions_sql'; +REVOKE ALL ON FUNCTION spock.cleanup_resolutions() FROM PUBLIC; CREATE VIEW spock.TABLES AS WITH set_relations AS ( diff --git a/src/spock.c b/src/spock.c index 1683e303..582525cf 100644 --- a/src/spock.c +++ b/src/spock.c @@ -1012,6 +1012,17 @@ _PG_init(void) 0, NULL, NULL, NULL); + DefineCustomIntVariable("spock.resolutions_retention_days", + "Number of days to retain rows in spock." CATALOG_LOGTABLE " table. " + "Rows older than this are deleted periodically by the apply worker. " + "Set to 0 to disable automatic cleanup.", + NULL, + &spock_resolutions_retention_days, + 100, 0, INT_MAX, + PGC_SUSET, + 0, + NULL, NULL, NULL); + DefineCustomBoolVariable("spock.enable_quiet_mode", "Reduce message verbosity for cleaner output", "When enabled, downgrades DDL replication INFO/WARNING messages to LOG level " diff --git a/src/spock_apply.c b/src/spock_apply.c index f78cf19d..cabdefcd 100644 --- a/src/spock_apply.c +++ b/src/spock_apply.c @@ -218,6 +218,9 @@ static dlist_head sync_replica_lsn = DLIST_STATIC_INIT(sync_replica_lsn); static XLogRecPtr skip_xact_finish_lsn = InvalidXLogRecPtr; #define is_skipping_changes() (unlikely(!XLogRecPtrIsInvalid(skip_xact_finish_lsn))) +/* How often the apply worker runs spock_cleanup_resolutions() (milliseconds). */ +#define RESOLUTIONS_CLEANUP_INTERVAL_MS (86400L * 1000L) + /* * Whereas MessageContext is used for the duration of a transaction, * ApplyOperationContext can be used for individual operations @@ -2947,6 +2950,7 @@ apply_work(PGconn *streamConn) XLogRecPtr last_received = InvalidXLogRecPtr; XLogRecPtr last_inserted = InvalidXLogRecPtr; TimestampTz last_receive_timestamp = GetCurrentTimestamp(); + TimestampTz last_cleanup_timestamp = 0; bool need_replay; ErrorData *edata = NULL; @@ -3050,6 +3054,26 @@ apply_work(PGconn *streamConn) } } + /* + * Periodically clean up old rows from spock.resolutions. We run + * at most once per day regardless of whether the worker is idle + * or processing traffic. spock_cleanup_resolutions() manages its + * own transaction and error handling. + */ + if (!IsTransactionState() && + spock_resolutions_retention_days > 0) + { + TimestampTz cleanup_due; + + cleanup_due = TimestampTzPlusMilliseconds(last_cleanup_timestamp, + RESOLUTIONS_CLEANUP_INTERVAL_MS); + if (GetCurrentTimestamp() >= cleanup_due) + { + spock_cleanup_resolutions(); + last_cleanup_timestamp = GetCurrentTimestamp(); + } + } + Assert(CurrentMemoryContext == MessageContext); for (;;) diff --git a/src/spock_conflict.c b/src/spock_conflict.c index 93989696..b7695991 100644 --- a/src/spock_conflict.c +++ b/src/spock_conflict.c @@ -87,6 +87,7 @@ SpockConflictTypeName(SpockConflictType t) int spock_conflict_resolver = SPOCK_RESOLVE_LAST_UPDATE_WINS; int spock_conflict_log_level = LOG; bool spock_save_resolutions = false; +int spock_resolutions_retention_days = 100; static Datum spock_conflict_row_to_json(Datum row, bool row_isnull, bool *ret_isnull); @@ -882,6 +883,149 @@ tuple_to_stringinfo(StringInfo s, TupleDesc tupdesc, HeapTuple tuple) } } +/* + * Delete rows from spock.resolutions that are older than + * spock.resolutions_retention_days. Returns the number of rows deleted. + * + * Caller must have an active transaction and snapshot (SPI requirement). + * Errors propagate to the caller; no error suppression here. + */ +static uint64 +spock_cleanup_resolutions_core(void) +{ + int ret; + uint64 ndeleted; + StringInfoData cmd; + + initStringInfo(&cmd); + appendStringInfo(&cmd, + "DELETE FROM spock.%s WHERE log_time < now() - '%d days'::interval", + CATALOG_LOGTABLE, spock_resolutions_retention_days); + + if (SPI_connect() != SPI_OK_CONNECT) + { + pfree(cmd.data); + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("SPOCK: SPI_connect failed in spock_cleanup_resolutions"))); + } + + ret = SPI_execute(cmd.data, false, 0); + pfree(cmd.data); + + if (ret != SPI_OK_DELETE) + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("SPOCK: unexpected SPI result %d in spock_cleanup_resolutions", + ret))); + + ndeleted = SPI_processed; + SPI_finish(); + + elog(DEBUG1, "SPOCK: cleaned up " UINT64_FORMAT " row(s) from spock.%s", + ndeleted, CATALOG_LOGTABLE); + + return ndeleted; +} + +/* + * spock_cleanup_resolutions + * + * Apply worker entry point. Manages its own transaction so it can be called + * from the background loop where no transaction is active. Errors are + * downgraded to WARNING so a transient failure does not disrupt replication; + * the worker will retry on the next daily cycle. + */ +uint64 +spock_cleanup_resolutions(void) +{ + uint64 ndeleted = 0; + MemoryContext oldcontext; + + if (spock_resolutions_retention_days <= 0) + return 0; + + /* + * Save the caller's memory context (MessageContext in the apply worker) + * before entering PG_TRY. + */ + oldcontext = CurrentMemoryContext; + + /* + * The entire transaction lifetime lives inside PG_TRY so that errors + * from StartTransactionCommand() or PushActiveSnapshot() — not just SPI + * execution failures — are also caught and downgraded to WARNING. + * + * SetCurrentStatementStartTimestamp() must precede StartTransactionCommand() + * so the transaction's cached current_timestamp is initialised correctly. + * PushActiveSnapshot() is required by SPI_execute (it asserts an active + * snapshot exists). + */ + PG_TRY(); + { + SetCurrentStatementStartTimestamp(); + StartTransactionCommand(); + PushActiveSnapshot(GetTransactionSnapshot()); + + /* do the cleanup */ + ndeleted = spock_cleanup_resolutions_core(); + + PopActiveSnapshot(); + CommitTransactionCommand(); + } + PG_CATCH(); + { + ErrorData *edata; + + MemoryContextSwitchTo(oldcontext); + edata = CopyErrorData(); + FlushErrorState(); + + /* + * Abort only if a transaction was actually started. If the error + * occurred in SetCurrentStatementStartTimestamp() or before + * StartTransactionCommand() completed, there may be no transaction + * to abort. AbortCurrentTransaction() also handles SPI and snapshot + * cleanup via AtEOXact_SPI() and AtAbort_Snapshot(), avoiding + * double-cleanup if core() already called SPI_finish() before + * CommitTransactionCommand() threw. + */ + if (IsTransactionState()) + AbortCurrentTransaction(); + + ereport(WARNING, + (errcode(edata->sqlerrcode), + errmsg("%s", edata->message))); + FreeErrorData(edata); + } + PG_END_TRY(); + MemoryContextSwitchTo(oldcontext); + + return ndeleted; +} + +/* + * spock_cleanup_resolutions_sql + * + * SQL-callable entry point. The executor already provides an active + * transaction, so we call the core function directly. Any error propagates + * to the caller normally — no silent transaction poisoning. + */ +PG_FUNCTION_INFO_V1(spock_cleanup_resolutions_sql); +Datum +spock_cleanup_resolutions_sql(PG_FUNCTION_ARGS) +{ + if (!superuser()) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + errmsg("must be superuser to call spock.cleanup_resolutions()"))); + + if (spock_resolutions_retention_days <= 0) + PG_RETURN_INT64(0); + + PG_RETURN_INT64((int64) spock_cleanup_resolutions_core()); +} + /* * Convert the target row to json form if it isn't null. */ diff --git a/tests/regress/expected/resolutions_retention.out b/tests/regress/expected/resolutions_retention.out new file mode 100644 index 00000000..cbf0af16 --- /dev/null +++ b/tests/regress/expected/resolutions_retention.out @@ -0,0 +1,199 @@ +-- resolutions_retention: test spock.resolutions_retention_days GUC and +-- spock.cleanup_resolutions() SQL function. +SELECT * FROM spock_regress_variables() +\gset +-- Configure GUCs up front on both nodes +\c :provider_dsn +ALTER SYSTEM SET spock.save_resolutions = on; +SELECT pg_reload_conf(); + pg_reload_conf +---------------- + t +(1 row) + +\c :subscriber_dsn +ALTER SYSTEM SET spock.save_resolutions = on; +SELECT pg_reload_conf(); + pg_reload_conf +---------------- + t +(1 row) + +SELECT pg_sleep(1); + pg_sleep +---------- + +(1 row) + +TRUNCATE spock.resolutions; +-- Setup: create a table and seed it on both sides to enable conflict generation +\c :provider_dsn +SELECT spock.replicate_ddl($$ + CREATE TABLE retention_test (id int PRIMARY KEY, data text); +$$); + replicate_ddl +--------------- + t +(1 row) + +SELECT * FROM spock.repset_add_table('default', 'retention_test'); + repset_add_table +------------------ + t +(1 row) + +INSERT INTO retention_test VALUES (1, 'one'); +SELECT spock.wait_slot_confirm_lsn(NULL, NULL); + wait_slot_confirm_lsn +----------------------- + +(1 row) + +-- Generate an insert_exists conflict: insert same PK on subscriber first, +-- then provider insert arrives and conflicts. +\c :subscriber_dsn +INSERT INTO retention_test VALUES (2, 'sub-two'); +\c :provider_dsn +INSERT INTO retention_test VALUES (2, 'pub-two'); +SELECT spock.wait_slot_confirm_lsn(NULL, NULL); + wait_slot_confirm_lsn +----------------------- + +(1 row) + +\c :subscriber_dsn +-- Expect 1 conflict row (insert_exists) +SELECT conflict_type FROM spock.resolutions WHERE relname = 'public.retention_test'; + conflict_type +--------------- + insert_exists +(1 row) + +-- Backdate that row to 60 days ago to simulate aged history +UPDATE spock.resolutions +SET log_time = now() - '60 days'::interval +WHERE relname = 'public.retention_test'; +-- Expect 1 row total (the 60-day-old one) +SELECT COUNT(*) AS total FROM spock.resolutions WHERE relname = 'public.retention_test'; + total +------- + 1 +(1 row) + +-- Set retention to 30 days: the 60-day-old row falls outside the window +-- (60 > 30) so cleanup will delete it. +SET spock.resolutions_retention_days = 30; +SELECT spock.cleanup_resolutions() AS rows_deleted; + rows_deleted +-------------- + 1 +(1 row) + +-- Expect 0 rows remaining +SELECT COUNT(*) AS remaining FROM spock.resolutions WHERE relname = 'public.retention_test'; + remaining +----------- + 0 +(1 row) + +-- Generate a fresh conflict for subsequent tests +INSERT INTO retention_test VALUES (3, 'sub-three'); +\c :provider_dsn +INSERT INTO retention_test VALUES (3, 'pub-three'); +SELECT spock.wait_slot_confirm_lsn(NULL, NULL); + wait_slot_confirm_lsn +----------------------- + +(1 row) + +\c :subscriber_dsn +-- Expect 1 recent row +SELECT COUNT(*) AS total FROM spock.resolutions WHERE relname = 'public.retention_test'; + total +------- + 1 +(1 row) + +-- Test that retention_days = 0 disables cleanup: backdate the row so it +-- would be deleted if cleanup ran, then verify the guard prevents deletion. +UPDATE spock.resolutions +SET log_time = now() - '999 days'::interval +WHERE relname = 'public.retention_test'; +SET spock.resolutions_retention_days = 0; +SELECT spock.cleanup_resolutions() AS rows_deleted; + rows_deleted +-------------- + 0 +(1 row) + +-- Row should still be there +SELECT COUNT(*) AS remaining FROM spock.resolutions WHERE relname = 'public.retention_test'; + remaining +----------- + 1 +(1 row) + +-- Test that cleanup runs even when save_resolutions=off: logging controls new +-- inserts only; cleanup is driven solely by retention_days. +UPDATE spock.resolutions +SET log_time = now() - '999 days'::interval +WHERE relname = 'public.retention_test'; +SET spock.resolutions_retention_days = 30; +ALTER SYSTEM SET spock.save_resolutions = off; +SELECT pg_reload_conf(); + pg_reload_conf +---------------- + t +(1 row) + +SELECT pg_sleep(1); + pg_sleep +---------- + +(1 row) + +SELECT spock.cleanup_resolutions() AS rows_deleted; + rows_deleted +-------------- + 1 +(1 row) + +-- Row should be deleted (save_resolutions=off does not suppress cleanup) +SELECT COUNT(*) AS remaining FROM spock.resolutions WHERE relname = 'public.retention_test'; + remaining +----------- + 0 +(1 row) + +-- Cleanup +\c :provider_dsn +SELECT * FROM spock.repset_remove_table('default', 'retention_test'); + repset_remove_table +--------------------- + t +(1 row) + +SELECT spock.replicate_ddl($$ + DROP TABLE retention_test CASCADE; +$$); + replicate_ddl +--------------- + t +(1 row) + +ALTER SYSTEM SET spock.save_resolutions = off; +SELECT pg_reload_conf(); + pg_reload_conf +---------------- + t +(1 row) + +\c :subscriber_dsn +RESET spock.resolutions_retention_days; +ALTER SYSTEM SET spock.save_resolutions = off; +SELECT pg_reload_conf(); + pg_reload_conf +---------------- + t +(1 row) + diff --git a/tests/regress/sql/resolutions_retention.sql b/tests/regress/sql/resolutions_retention.sql new file mode 100644 index 00000000..ce3fed96 --- /dev/null +++ b/tests/regress/sql/resolutions_retention.sql @@ -0,0 +1,106 @@ +-- resolutions_retention: test spock.resolutions_retention_days GUC and +-- spock.cleanup_resolutions() SQL function. +SELECT * FROM spock_regress_variables() +\gset + +-- Configure GUCs up front on both nodes +\c :provider_dsn +ALTER SYSTEM SET spock.save_resolutions = on; +SELECT pg_reload_conf(); + +\c :subscriber_dsn +ALTER SYSTEM SET spock.save_resolutions = on; +SELECT pg_reload_conf(); +SELECT pg_sleep(1); + +TRUNCATE spock.resolutions; + +-- Setup: create a table and seed it on both sides to enable conflict generation +\c :provider_dsn +SELECT spock.replicate_ddl($$ + CREATE TABLE retention_test (id int PRIMARY KEY, data text); +$$); +SELECT * FROM spock.repset_add_table('default', 'retention_test'); +INSERT INTO retention_test VALUES (1, 'one'); +SELECT spock.wait_slot_confirm_lsn(NULL, NULL); + +-- Generate an insert_exists conflict: insert same PK on subscriber first, +-- then provider insert arrives and conflicts. +\c :subscriber_dsn +INSERT INTO retention_test VALUES (2, 'sub-two'); + +\c :provider_dsn +INSERT INTO retention_test VALUES (2, 'pub-two'); +SELECT spock.wait_slot_confirm_lsn(NULL, NULL); + +\c :subscriber_dsn +-- Expect 1 conflict row (insert_exists) +SELECT conflict_type FROM spock.resolutions WHERE relname = 'public.retention_test'; + +-- Backdate that row to 60 days ago to simulate aged history +UPDATE spock.resolutions +SET log_time = now() - '60 days'::interval +WHERE relname = 'public.retention_test'; + +-- Expect 1 row total (the 60-day-old one) +SELECT COUNT(*) AS total FROM spock.resolutions WHERE relname = 'public.retention_test'; + +-- Set retention to 30 days: the 60-day-old row falls outside the window +-- (60 > 30) so cleanup will delete it. +SET spock.resolutions_retention_days = 30; +SELECT spock.cleanup_resolutions() AS rows_deleted; + +-- Expect 0 rows remaining +SELECT COUNT(*) AS remaining FROM spock.resolutions WHERE relname = 'public.retention_test'; + +-- Generate a fresh conflict for subsequent tests +INSERT INTO retention_test VALUES (3, 'sub-three'); + +\c :provider_dsn +INSERT INTO retention_test VALUES (3, 'pub-three'); +SELECT spock.wait_slot_confirm_lsn(NULL, NULL); + +\c :subscriber_dsn +-- Expect 1 recent row +SELECT COUNT(*) AS total FROM spock.resolutions WHERE relname = 'public.retention_test'; + +-- Test that retention_days = 0 disables cleanup: backdate the row so it +-- would be deleted if cleanup ran, then verify the guard prevents deletion. +UPDATE spock.resolutions +SET log_time = now() - '999 days'::interval +WHERE relname = 'public.retention_test'; +SET spock.resolutions_retention_days = 0; +SELECT spock.cleanup_resolutions() AS rows_deleted; + +-- Row should still be there +SELECT COUNT(*) AS remaining FROM spock.resolutions WHERE relname = 'public.retention_test'; + +-- Test that cleanup runs even when save_resolutions=off: logging controls new +-- inserts only; cleanup is driven solely by retention_days. +UPDATE spock.resolutions +SET log_time = now() - '999 days'::interval +WHERE relname = 'public.retention_test'; + +SET spock.resolutions_retention_days = 30; +ALTER SYSTEM SET spock.save_resolutions = off; +SELECT pg_reload_conf(); +SELECT pg_sleep(1); + +SELECT spock.cleanup_resolutions() AS rows_deleted; + +-- Row should be deleted (save_resolutions=off does not suppress cleanup) +SELECT COUNT(*) AS remaining FROM spock.resolutions WHERE relname = 'public.retention_test'; + +-- Cleanup +\c :provider_dsn +SELECT * FROM spock.repset_remove_table('default', 'retention_test'); +SELECT spock.replicate_ddl($$ + DROP TABLE retention_test CASCADE; +$$); +ALTER SYSTEM SET spock.save_resolutions = off; +SELECT pg_reload_conf(); + +\c :subscriber_dsn +RESET spock.resolutions_retention_days; +ALTER SYSTEM SET spock.save_resolutions = off; +SELECT pg_reload_conf();