Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ REGRESS = preseed infofuncs init_fail init preseed_check basic conflict_secondar
interfaces foreign_key copy sequence triggers parallel functions row_filter \
row_filter_sampling att_list column_filter apply_delay \
extended node_origin_cascade multiple_upstreams tuple_origin autoddl \
sync_event sync_table generated_columns spill_transaction read_only drop
sync_event sync_table generated_columns spill_transaction read_only \
resolutions_retention drop

# The following test cases are disabled while developing.
#
Expand Down
16 changes: 16 additions & 0 deletions docs/configuring.md
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,22 @@ The following configuration values are possible:
logs all conflict resolutions to the `spock.resolutions` table. This option
can only be set when the postmaster starts.

### `spock.resolutions_retention_days`

`spock.resolutions_retention_days` controls how long rows are kept in the
`spock.resolutions` table. Rows with a `log_time` older than this many days
are deleted automatically by the apply worker, which runs the cleanup at most
once per day. The default is `100` days. Set to `0` to disable automatic
cleanup entirely.

This GUC has no effect when `spock.save_resolutions` is `off`.

Cleanup can also be triggered manually at any time by a superuser:

```sql
SELECT spock.cleanup_resolutions();
```

### `spock.stats_max_entries`

`spock.stats_max_entries` specifies the maximum number of entries that can
Expand Down
41 changes: 41 additions & 0 deletions docs/spock_functions/functions/spock_cleanup_resolutions.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
## NAME

spock.cleanup_resolutions()
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

well, since you created this, what might be nice is if takes an optional argument that overrides resolutions_retention_days. For example, maybe they set retention to 0, then periodically manually run something like SELECT spock.cleanup_resolutions(60) when they think it is getting too big.


### SYNOPSIS

spock.cleanup_resolutions()

### RETURNS

bigint — the number of rows deleted from `spock.resolutions`.

### DESCRIPTION

Deletes rows from `spock.resolutions` whose `log_time` is older than the
value configured by `spock.resolutions_retention_days`. Returns the number
of rows deleted.

This function is a superuser-only manual trigger for the same cleanup that
the apply worker runs automatically once per day. It is useful for
on-demand cleanup, for running cleanup on a custom schedule (for example
with `pg_cron`), or when the apply worker has not been running.

The function respects both `spock.save_resolutions` and
`spock.resolutions_retention_days`. If either setting disables cleanup
(`save_resolutions = off` or `resolutions_retention_days = 0`), the
function returns `0` without deleting anything.

### ARGUMENTS

None.

### EXAMPLE

Delete conflict history rows older than the configured retention window:

SELECT spock.cleanup_resolutions();

### SEE ALSO

`spock.save_resolutions`, `spock.resolutions_retention_days`
2 changes: 2 additions & 0 deletions include/spock_conflict.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ typedef enum
extern int spock_conflict_resolver;
extern int spock_conflict_log_level;
extern bool spock_save_resolutions;
extern int spock_resolutions_retention_days;

/*
* We want to eventually match native PostgreSQL conflict types,
Expand Down Expand Up @@ -161,5 +162,6 @@ extern bool spock_conflict_resolver_check_hook(int *newval, void **extra,

extern void tuple_to_stringinfo(StringInfo s, TupleDesc tupdesc,
HeapTuple tuple);
extern uint64 spock_cleanup_resolutions(void);

#endif /* SPOCK_CONFLICT_H */
9 changes: 9 additions & 0 deletions sql/spock--5.0.6--6.0.0-devel.sql
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,15 @@ SET conflict_type = CASE conflict_type
ELSE conflict_type
END;

-- Add index on log_time to support efficient TTL-based cleanup
CREATE INDEX ON spock.resolutions (log_time);

-- Manual cleanup function for the resolutions table.
-- Implemented in C by spock_cleanup_resolutions_sql; returns the number of
-- rows deleted as bigint.
CREATE FUNCTION spock.cleanup_resolutions()
RETURNS bigint VOLATILE
LANGUAGE c AS 'MODULE_PATHNAME', 'spock_cleanup_resolutions_sql';
-- Strip every privilege on the function from PUBLIC.
REVOKE ALL ON FUNCTION spock.cleanup_resolutions() FROM PUBLIC;

-- ----
-- Subscription conflict statistics
-- ----
Expand Down
6 changes: 6 additions & 0 deletions sql/spock--6.0.0-devel.sql
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,12 @@ CREATE TABLE spock.resolutions (

PRIMARY KEY(id, node_name)
) WITH (user_catalog_table=true);
-- Index on log_time; the retention cleanup deletes rows by log_time.
CREATE INDEX ON spock.resolutions (log_time);

