Skip to content

Commit 1b0262a

Browse files
author
Ibrar Ahmed
committed
Replace wal_sender_timeout-based liveness with TCP keepalive.
The apply worker previously relied on wal_sender_timeout as both a server-side disconnect trigger and an indirect keepalive pressure on the subscriber. This caused spurious disconnects in two scenarios: a flood of 'w' messages keeping the subscriber too busy to send 'r' feedback in time, and large transactions whose apply time exceeded wal_sender_timeout. The workaround was maybe_send_feedback(), which force-sent 'r' after every 10 'w' messages or wal_sender_timeout/2, whichever came first. This was a fragile band-aid that coupled subscriber behavior to a server GUC it cannot control. Replace the entire mechanism with a clean two-layer model: - TCP keepalive (keepalives_idle=10, keepalives_interval=5, keepalives_count=3) is the primary liveness detector on both sides. A dead network or crashed host is detected in ~25 seconds. - wal_sender_timeout=0 is set on replication connections so the walsender never disconnects due to missing 'r' feedback. Liveness on the server side is now handled entirely by TCP keepalive. - spock.apply_idle_timeout (default 300s) is a subscriber-side safety net for a hung-but-connected walsender whose TCP keepalive probes are answered by the kernel but sends no data. Set to 0 to disable. Fix a bug in last_receive_timestamp handling: it was updated unconditionally after every PQgetCopyData call, including when r==0 (no data available). Each 1-second WL_TIMEOUT spin silently reset the timer, making apply_idle_timeout never fire. Move the update to after the r==0 guard so it reflects actual data receipt only. Remove maybe_send_feedback() as it is no longer needed.
1 parent 33a21c9 commit 1b0262a

File tree

3 files changed

+55
-62
lines changed

3 files changed

+55
-62
lines changed

include/spock.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ extern int restart_delay_on_exception;
5353
extern int spock_replay_queue_size; /* Deprecated - no longer used */
5454
extern bool check_all_uc_indexes;
5555
extern bool spock_enable_quiet_mode;
56+
extern int spock_apply_idle_timeout;
5657

5758
extern char *shorten_hash(const char *str, int maxlen);
5859
extern void gen_slot_name(Name slot_name, char *dbname,

src/spock.c

Lines changed: 36 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,7 @@ int restart_delay_on_exception;
135135
int spock_replay_queue_size; /* Deprecated - no longer used */
136136
bool check_all_uc_indexes = false;
137137
bool spock_enable_quiet_mode = false;
138+
int spock_apply_idle_timeout = 300;
138139

139140
static emit_log_hook_type prev_emit_log_hook = NULL;
140141
static Checkpoint_hook_type prev_Checkpoint_hook = NULL;
@@ -304,7 +305,7 @@ get_spock_table_oid(const char *table)
304305
return reloid;
305306
}
306307

307-
#define CONN_PARAM_ARRAY_SIZE 9
308+
#define CONN_PARAM_ARRAY_SIZE 10
308309

309310
static PGconn *
310311
spock_connect_base(const char *connstr, const char *appname,
@@ -345,17 +346,32 @@ spock_connect_base(const char *connstr, const char *appname,
345346
vals[i] = "1";
346347
i++;
347348
keys[i] = "keepalives_idle";
348-
vals[i] = "20";
349+
vals[i] = "10";
349350
i++;
350351
keys[i] = "keepalives_interval";
351-
vals[i] = "20";
352+
vals[i] = "5";
352353
i++;
353354
keys[i] = "keepalives_count";
354-
vals[i] = "5";
355+
vals[i] = "3";
355356
i++;
356357
keys[i] = "replication";
357358
vals[i] = replication ? "database" : NULL;
358359
i++;
360+
/*
361+
* For replication connections, disable the server-side walsender timeout.
362+
* Liveness detection is handled by TCP keepalives (keepalives_idle /
363+
* keepalives_interval / keepalives_count above) on both sides, and by
364+
* spock.apply_idle_timeout on the subscriber side as a safety net for a
365+
* hung-but-connected walsender. Leaving wal_sender_timeout active would
366+
* cause spurious disconnects whenever the subscriber is legitimately busy
367+
* applying a large transaction and cannot send 'r' feedback in time.
368+
*/
369+
if (replication)
370+
{
371+
keys[i] = "options";
372+
vals[i] = "-c wal_sender_timeout=0";
373+
i++;
374+
}
359375
keys[i] = NULL;
360376
vals[i] = NULL;
361377

@@ -1186,6 +1202,22 @@ _PG_init(void)
11861202
NULL,
11871203
NULL);
11881204

1205+
DefineCustomIntVariable("spock.apply_idle_timeout",
1206+
"Maximum idle time in seconds before apply worker reconnects",
1207+
"Safety net for detecting a hung walsender that keeps the "
1208+
"TCP connection alive but stops sending data. The timer "
1209+
"resets on any received message. Set to 0 to disable and "
1210+
"rely solely on TCP keepalive for liveness detection.",
1211+
&spock_apply_idle_timeout,
1212+
300,
1213+
0,
1214+
INT_MAX,
1215+
PGC_SIGHUP,
1216+
GUC_UNIT_S,
1217+
NULL,
1218+
NULL,
1219+
NULL);
1220+
11891221
if (IsBinaryUpgrade)
11901222
return;
11911223

src/spock_apply.c

Lines changed: 18 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -229,8 +229,6 @@ static bool should_log_exception(bool failed);
229229
static ApplyReplayEntry * apply_replay_entry_create(int r, char *buf);
230230
static void apply_replay_entry_free(ApplyReplayEntry * entry);
231231
static void apply_replay_queue_reset(void);
232-
static void maybe_send_feedback(PGconn *applyconn, XLogRecPtr lsn_to_send,
233-
TimestampTz *last_receive_timestamp);
234232
static void append_feedback_position(XLogRecPtr recvpos);
235233
static void get_feedback_position(XLogRecPtr *recvpos, XLogRecPtr *writepos,
236234
XLogRecPtr *flushpos, XLogRecPtr *max_recvpos);
@@ -2838,25 +2836,24 @@ apply_work(PGconn *streamConn)
28382836
}
28392837

