From d2b17d68cd302a289243b802c63bec3785d9154c Mon Sep 17 00:00:00 2001 From: fox0430 Date: Wed, 22 Apr 2026 16:05:14 +0900 Subject: [PATCH] Add isConnected --- README.md | 8 +- async_postgres/pg_connection.nim | 14 +++- tests/test_pool.nim | 125 +++++++++++++++++++++++++++++++ 3 files changed, 145 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 03fa648..1dc422e 100644 --- a/README.md +++ b/README.md @@ -8,7 +8,7 @@ Async PostgreSQL client in Nim. - PostgreSQL wire protocol v3 - Simple Query and Extended Query Protocol - Pipeline mode — batch multiple operations in a single network round trip -- Connection pooling with health checks and maintenance +- Connection pooling with health checks and maintenance (broken connections discarded on acquire/release) - Pool cluster with read replica routing - SSL/TLS support (disable, allow, prefer, require, verify-ca, verify-full) - MD5, SCRAM-SHA-256 and SCRAM-SHA-256-PLUS authentication @@ -98,6 +98,12 @@ proc main() {.async.} = waitFor main() ``` +## Reconnection Policy + +- **Connection pool (`PgPool` / `PgPoolCluster`)** — broken connections are detected and discarded automatically. On `acquire`, entries whose state is not `csReady` (or that fail the optional `ping` health check) are retired and replaced. On `release`, connections left in a non-ready or in-transaction state are also closed rather than returned to the idle queue. Configure `healthCheckTimeout` / `pingTimeout` to tune idle-connection probing. +- **Direct `PgConnection`** — no automatic reconnection for regular queries. Per-query retry would be unsafe for non-idempotent statements and in-flight transactions, so a closed connection must be replaced by calling `connect(...)` again (or by using the pool). Inspect `conn.isConnected()` or `conn.state` to decide whether a handle is still usable. +- **LISTEN/NOTIFY** — this is the one exception. The listen pump reconnects with exponential backoff (up to 10 attempts, 30 s cap) and re-subscribes to all channels. Register a `reconnectCallback` if you need to resynchronise application state after a reconnect. + ## Async Backend By default, asyncdispatch is used. To use chronos: diff --git a/async_postgres/pg_connection.nim b/async_postgres/pg_connection.nim index 9e488b1..c18cf71 100644 --- a/async_postgres/pg_connection.nim +++ b/async_postgres/pg_connection.nim @@ -959,6 +959,9 @@ proc closeTransport(conn: PgConnection) {.async.} = except CatchableError: discard conn.transport = nil + # Drop the cached reader/writer aliases so isConnected() reports false. + conn.reader = nil + conn.writer = nil elif hasAsyncDispatch: if not conn.socket.isNil: conn.socket.close() @@ -1726,7 +1729,16 @@ proc simpleExec*( tag = await simpleExecImpl(conn, sql) return initCommandResult(tag) -proc isConnected(conn: PgConnection): bool = +proc isConnected*(conn: PgConnection): bool = + ## Whether the underlying transport is present. + ## + ## This is a cheap, non-blocking check (no round trip): it only reports + ## whether the connection object currently holds a transport handle. It + ## does **not** detect server-side closes that have not yet been observed + ## by a read — use `ping` for that. + ## + ## Pair with `state == csReady` to decide whether a connection is usable + ## before issuing a query. when hasChronos: not conn.writer.isNil elif hasAsyncDispatch: diff --git a/tests/test_pool.nim b/tests/test_pool.nim index fec0c50..3aa2c04 100644 --- a/tests/test_pool.nim +++ b/tests/test_pool.nim @@ -8,6 +8,8 @@ import ../async_postgres/pg_protocol import ../async_postgres/pg_connection import ../async_postgres/pg_pool {.all.} +import ./mock_pg_server + privateAccess(PgPool) privateAccess(PgConnection) privateAccess(PooledConn) @@ -1504,3 +1506,126 @@ suite "Pool metrics": doAssert pool.metrics.timeoutCount == 1 waitFor t() + +suite "isConnected": + test "returns false for mock connection without transport": + let conn = mockConn() + check not conn.isConnected() + + test "returns false for csClosed mock": + let conn = mockConn(csClosed) + check not conn.isConnected() + + when hasChronos: + test "returns true while transport is live, false after close": + proc t() {.async.} = + let (conn, server, serverTransport) = await makeHangingConn() + doAssert conn.isConnected() + await conn.close() + doAssert not conn.isConnected() + await cleanupHanging(server, serverTransport) + + waitFor t() + +proc mockConfig(port: int): ConnConfig = + ConnConfig( + host: "127.0.0.1", port: port, user: "test", database: "test", sslMode: sslDisable + ) + +suite "Pool broken connection handling (integration)": + test "query failure from server close transitions conn to csClosed and release discards it": + # End-to-end: a live pool connection that dies mid-query should surface as + # csClosed, so release() retires it instead of returning it to idle. + var finalState: PgConnState + var idleAfter = -1 + var closeCountDelta: int64 = -1 + + proc testBody() {.async.} = + let ms = startMockServer() + proc serverHandler() {.async.} = + let st = await acceptAndReady(ms) + try: + discard await drainFrontendMessage(st) # client query + # Server disappears mid-query without sending a response + except CatchableError: + discard + await closeClient(st) + + let serverFut = serverHandler() + + let cfg = initPoolConfig(mockConfig(ms.port), minSize = 0, maxSize = 2) + let pool = await newPool(cfg) + + let conn = await pool.acquire() + try: + discard await conn.simpleQuery("SELECT 1") + except CatchableError: + discard + + finalState = conn.state + let before = pool.metrics.closeCount + pool.release(conn) + closeCountDelta = pool.metrics.closeCount - before + idleAfter = pool.idle.len + + await pool.close() + await serverFut + await closeServer(ms) + + waitFor testBody() + check finalState == csClosed + check closeCountDelta == 1 + check idleAfter == 0 + + test "acquire skips an idle conn whose transport was torn down": + # An idle pool entry whose backend vanished (state surfaced as csClosed + # via a prior read, or an out-of-band close) must be retired on the next + # acquire attempt. We force-close from the client side to flip state, + # then check acquire() drops the entry and returns an alternative. + var idleAfter = -1 + var closeCountDelta: int64 = -1 + + proc testBody() {.async.} = + let ms = startMockServer() + proc serverHandler() {.async.} = + try: + let st = await acceptAndReady(ms) + # Stay up long enough for the client to finish; close at the end. + await sleepAsync(milliseconds(200)) + await closeClient(st) + except CatchableError: + discard + + let serverFut = serverHandler() + + let cfg = initPoolConfig(mockConfig(ms.port), minSize = 0, maxSize = 2) + let pool = await newPool(cfg) + + let broken = await pool.acquire() + pool.release(broken) + doAssert pool.idle.len == 1 + + # Simulate the backend vanishing: csClosed on the idle entry. + await broken.close() + doAssert broken.state == csClosed + + # Inject a healthy mock alongside the broken one so acquire has a + # non-real candidate to hand back without re-entering connect(). + let good = mockConn() + pool.idle.addLast(toPooled(good)) + + let before = pool.metrics.closeCount + let acquired = await pool.acquire() + closeCountDelta = pool.metrics.closeCount - before + idleAfter = pool.idle.len + + doAssert acquired == good + + pool.release(acquired) + await pool.close() + await serverFut + await closeServer(ms) + + waitFor testBody() + check closeCountDelta == 1 + check idleAfter == 0