-- Manual cleanup function for the resolutions table.
-- Implemented in C by spock_cleanup_resolutions_sql; returns the number of
-- rows deleted as bigint.
CREATE FUNCTION spock.cleanup_resolutions()
RETURNS bigint VOLATILE
LANGUAGE c AS 'MODULE_PATHNAME', 'spock_cleanup_resolutions_sql';
-- Strip every privilege on the function from PUBLIC.
REVOKE ALL ON FUNCTION spock.cleanup_resolutions() FROM PUBLIC;

CREATE VIEW spock.TABLES AS
WITH set_relations AS (
Expand Down
11 changes: 11 additions & 0 deletions src/spock.c
Original file line number Diff line number Diff line change
Expand Up @@ -1012,6 +1012,17 @@ _PG_init(void)
0,
NULL, NULL, NULL);

DefineCustomIntVariable("spock.resolutions_retention_days",
"Number of days to retain rows in spock." CATALOG_LOGTABLE " table. "
"Rows older than this are deleted periodically by the apply worker. "
"Set to 0 to disable automatic cleanup.",
NULL,
&spock_resolutions_retention_days,
100, 0, INT_MAX,
PGC_SUSET,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this use PGC_SIGHUP since it is used in the apply worker?

0,
NULL, NULL, NULL);

DefineCustomBoolVariable("spock.enable_quiet_mode",
"Reduce message verbosity for cleaner output",
"When enabled, downgrades DDL replication INFO/WARNING messages to LOG level "
Expand Down
24 changes: 24 additions & 0 deletions src/spock_apply.c
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,9 @@ static dlist_head sync_replica_lsn = DLIST_STATIC_INIT(sync_replica_lsn);
static XLogRecPtr skip_xact_finish_lsn = InvalidXLogRecPtr;
#define is_skipping_changes() (unlikely(!XLogRecPtrIsInvalid(skip_xact_finish_lsn)))

/* How often the apply worker runs spock_cleanup_resolutions() (milliseconds). */
#define RESOLUTIONS_CLEANUP_INTERVAL_MS (86400L * 1000L)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could leave this, but ms resolution seems like overkill when seconds would do?


/*
* Whereas MessageContext is used for the duration of a transaction,
* ApplyOperationContext can be used for individual operations
Expand Down Expand Up @@ -2947,6 +2950,7 @@ apply_work(PGconn *streamConn)
XLogRecPtr last_received = InvalidXLogRecPtr;
XLogRecPtr last_inserted = InvalidXLogRecPtr;
TimestampTz last_receive_timestamp = GetCurrentTimestamp();
TimestampTz last_cleanup_timestamp = 0;
bool need_replay;
ErrorData *edata = NULL;

Expand Down Expand Up @@ -3050,6 +3054,26 @@ apply_work(PGconn *streamConn)
}
}

/*
* Periodically clean up old rows from spock.resolutions. We run
* at most once per day regardless of whether the worker is idle
* or processing traffic. spock_cleanup_resolutions() manages its
* own transaction and error handling.
*/
if (!IsTransactionState() &&
spock_resolutions_retention_days > 0)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The docs mention it checks save_resolutions, too.

{
TimestampTz cleanup_due;

cleanup_due = TimestampTzPlusMilliseconds(last_cleanup_timestamp,
RESOLUTIONS_CLEANUP_INTERVAL_MS);
if (GetCurrentTimestamp() >= cleanup_due)
{
spock_cleanup_resolutions();
last_cleanup_timestamp = GetCurrentTimestamp();
}
}

Assert(CurrentMemoryContext == MessageContext);

for (;;)
Expand Down
144 changes: 144 additions & 0 deletions src/spock_conflict.c
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ SpockConflictTypeName(SpockConflictType t)
int spock_conflict_resolver = SPOCK_RESOLVE_LAST_UPDATE_WINS;
int spock_conflict_log_level = LOG;
bool spock_save_resolutions = false;
int spock_resolutions_retention_days = 100;

static Datum spock_conflict_row_to_json(Datum row, bool row_isnull,
bool *ret_isnull);
Expand Down Expand Up @@ -882,6 +883,149 @@ tuple_to_stringinfo(StringInfo s, TupleDesc tupdesc, HeapTuple tuple)
}
}

/*
 * spock_cleanup_resolutions_core
 *
 * Purge expired conflict-history rows.  Issues a single DELETE against
 * spock.<CATALOG_LOGTABLE> for every row whose log_time lies more than
 * spock.resolutions_retention_days days before now(), and returns the
 * number of rows that were removed.
 *
 * The statement is run through SPI, so the caller must already be inside a
 * transaction with an active snapshot.  Failures are raised as ERROR and
 * are not intercepted here; the caller decides how to deal with them.
 *
 * NOTE(review): for extremely large retention values the expression
 * "now() - interval" may fall outside the supported timestamp range and
 * raise an error -- confirm whether the GUC needs a tighter upper bound.
 */
