From b33dd2989961b89319d92c43ad6b4ec4a94f209c Mon Sep 17 00:00:00 2001 From: fox0430 Date: Mon, 27 Apr 2026 17:07:48 +0900 Subject: [PATCH] Add configs for listen reconnect --- async_postgres/pg_connection.nim | 41 ++++++++++++++-- tests/test_e2e.nim | 82 ++++++++++++++++++++++++++++++++ 2 files changed, 119 insertions(+), 4 deletions(-) diff --git a/async_postgres/pg_connection.nim b/async_postgres/pg_connection.nim index 9d357ca..63432f7 100644 --- a/async_postgres/pg_connection.nim +++ b/async_postgres/pg_connection.nim @@ -177,6 +177,11 @@ type sendBuf: seq[byte] ## Reusable send buffer for COPY IN batching notifyDropped: int ## Count of notifications dropped due to queue overflow listenErrorMsg: string ## Set when listen pump fails permanently + listenReconnectMaxAttempts: int + ## Max reconnect attempts on listen pump failure. Default 10. + ## 0 or negative = unlimited retries (retry until close()). + listenReconnectMaxBackoff: int + ## Max seconds between reconnect attempts (backoff cap). Default 30. reconnectCallback: proc() {.gcsafe, raises: [].} notifyOverflowCallback: proc(dropped: int) {.gcsafe, raises: [].} stmtCache: Table[string, CachedStmt] @@ -417,6 +422,14 @@ func notifyMaxQueue*(conn: PgConnection): int {.inline.} = ## The maximum notification queue size (0 = unlimited). conn.notifyMaxQueue +func listenReconnectMaxAttempts*(conn: PgConnection): int {.inline.} = + ## Max reconnect attempts on listen pump failure (0 or negative = unlimited). + conn.listenReconnectMaxAttempts + +func listenReconnectMaxBackoff*(conn: PgConnection): int {.inline.} = + ## Maximum seconds between reconnect attempts (backoff cap). + conn.listenReconnectMaxBackoff + # Public API: read-write accessors func state*(conn: PgConnection): var PgConnState {.inline.} = @@ -475,6 +488,15 @@ proc `notifyMaxQueue=`*(conn: PgConnection, val: int) {.inline.} = ## Set the maximum notification queue size (0 = unlimited). conn.notifyMaxQueue = val +proc `listenReconnectMaxAttempts=`*(conn: PgConnection, val: int) {.inline.} = + ## Set the maximum reconnect attempts for the listen pump. + ## 0 or negative = unlimited retries (retry until close()). + conn.listenReconnectMaxAttempts = val + +proc `listenReconnectMaxBackoff=`*(conn: PgConnection, val: int) {.inline.} = + ## Set the maximum seconds between reconnect attempts (backoff cap). + conn.listenReconnectMaxBackoff = val + proc `notifyOverflowCallback=`*( conn: PgConnection, cb: proc(dropped: int) {.gcsafe, raises: [].} ) {.inline.} = @@ -1322,6 +1344,8 @@ proc connectToHost( config: config, notifyMaxQueue: 1024, stmtCacheCapacity: 256, + listenReconnectMaxAttempts: 10, + listenReconnectMaxBackoff: 30, ) elif hasAsyncDispatch: let sock = @@ -1368,6 +1392,8 @@ proc connectToHost( config: config, notifyMaxQueue: 1024, stmtCacheCapacity: 256, + listenReconnectMaxAttempts: 10, + listenReconnectMaxBackoff: 30, ) try: @@ -1959,7 +1985,8 @@ proc listenPump(conn: PgConnection) {.async.} = ## Background loop: repeatedly receives messages, dispatching notifications. ## Non-notification messages are discarded (recvMessage handles dispatch). ## On connection failure, attempts automatic reconnection with exponential - ## backoff (up to 10 attempts) and re-subscribes to all channels. + ## backoff (up to `listenReconnectMaxAttempts` attempts; 0 or negative = + ## unlimited) and re-subscribes to all channels. ## Exits cleanly when state changes from csListening (via stopListening ## sending an empty query), then drains until ReadyForQuery. while true: @@ -1985,9 +2012,13 @@ proc listenPump(conn: PgConnection) {.async.} = conn.notifyWaiter.fail(newException(PgError, "Connection closed")) return # Auto-reconnect with exponential backoff + let maxAttempts = conn.listenReconnectMaxAttempts + let maxBackoff = max(1, conn.listenReconnectMaxBackoff) + let unlimited = maxAttempts <= 0 var reconnected = false var backoff = 1 - for attempt in 0 ..< 10: + var attempt = 0 + while unlimited or attempt < maxAttempts: try: await sleepAsync(seconds(backoff)) await conn.reconnectInPlace() @@ -1999,10 +2030,12 @@ proc listenPump(conn: PgConnection) {.async.} = except CancelledError: return except CatchableError: - backoff = min(backoff * 2, 30) + backoff = min(backoff * 2, maxBackoff) + inc attempt if not reconnected: conn.listenErrorMsg = - "Listen connection lost: reconnection failed after 10 attempts" + "Listen connection lost: reconnection failed after " & $maxAttempts & + " attempts" conn.state = csClosed if conn.notifyWaiter != nil and not conn.notifyWaiter.finished: conn.notifyWaiter.fail(newException(PgError, conn.listenErrorMsg)) diff --git a/tests/test_e2e.nim b/tests/test_e2e.nim index 8ee5c70..0739629 100644 --- a/tests/test_e2e.nim +++ b/tests/test_e2e.nim @@ -4042,6 +4042,88 @@ when hasChronos: waitFor t() + test "listenReconnect config defaults": + proc t() {.async.} = + let conn = await connect(plainConfig()) + doAssert conn.listenReconnectMaxAttempts == 10 + doAssert conn.listenReconnectMaxBackoff == 30 + await conn.close() + + waitFor t() + + test "listenReconnect config setters": + proc t() {.async.} = + let conn = await connect(plainConfig()) + conn.listenReconnectMaxAttempts = 3 + conn.listenReconnectMaxBackoff = 5 + doAssert conn.listenReconnectMaxAttempts == 3 + doAssert conn.listenReconnectMaxBackoff == 5 + # 0 = unlimited (sentinel) + conn.listenReconnectMaxAttempts = 0 + doAssert conn.listenReconnectMaxAttempts == 0 + await conn.close() + + waitFor t() + + test "auto-reconnect honors custom maxAttempts setting": + proc t() {.async.} = + let listener = await connect(plainConfig()) + listener.listenReconnectMaxAttempts = 2 + listener.listenReconnectMaxBackoff = 1 + + var reconnected = false + listener.reconnectCallback = proc() {.gcsafe, raises: [].} = + reconnected = true + + await listener.listen("reconn_custom") + + let killer = await connect(plainConfig()) + try: + discard await killer.exec( + "SELECT pg_terminate_backend($1)", @[toPgParam(listener.pid)] + ) + except PgError: + discard + await killer.close() + + # First retry runs after backoff=1s; reconnect should succeed. + await sleepAsync(milliseconds(3000)) + doAssert reconnected + doAssert listener.state == csListening + + await listener.close() + + waitFor t() + + test "auto-reconnect with unlimited attempts (maxAttempts=0)": + proc t() {.async.} = + let listener = await connect(plainConfig()) + listener.listenReconnectMaxAttempts = 0 # unlimited + listener.listenReconnectMaxBackoff = 1 + + var reconnected = false + listener.reconnectCallback = proc() {.gcsafe, raises: [].} = + reconnected = true + + await listener.listen("reconn_unlimited") + + let killer = await connect(plainConfig()) + try: + discard await killer.exec( + "SELECT pg_terminate_backend($1)", @[toPgParam(listener.pid)] + ) + except PgError: + discard + await killer.close() + + await sleepAsync(milliseconds(3000)) + doAssert reconnected + doAssert listener.state == csListening + + await listener.close() + + waitFor t() + test "waitNotification fails on close": proc t() {.async.} = let listener = await connect(plainConfig())