Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 37 additions & 4 deletions async_postgres/pg_connection.nim
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,11 @@ type
sendBuf: seq[byte] ## Reusable send buffer for COPY IN batching
notifyDropped: int ## Count of notifications dropped due to queue overflow
listenErrorMsg: string ## Set when listen pump fails permanently
listenReconnectMaxAttempts: int
## Max reconnect attempts on listen pump failure. Default 10.
## 0 or negative = unlimited retries (retry until close()).
listenReconnectMaxBackoff: int
## Max seconds between reconnect attempts (backoff cap). Default 30.
reconnectCallback: proc() {.gcsafe, raises: [].}
notifyOverflowCallback: proc(dropped: int) {.gcsafe, raises: [].}
stmtCache: Table[string, CachedStmt]
Expand Down Expand Up @@ -417,6 +422,14 @@ func notifyMaxQueue*(conn: PgConnection): int {.inline.} =
## The maximum notification queue size (0 = unlimited).
conn.notifyMaxQueue

func listenReconnectMaxAttempts*(conn: PgConnection): int {.inline.} =
## Max reconnect attempts on listen pump failure (0 or negative = unlimited).
conn.listenReconnectMaxAttempts

func listenReconnectMaxBackoff*(conn: PgConnection): int {.inline.} =
## Maximum seconds between reconnect attempts (backoff cap).
conn.listenReconnectMaxBackoff

# Public API: read-write accessors

func state*(conn: PgConnection): var PgConnState {.inline.} =
Expand Down Expand Up @@ -475,6 +488,15 @@ proc `notifyMaxQueue=`*(conn: PgConnection, val: int) {.inline.} =
## Set the maximum notification queue size (0 = unlimited).
conn.notifyMaxQueue = val

proc `listenReconnectMaxAttempts=`*(conn: PgConnection, val: int) {.inline.} =
## Set the maximum reconnect attempts for the listen pump.
## 0 or negative = unlimited retries (retry until close()).
conn.listenReconnectMaxAttempts = val

proc `listenReconnectMaxBackoff=`*(conn: PgConnection, val: int) {.inline.} =
## Set the maximum seconds between reconnect attempts (backoff cap).
conn.listenReconnectMaxBackoff = val

proc `notifyOverflowCallback=`*(
conn: PgConnection, cb: proc(dropped: int) {.gcsafe, raises: [].}
) {.inline.} =
Expand Down Expand Up @@ -1322,6 +1344,8 @@ proc connectToHost(
config: config,
notifyMaxQueue: 1024,
stmtCacheCapacity: 256,
listenReconnectMaxAttempts: 10,
listenReconnectMaxBackoff: 30,
)
elif hasAsyncDispatch:
let sock =
Expand Down Expand Up @@ -1368,6 +1392,8 @@ proc connectToHost(
config: config,
notifyMaxQueue: 1024,
stmtCacheCapacity: 256,
listenReconnectMaxAttempts: 10,
listenReconnectMaxBackoff: 30,
)

try:
Expand Down Expand Up @@ -1959,7 +1985,8 @@ proc listenPump(conn: PgConnection) {.async.} =
## Background loop: repeatedly receives messages, dispatching notifications.
## Non-notification messages are discarded (recvMessage handles dispatch).
## On connection failure, attempts automatic reconnection with exponential
## backoff (up to 10 attempts) and re-subscribes to all channels.
## backoff (up to `listenReconnectMaxAttempts` attempts; 0 or negative =
## unlimited) and re-subscribes to all channels.
## Exits cleanly when state changes from csListening (via stopListening
## sending an empty query), then drains until ReadyForQuery.
while true:
Expand All @@ -1985,9 +2012,13 @@ proc listenPump(conn: PgConnection) {.async.} =
conn.notifyWaiter.fail(newException(PgError, "Connection closed"))
return
# Auto-reconnect with exponential backoff
let maxAttempts = conn.listenReconnectMaxAttempts
let maxBackoff = max(1, conn.listenReconnectMaxBackoff)
let unlimited = maxAttempts <= 0
var reconnected = false
var backoff = 1
for attempt in 0 ..< 10:
var attempt = 0
while unlimited or attempt < maxAttempts:
try:
await sleepAsync(seconds(backoff))
await conn.reconnectInPlace()
Expand All @@ -1999,10 +2030,12 @@ proc listenPump(conn: PgConnection) {.async.} =
except CancelledError:
return
except CatchableError:
backoff = min(backoff * 2, 30)
backoff = min(backoff * 2, maxBackoff)
inc attempt
if not reconnected:
conn.listenErrorMsg =
"Listen connection lost: reconnection failed after 10 attempts"
"Listen connection lost: reconnection failed after " & $maxAttempts &
" attempts"
conn.state = csClosed
if conn.notifyWaiter != nil and not conn.notifyWaiter.finished:
conn.notifyWaiter.fail(newException(PgError, conn.listenErrorMsg))
Expand Down
82 changes: 82 additions & 0 deletions tests/test_e2e.nim
Original file line number Diff line number Diff line change
Expand Up @@ -4042,6 +4042,88 @@ when hasChronos:

waitFor t()

test "listenReconnect config defaults":
proc t() {.async.} =
let conn = await connect(plainConfig())
doAssert conn.listenReconnectMaxAttempts == 10
doAssert conn.listenReconnectMaxBackoff == 30
await conn.close()

waitFor t()

test "listenReconnect config setters":
proc t() {.async.} =
let conn = await connect(plainConfig())
conn.listenReconnectMaxAttempts = 3
conn.listenReconnectMaxBackoff = 5
doAssert conn.listenReconnectMaxAttempts == 3
doAssert conn.listenReconnectMaxBackoff == 5
# 0 = unlimited (sentinel)
conn.listenReconnectMaxAttempts = 0
doAssert conn.listenReconnectMaxAttempts == 0
await conn.close()

waitFor t()

test "auto-reconnect honors custom maxAttempts setting":
proc t() {.async.} =
let listener = await connect(plainConfig())
listener.listenReconnectMaxAttempts = 2
listener.listenReconnectMaxBackoff = 1

var reconnected = false
listener.reconnectCallback = proc() {.gcsafe, raises: [].} =
reconnected = true

await listener.listen("reconn_custom")

let killer = await connect(plainConfig())
try:
discard await killer.exec(
"SELECT pg_terminate_backend($1)", @[toPgParam(listener.pid)]
)
except PgError:
discard
await killer.close()

# First retry runs after backoff=1s; reconnect should succeed.
await sleepAsync(milliseconds(3000))
doAssert reconnected
doAssert listener.state == csListening

await listener.close()

waitFor t()

test "auto-reconnect with unlimited attempts (maxAttempts=0)":
proc t() {.async.} =
let listener = await connect(plainConfig())
listener.listenReconnectMaxAttempts = 0 # unlimited
listener.listenReconnectMaxBackoff = 1

var reconnected = false
listener.reconnectCallback = proc() {.gcsafe, raises: [].} =
reconnected = true

await listener.listen("reconn_unlimited")

let killer = await connect(plainConfig())
try:
discard await killer.exec(
"SELECT pg_terminate_backend($1)", @[toPgParam(listener.pid)]
)
except PgError:
discard
await killer.close()

await sleepAsync(milliseconds(3000))
doAssert reconnected
doAssert listener.state == csListening

await listener.close()

waitFor t()

test "waitNotification fails on close":
proc t() {.async.} =
let listener = await connect(plainConfig())
Expand Down