static uint64
spock_cleanup_resolutions_core(void)
{
	StringInfoData query;
	uint64		removed;
	int			spi_rc;

	/* Assemble the DELETE; the retention window is an integer GUC. */
	initStringInfo(&query);
	appendStringInfo(&query,
					 "DELETE FROM spock.%s WHERE log_time < now() - '%d days'::interval",
					 CATALOG_LOGTABLE, spock_resolutions_retention_days);

	if (SPI_connect() != SPI_OK_CONNECT)
	{
		/* Release the statement text before bailing out. */
		pfree(query.data);
		ereport(ERROR,
				(errcode(ERRCODE_INTERNAL_ERROR),
				 errmsg("SPOCK: SPI_connect failed in spock_cleanup_resolutions")));
	}

	/* Read-write execution (read_only = false), no row limit (0). */
	spi_rc = SPI_execute(query.data, false, 0);
	pfree(query.data);

	if (spi_rc != SPI_OK_DELETE)
	{
		ereport(ERROR,
				(errcode(ERRCODE_INTERNAL_ERROR),
				 errmsg("SPOCK: unexpected SPI result %d in spock_cleanup_resolutions",
						spi_rc)));
	}

	/* Capture the row count first, then close the SPI connection. */
	removed = SPI_processed;
	SPI_finish();

	elog(DEBUG1, "SPOCK: cleaned up " UINT64_FORMAT " row(s) from spock.%s",
		 removed, CATALOG_LOGTABLE);

	return removed;
}

/*
* spock_cleanup_resolutions
*
* Apply worker entry point. Manages its own transaction so it can be called
* from the background loop where no transaction is active. Errors are
* downgraded to WARNING so a transient failure does not disrupt replication;
* the worker will retry on the next daily cycle.
*/
uint64
spock_cleanup_resolutions(void)
{
uint64 ndeleted = 0;
MemoryContext oldcontext;

if (spock_resolutions_retention_days <= 0)
return 0;

/*
* Save the caller's memory context (MessageContext in the apply worker)
* before entering PG_TRY.
*/
oldcontext = CurrentMemoryContext;

/*
* The entire transaction lifetime lives inside PG_TRY so that errors
* from StartTransactionCommand() or PushActiveSnapshot() — not just SPI
* execution failures — are also caught and downgraded to WARNING.
*
* SetCurrentStatementStartTimestamp() must precede StartTransactionCommand()
* so the transaction's cached current_timestamp is initialised correctly.
* PushActiveSnapshot() is required by SPI_execute (it asserts an active
* snapshot exists).
*/
PG_TRY();
{
SetCurrentStatementStartTimestamp();
StartTransactionCommand();
PushActiveSnapshot(GetTransactionSnapshot());

/* do the cleanup */
ndeleted = spock_cleanup_resolutions_core();

PopActiveSnapshot();
CommitTransactionCommand();
}
PG_CATCH();
{
ErrorData *edata;

MemoryContextSwitchTo(oldcontext);
edata = CopyErrorData();
FlushErrorState();

/*
* Abort only if a transaction was actually started. If the error
* occurred in SetCurrentStatementStartTimestamp() or before
* StartTransactionCommand() completed, there may be no transaction
* to abort. AbortCurrentTransaction() also handles SPI and snapshot
* cleanup via AtEOXact_SPI() and AtAbort_Snapshot(), avoiding
* double-cleanup if core() already called SPI_finish() before
* CommitTransactionCommand() threw.
*/
if (IsTransactionState())
AbortCurrentTransaction();

ereport(WARNING,
(errcode(edata->sqlerrcode),
errmsg("%s", edata->message)));
FreeErrorData(edata);
}
PG_END_TRY();
MemoryContextSwitchTo(oldcontext);

return ndeleted;
}

/*
* spock_cleanup_resolutions_sql
*
* SQL-callable entry point. The executor already provides an active
* transaction, so we call the core function directly. Any error propagates
* to the caller normally — no silent transaction poisoning.
*/
PG_FUNCTION_INFO_V1(spock_cleanup_resolutions_sql);
Datum
spock_cleanup_resolutions_sql(PG_FUNCTION_ARGS)
{
if (!superuser())
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("must be superuser to call spock.cleanup_resolutions()")));

if (spock_resolutions_retention_days <= 0)
PG_RETURN_INT64(0);

PG_RETURN_INT64((int64) spock_cleanup_resolutions_core());
}

/*
* Convert the target row to json form if it isn't null.
*/
Expand Down
Loading
Loading