Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ Async PostgreSQL client in Nim.
- PostgreSQL wire protocol v3
- Simple Query and Extended Query Protocol
- Pipeline mode — batch multiple operations in a single network round trip
- Connection pooling with health checks and maintenance
- Connection pooling with health checks and maintenance (broken connections discarded on acquire/release)
- Pool cluster with read replica routing
- SSL/TLS support (disable, allow, prefer, require, verify-ca, verify-full)
- MD5, SCRAM-SHA-256 and SCRAM-SHA-256-PLUS authentication
Expand Down Expand Up @@ -98,6 +98,12 @@ proc main() {.async.} =
waitFor main()
```

## Reconnection Policy

- **Connection pool (`PgPool` / `PgPoolCluster`)** — broken connections are detected and discarded automatically. On `acquire`, entries whose state is not `csReady` (or that fail the optional `ping` health check) are retired and replaced. On `release`, connections left in a non-ready or in-transaction state are also closed rather than returned to the idle queue. Configure `healthCheckTimeout` / `pingTimeout` to tune idle-connection probing.
- **Direct `PgConnection`** — no automatic reconnection for regular queries. Per-query retry would be unsafe for non-idempotent statements and in-flight transactions, so a closed connection must be replaced by calling `connect(...)` again (or by using the pool). Inspect `conn.isConnected()` or `conn.state` to decide whether a handle is still usable.
- **LISTEN/NOTIFY** — this is the one exception. The listen pump reconnects with exponential backoff (up to 10 attempts, 30 s cap) and re-subscribes to all channels. Register a `reconnectCallback` if you need to resynchronise application state after a reconnect.

## Async Backend

By default, asyncdispatch is used. To use chronos:
Expand Down
14 changes: 13 additions & 1 deletion async_postgres/pg_connection.nim
Original file line number Diff line number Diff line change
Expand Up @@ -959,6 +959,9 @@ proc closeTransport(conn: PgConnection) {.async.} =
except CatchableError:
discard
conn.transport = nil
# Drop the cached reader/writer aliases so isConnected() reports false.
conn.reader = nil
conn.writer = nil
elif hasAsyncDispatch:
if not conn.socket.isNil:
conn.socket.close()
Expand Down Expand Up @@ -1726,7 +1729,16 @@ proc simpleExec*(
tag = await simpleExecImpl(conn, sql)
return initCommandResult(tag)

proc isConnected(conn: PgConnection): bool =
proc isConnected*(conn: PgConnection): bool =
## Whether the underlying transport is present.
##
## This is a cheap, non-blocking check (no round trip): it only reports
## whether the connection object currently holds a transport handle. It
## does **not** detect server-side closes that have not yet been observed
## by a read — use `ping` for that.
##
## Pair with `state == csReady` to decide whether a connection is usable
## before issuing a query.
when hasChronos:
not conn.writer.isNil
elif hasAsyncDispatch:
Expand Down
125 changes: 125 additions & 0 deletions tests/test_pool.nim
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import ../async_postgres/pg_protocol
import ../async_postgres/pg_connection
import ../async_postgres/pg_pool {.all.}

import ./mock_pg_server

privateAccess(PgPool)
privateAccess(PgConnection)
privateAccess(PooledConn)
Expand Down Expand Up @@ -1504,3 +1506,126 @@ suite "Pool metrics":
doAssert pool.metrics.timeoutCount == 1

waitFor t()

suite "isConnected":
  ## Unit coverage for the newly exported `isConnected` predicate.
  ## The first two cases use mock connections (no transport at all);
  ## the chronos-only case exercises a real transport lifecycle.

  test "returns false for mock connection without transport":
    # A freshly built mock has no transport handle, so the cheap
    # nil-check must report "not connected".
    let conn = mockConn()
    check not conn.isConnected()

  test "returns false for csClosed mock":
    # State alone does not matter to isConnected(), but a csClosed mock
    # also has no transport, so the result must still be false.
    let conn = mockConn(csClosed)
    check not conn.isConnected()

  when hasChronos:
    test "returns true while transport is live, false after close":
      # End-to-end on chronos: a live transport reports connected; after
      # close() drops the reader/writer aliases it must report false.
      proc t() {.async.} =
        let (conn, server, serverTransport) = await makeHangingConn()
        doAssert conn.isConnected()
        await conn.close()
        doAssert not conn.isConnected()
        await cleanupHanging(server, serverTransport)

      waitFor t()

proc mockConfig(port: int): ConnConfig =
  ## Minimal client configuration pointing at a local mock server on
  ## `port`. SSL is disabled because the mock server speaks plaintext.
  result = ConnConfig(
    host: "127.0.0.1",
    port: port,
    user: "test",
    database: "test",
    sslMode: sslDisable,
  )

suite "Pool broken connection handling (integration)":
  ## Integration tests against the in-process mock PostgreSQL server,
  ## verifying the pool's retire-on-broken-connection behaviour on both
  ## the release() and acquire() paths.

  test "query failure from server close transitions conn to csClosed and release discards it":
    # End-to-end: a live pool connection that dies mid-query should surface as
    # csClosed, so release() retires it instead of returning it to idle.
    #
    # Results are captured into outer vars so `check` can run outside the
    # async body (clearer unittest failure output).
    var finalState: PgConnState
    var idleAfter = -1
    var closeCountDelta: int64 = -1

    proc testBody() {.async.} =
      let ms = startMockServer()

      proc serverHandler() {.async.} =
        # Accept one client, complete the startup handshake, then read the
        # first query and vanish without answering it.
        let st = await acceptAndReady(ms)
        try:
          discard await drainFrontendMessage(st) # client query
          # Server disappears mid-query without sending a response
        except CatchableError:
          discard
        await closeClient(st)

      let serverFut = serverHandler()

      let cfg = initPoolConfig(mockConfig(ms.port), minSize = 0, maxSize = 2)
      let pool = await newPool(cfg)

      # The query must fail (server closed mid-query); swallow the error —
      # only the resulting connection state matters here.
      let conn = await pool.acquire()
      try:
        discard await conn.simpleQuery("SELECT 1")
      except CatchableError:
        discard

      finalState = conn.state
      # Snapshot closeCount around release() to isolate its contribution.
      let before = pool.metrics.closeCount
      pool.release(conn)
      closeCountDelta = pool.metrics.closeCount - before
      idleAfter = pool.idle.len

      await pool.close()
      await serverFut
      await closeServer(ms)

    waitFor testBody()
    check finalState == csClosed
    check closeCountDelta == 1     # release() closed the broken conn
    check idleAfter == 0           # ...and did not return it to idle

  test "acquire skips an idle conn whose transport was torn down":
    # An idle pool entry whose backend vanished (state surfaced as csClosed
    # via a prior read, or an out-of-band close) must be retired on the next
    # acquire attempt. We force-close from the client side to flip state,
    # then check acquire() drops the entry and returns an alternative.
    var idleAfter = -1
    var closeCountDelta: int64 = -1

    proc testBody() {.async.} =
      let ms = startMockServer()

      proc serverHandler() {.async.} =
        try:
          let st = await acceptAndReady(ms)
          # Stay up long enough for the client to finish; close at the end.
          await sleepAsync(milliseconds(200))
          await closeClient(st)
        except CatchableError:
          discard

      let serverFut = serverHandler()

      let cfg = initPoolConfig(mockConfig(ms.port), minSize = 0, maxSize = 2)
      let pool = await newPool(cfg)

      # Put one real connection into the idle queue...
      let broken = await pool.acquire()
      pool.release(broken)
      doAssert pool.idle.len == 1

      # Simulate the backend vanishing: csClosed on the idle entry.
      await broken.close()
      doAssert broken.state == csClosed

      # Inject a healthy mock alongside the broken one so acquire has a
      # non-real candidate to hand back without re-entering connect().
      let good = mockConn()
      pool.idle.addLast(toPooled(good))

      let before = pool.metrics.closeCount
      let acquired = await pool.acquire()
      closeCountDelta = pool.metrics.closeCount - before
      idleAfter = pool.idle.len

      # acquire() must have skipped the broken entry and handed back the mock.
      doAssert acquired == good

      pool.release(acquired)
      await pool.close()
      await serverFut
      await closeServer(ms)

    waitFor testBody()
    check closeCountDelta == 1     # the broken idle entry was retired
    check idleAfter == 0