28402838
/*
2841-
* The walsender is supposed to ping us for a status update every
2842-
* wal_sender_timeout / 2 milliseconds. If we don't get those, we
2843-
* assume that we have lost the connection.
2844-
*
2845-
* Note: keepalive configuration is supposed to cover this but is
2846-
* apparently unreliable.
2839+
* Connection liveness is handled by TCP keepalive (primary)
2840+
* and PQstatus == CONNECTION_BAD (above). The idle timeout
2841+
* below is a safety net for the case where the walsender
2842+
* process is alive but hung -- TCP probes succeed because the
2843+
* kernel ACKs them, but no data is being sent.
28472844
*/
2848-
if (rc & WL_TIMEOUT)
2845+
if (rc & WL_TIMEOUT && spock_apply_idle_timeout > 0)
28492846
{
28502847
TimestampTz timeout;
28512848

28522849
timeout = TimestampTzPlusMilliseconds(last_receive_timestamp,
2853-
(wal_sender_timeout * 3) / 2);
2850+
(long) spock_apply_idle_timeout * 1000L);
28542851
if (GetCurrentTimestamp() > timeout)
28552852
{
28562853
MySpockWorker->worker_status = SPOCK_WORKER_STATUS_STOPPED;
2857-
elog(ERROR, "SPOCK %s: terminating apply due to missing "
2858-
"walsender ping",
2859-
MySubscription->name);
2854+
elog(ERROR, "SPOCK %s: no data received for %d seconds, "
2855+
"reconnecting (spock.apply_idle_timeout)",
2856+
MySubscription->name, spock_apply_idle_timeout);
28602857
}
28612858
}
28622859

@@ -2879,8 +2876,6 @@ apply_work(PGconn *streamConn)
28792876
/* We are not in replay mode so receive from the stream */
28802877
r = PQgetCopyData(applyconn, &buf, 1);
28812878

2882-
last_receive_timestamp = GetCurrentTimestamp();
2883-
28842879
/* Check for errors */
28852880
if (r == -1)
28862881
{
@@ -2912,6 +2907,14 @@ apply_work(PGconn *streamConn)
29122907
break;
29132908
}
29142909

2910+
/*
2911+
* We received actual data. Update the idle-timeout clock
2912+
* only here, after confirming r > 0, so that a WL_TIMEOUT
2913+
* spin with no incoming data does not silently reset the
2914+
* timer and mask a hung walsender.
2915+
*/
2916+
last_receive_timestamp = GetCurrentTimestamp();
2917+
29152918
/*
29162919
* We have a valid message, create an apply queue entry
29172920
* but don't add it to the queue yet.
@@ -2947,16 +2950,6 @@ apply_work(PGconn *streamConn)
29472950
end_lsn = pq_getmsgint64(msg);
29482951
pq_getmsgint64(msg); /* sendTime */
29492952

2950-
/*
2951-
* Call maybe_send_feedback before last_received is
2952-
* updated. This ordering guarantees that feedback LSN
2953-
* never advertises a position beyond what has actually
2954-
* been received and processed. Prevents skipping over
2955-
* unapplied changes due to premature flush LSN.
2956-
*/
2957-
maybe_send_feedback(applyconn, last_received,
2958-
&last_receive_timestamp);
2959-
29602953
if (last_received < start_lsn)
29612954
last_received = start_lsn;
29622955

@@ -3924,39 +3917,6 @@ apply_replay_queue_reset(void)
39243917
MemoryContextReset(ApplyReplayContext);
39253918
}
39263919

3927-
/*
3928-
* Check if we should send feedback based on message count or timeout.
3929-
* Resets internal state if feedback is sent.
3930-
*/
3931-
static void
3932-
maybe_send_feedback(PGconn *applyconn, XLogRecPtr lsn_to_send,
3933-
TimestampTz *last_receive_timestamp)
3934-
{
3935-
static int w_message_count = 0;
3936-
TimestampTz now = GetCurrentTimestamp();
3937-
3938-
w_message_count++;
3939-
3940-
/*
3941-
* Send feedback if wal_sender_timeout/2 has passed or after 10 'w'
3942-
* messages.
3943-
*/
3944-
if (TimestampDifferenceExceeds(*last_receive_timestamp, now, wal_sender_timeout / 2) ||
3945-
w_message_count >= 10)
3946-
{
3947-
elog(DEBUG2, "SPOCK %s: force sending feedback after %d 'w' messages or timeout",
3948-
MySubscription->name, w_message_count);
3949-
3950-
/*
3951-
* We need to send feedback to the walsender process to avoid remote
3952-
* wal_sender_timeout.
3953-
*/
3954-
send_feedback(applyconn, lsn_to_send, now, true);
3955-
*last_receive_timestamp = now;
3956-
w_message_count = 0;
3957-
}
3958-
}
3959-
39603920
/*
39613921
* Advance the replication origin for forwarded transactions.
39623922
*

0 commit comments

Comments
 (0)