From ea8e00bb9f0bfd62103b868cea5f26c081a58656 Mon Sep 17 00:00:00 2001 From: fox0430 Date: Fri, 17 Apr 2026 20:01:27 +0900 Subject: [PATCH] refactor: Extract invalidateOnTimeout helper for timeout recovery --- async_postgres/async_backend.nim | 29 ++++++++-- async_postgres/pg_client.nim | 92 ++++++++------------------------ async_postgres/pg_connection.nim | 26 ++++++--- async_postgres/pg_pool.nim | 23 ++++++-- 4 files changed, 88 insertions(+), 82 deletions(-) diff --git a/async_postgres/async_backend.nim b/async_postgres/async_backend.nim index 3d48376..135c8ba 100644 --- a/async_postgres/async_backend.nim +++ b/async_postgres/async_backend.nim @@ -153,8 +153,19 @@ elif hasAsyncDispatch: proc wait*[T](fut: Future[T], timeout: Duration): Future[T] {.async.} = ## Wait for a future with a timeout. Raises AsyncTimeoutError on timeout. ## API-compatible with chronos Future.wait(). - ## Note: asyncdispatch has no cancellation, so the inner future keeps running - ## after timeout. We add a callback to suppress unhandled exception warnings. + ## + ## .. warning:: + ## asyncdispatch has no cancellation. When a timeout fires, the inner + ## future keeps running in the background until its I/O completes. For + ## this reason **the underlying connection MUST NOT be reused after an + ## AsyncTimeoutError** — a late write from the suspended future would + ## corrupt the protocol stream of whoever reuses it next. The pg_client + ## layer forces `csClosed` on timeout under asyncdispatch; see + ## `invalidateOnTimeout` in pg_connection.nim. chronos is not affected because + ## its futures are actually cancelled. + ## + ## Note: we add a no-op callback to suppress unhandled exception warnings + ## when the suspended inner future eventually fails. 
let ms = toMilliseconds(timeout) let completed = await withTimeout(fut, ms) if not completed: @@ -173,7 +184,15 @@ elif hasAsyncDispatch: proc cancelAndWait*(fut: Future[void]): Future[void] {.async.} = ## Cancel a future and wait for completion. - ## asyncdispatch has no real cancellation; this is a best-effort no-op. + ## + ## .. warning:: + ## On asyncdispatch this is a **no-op** — the future is neither cancelled + ## nor awaited. asyncdispatch has no cancellation primitive. Callers must + ## not assume the future has stopped: any buffer it holds via + ## `unsafeAddr` remains live, and any socket write it scheduled will + ## still complete. Do not reuse the affected resource (socket, buffer) + ## after calling this under asyncdispatch. chronos cancels the future + ## properly. discard proc asyncSpawn*(fut: Future[void]) = @@ -217,7 +236,9 @@ elif hasAsyncDispatch: asyncdispatch.sleepAsync(ms) proc cancelTimer*(fut: Future[void]) = - ## No-op: asyncdispatch timers complete harmlessly and are GC'd. + ## No-op under asyncdispatch: timers cannot be cancelled, but they complete + ## harmlessly and are garbage-collected. Provided for API parity with + ## chronos, which does cancel the timer via `cancelSoon`. 
discard proc registerFdReader*(fd: cint, cb: proc() {.gcsafe, raises: [].}) = diff --git a/async_postgres/pg_client.nim b/async_postgres/pg_client.nim index c06b960..555f11f 100644 --- a/async_postgres/pg_client.nim +++ b/async_postgres/pg_client.nim @@ -353,9 +353,7 @@ proc exec( timeout ) except AsyncTimeoutError: - conn.cancelNoWait() - conn.state = csClosed - raise newException(PgTimeoutError, "Exec timed out") + conn.invalidateOnTimeout("Exec timed out") else: return await execImpl(conn, sql, params, paramOids, paramFormats) @@ -381,9 +379,7 @@ proc exec*( try: tag = await execImpl(conn, sql, params, timeout).wait(timeout) except AsyncTimeoutError: - conn.cancelNoWait() - conn.state = csClosed - raise newException(PgTimeoutError, "Exec timed out") + conn.invalidateOnTimeout("Exec timed out") else: tag = await execImpl(conn, sql, params) return initCommandResult(tag) @@ -547,9 +543,7 @@ proc exec*( timeout ) except AsyncTimeoutError: - conn.cancelNoWait() - conn.state = csClosed - raise newException(PgTimeoutError, "Exec timed out") + conn.invalidateOnTimeout("Exec timed out") else: tag = await execInlineImpl(conn, sql, data, ranges, oids, formats) return initCommandResult(tag) @@ -962,9 +956,7 @@ proc queryEach*( count = await queryEachImpl(conn, sql, params, callback, resultFormats, timeout) .wait(timeout) except AsyncTimeoutError: - conn.cancelNoWait() - conn.state = csClosed - raise newException(PgTimeoutError, "queryEach timed out") + conn.invalidateOnTimeout("queryEach timed out") else: count = await queryEachImpl(conn, sql, params, callback, resultFormats) return count @@ -988,9 +980,7 @@ proc query( ) .wait(timeout) except AsyncTimeoutError: - conn.cancelNoWait() - conn.state = csClosed - raise newException(PgTimeoutError, "Query timed out") + conn.invalidateOnTimeout("Query timed out") else: return await queryImpl(conn, sql, params, paramOids, paramFormats, resultFormats) @@ -1018,9 +1008,7 @@ proc query*( try: qr = await queryImpl(conn, sql, params, 
resultFormats, timeout).wait(timeout) except AsyncTimeoutError: - conn.cancelNoWait() - conn.state = csClosed - raise newException(PgTimeoutError, "Query timed out") + conn.invalidateOnTimeout("Query timed out") else: qr = await queryImpl(conn, sql, params, resultFormats) return qr @@ -1120,9 +1108,7 @@ proc query*( ) .wait(timeout) except AsyncTimeoutError: - conn.cancelNoWait() - conn.state = csClosed - raise newException(PgTimeoutError, "Query timed out") + conn.invalidateOnTimeout("Query timed out") else: qr = await queryInlineImpl(conn, sql, data, ranges, oids, formats, resultFormats) return qr @@ -1347,9 +1333,7 @@ proc prepare*( try: stmt = await prepareImpl(conn, name, sql, timeout).wait(timeout) except AsyncTimeoutError: - conn.cancelNoWait() - conn.state = csClosed - raise newException(PgTimeoutError, "Prepare timed out") + conn.invalidateOnTimeout("Prepare timed out") else: stmt = await prepareImpl(conn, name, sql) return stmt @@ -1448,9 +1432,7 @@ proc execute*( try: qr = await executeImpl(stmt, params, resultFormats, timeout).wait(timeout) except AsyncTimeoutError: - stmt.conn.cancelNoWait() - stmt.conn.state = csClosed - raise newException(PgTimeoutError, "Execute timed out") + stmt.conn.invalidateOnTimeout("Execute timed out") else: qr = await executeImpl(stmt, params, resultFormats) return qr @@ -1498,9 +1480,7 @@ proc close*( try: await closeImpl(stmt, timeout).wait(timeout) except AsyncTimeoutError: - stmt.conn.cancelNoWait() - stmt.conn.state = csClosed - raise newException(PgTimeoutError, "Statement close timed out") + stmt.conn.invalidateOnTimeout("Statement close timed out") else: await closeImpl(stmt) @@ -1608,9 +1588,7 @@ proc copyIn*( try: tag = await copyInRawImpl(conn, sql, data, timeout).wait(timeout) except AsyncTimeoutError: - conn.cancelNoWait() - conn.state = csClosed - raise newException(PgTimeoutError, "COPY IN timed out") + conn.invalidateOnTimeout("COPY IN timed out") else: tag = await copyInRawImpl(conn, sql, data) return 
initCommandResult(tag) @@ -1777,9 +1755,7 @@ proc copyInStream*( try: info = await copyInStreamImpl(conn, sql, callback, timeout).wait(timeout) except AsyncTimeoutError: - conn.cancelNoWait() - conn.state = csClosed - raise newException(PgTimeoutError, "COPY IN stream timed out") + conn.invalidateOnTimeout("COPY IN stream timed out") else: info = await copyInStreamImpl(conn, sql, callback) return info @@ -1841,9 +1817,7 @@ proc copyOut*( try: cr = await copyOutImpl(conn, sql, timeout).wait(timeout) except AsyncTimeoutError: - conn.cancelNoWait() - conn.state = csClosed - raise newException(PgTimeoutError, "COPY OUT timed out") + conn.invalidateOnTimeout("COPY OUT timed out") else: cr = await copyOutImpl(conn, sql) return cr @@ -1928,9 +1902,7 @@ proc copyOutStream*( try: info = await copyOutStreamImpl(conn, sql, callback, timeout).wait(timeout) except AsyncTimeoutError: - conn.cancelNoWait() - conn.state = csClosed - raise newException(PgTimeoutError, "COPY OUT stream timed out") + conn.invalidateOnTimeout("COPY OUT stream timed out") else: info = await copyOutStreamImpl(conn, sql, callback) return info @@ -2217,9 +2189,7 @@ proc execInTransaction( ) .wait(timeout) except AsyncTimeoutError: - conn.cancelNoWait() - conn.state = csClosed - raise newException(PgTimeoutError, "execInTransaction timed out") + conn.invalidateOnTimeout("execInTransaction timed out") else: tag = await execInTransactionImpl( conn, "BEGIN", sql, params, paramOids, paramFormats, timeout @@ -2263,9 +2233,7 @@ proc execInTransaction*( ) .wait(timeout) except AsyncTimeoutError: - conn.cancelNoWait() - conn.state = csClosed - raise newException(PgTimeoutError, "execInTransaction timed out") + conn.invalidateOnTimeout("execInTransaction timed out") else: tag = await execInTransactionImpl(conn, beginSql, sql, values, oids, formats, timeout) @@ -2376,9 +2344,7 @@ proc queryInTransaction( ) .wait(timeout) except AsyncTimeoutError: - conn.cancelNoWait() - conn.state = csClosed - raise 
newException(PgTimeoutError, "queryInTransaction timed out") + conn.invalidateOnTimeout("queryInTransaction timed out") else: qr = await queryInTransactionImpl( conn, "BEGIN", sql, params, paramOids, paramFormats, resultFormats, timeout @@ -2426,9 +2392,7 @@ proc queryInTransaction*( ) .wait(timeout) except AsyncTimeoutError: - conn.cancelNoWait() - conn.state = csClosed - raise newException(PgTimeoutError, "queryInTransaction timed out") + conn.invalidateOnTimeout("queryInTransaction timed out") else: qr = await queryInTransactionImpl( conn, beginSql, sql, values, oids, formats, resultFormats, timeout @@ -2770,9 +2734,7 @@ proc execute*( try: results = await executeImpl(p, timeout).wait(timeout) except AsyncTimeoutError: - p.conn.cancelNoWait() - p.conn.state = csClosed - raise newException(PgTimeoutError, "Pipeline execute timed out") + p.conn.invalidateOnTimeout("Pipeline execute timed out") else: results = await executeImpl(p, timeout) return results @@ -3022,9 +2984,7 @@ proc executeIsolated*( try: ir = await executeIsolatedImpl(p, timeout).wait(timeout) except AsyncTimeoutError: - p.conn.cancelNoWait() - p.conn.state = csClosed - raise newException(PgTimeoutError, "Pipeline executeIsolated timed out") + p.conn.invalidateOnTimeout("Pipeline executeIsolated timed out") else: ir = await executeIsolatedImpl(p, timeout) return ir @@ -3135,9 +3095,7 @@ proc openCursor( ) .wait(timeout) except AsyncTimeoutError: - conn.cancelNoWait() - conn.state = csClosed - raise newException(PgTimeoutError, "Cursor open timed out") + conn.invalidateOnTimeout("Cursor open timed out") else: result = await openCursorImpl( conn, sql, params, paramOids, paramFormats, resultFormats, chunkSize @@ -3228,9 +3186,7 @@ proc fetchNext*(cursor: Cursor): Future[seq[Row]] {.async.} = try: return await fetchNextImpl(cursor, cursor.timeout).wait(cursor.timeout) except AsyncTimeoutError: - cursor.conn.cancelNoWait() - cursor.conn.state = csClosed - raise newException(PgTimeoutError, "Cursor fetch 
timed out") + cursor.conn.invalidateOnTimeout("Cursor fetch timed out") else: return await fetchNextImpl(cursor) @@ -3276,9 +3232,7 @@ proc close*(cursor: Cursor): Future[void] {.async.} = try: await closeCursorImpl(cursor, cursor.timeout).wait(cursor.timeout) except AsyncTimeoutError: - cursor.conn.cancelNoWait() - cursor.conn.state = csClosed - raise newException(PgTimeoutError, "Cursor close timed out") + cursor.conn.invalidateOnTimeout("Cursor close timed out") else: await closeCursorImpl(cursor) diff --git a/async_postgres/pg_connection.nim b/async_postgres/pg_connection.nim index 36c74b9..0d0b164 100644 --- a/async_postgres/pg_connection.nim +++ b/async_postgres/pg_connection.nim @@ -1508,6 +1508,24 @@ proc cancelNoWait*(conn: PgConnection) = asyncSpawn doCancel() +proc invalidateOnTimeout*(conn: PgConnection, reason: string) = + ## Timeout recovery for a connection whose last request may have left the + ## protocol out of sync. Schedules a best-effort CancelRequest via + ## `cancelNoWait`, marks the connection `csClosed` so it cannot be reused, + ## and raises `PgTimeoutError` with `reason`. + ## + ## Under asyncdispatch this is the **only** safe recovery path: the inner + ## future keeps running in the background after `wait()` fires, and may + ## still write to the socket. Reusing the connection would interleave its + ## stale write with a new request and corrupt the protocol stream. chronos + ## cancels the inner future properly, but we still invalidate unconditionally + ## — the server may have processed the request partially and the cached + ## session state (prepared statements, portals, transaction status) is no + ## longer reliable. 
+ conn.cancelNoWait() + conn.state = csClosed + raise newException(PgTimeoutError, reason) + proc simpleExec*( conn: PgConnection, sql: string, timeout: Duration = ZeroDuration ): Future[CommandResult] {.async.} = @@ -1527,9 +1545,7 @@ proc simpleExec*( try: tag = await simpleExecImpl(conn, sql, timeout).wait(timeout) except AsyncTimeoutError: - conn.cancelNoWait() - conn.state = csClosed - raise newException(PgTimeoutError, "simpleExec timed out") + conn.invalidateOnTimeout("simpleExec timed out") else: tag = await simpleExecImpl(conn, sql) return initCommandResult(tag) @@ -1577,9 +1593,7 @@ proc ping*(conn: PgConnection, timeout = ZeroDuration): Future[void] = try: await perform().wait(timeout) except AsyncTimeoutError: - conn.cancelNoWait() - conn.state = csClosed - raise newException(PgTimeoutError, "Ping timed out") + conn.invalidateOnTimeout("Ping timed out") withTimeout() else: diff --git a/async_postgres/pg_pool.nim b/async_postgres/pg_pool.nim index 06ac529..a9613a4 100644 --- a/async_postgres/pg_pool.nim +++ b/async_postgres/pg_pool.nim @@ -163,9 +163,16 @@ proc metrics*(pool: PgPool): PoolMetrics = pool.metrics proc closeNoWait(pool: PgPool, conn: PgConnection) = - ## Schedule connection close without waiting. For use in non-async contexts. - ## The spawned task is tracked in `pool.pendingCloses` so `pool.close()` can - ## await its completion for graceful shutdown. + ## Schedule connection close without waiting. For use in non-async contexts + ## (e.g. `release()` is synchronous). The spawned task is tracked in + ## `pool.pendingCloses` so `pool.close()` can await its completion for + ## graceful shutdown. + ## + ## Note on asyncdispatch: a close scheduled here may race with an inflight + ## request future that the previous timeout could not cancel (see + ## `invalidateOnTimeout`). That future will observe a closed fd and fail + ## quietly — we rely on `asyncSpawn` swallowing the resulting CatchableError + ## so it never surfaces. 
The connection is not reused either way. pool.metrics.closeCount.inc proc doClose() {.async.} = try: @@ -311,6 +318,16 @@ proc release*(pool: PgPool, conn: PgConnection) = ## Return a connection to the pool. If the connection is broken or in a ## transaction, it is closed instead. If waiters are queued, the connection ## is handed directly to the next waiter. + ## + ## Discard criteria (`conn.state != csReady`): + ## - A timed-out request reaches us via `invalidateOnTimeout` with + ## `state = csClosed`. Under asyncdispatch this is load-bearing: the + ## inner future is still alive and may write to the socket, so the + ## connection MUST be retired from the pool. + ## - Any listening/replication/COPY state is also not reusable. + ## Transaction-in-progress (`txStatus != tsIdle`) is treated as failure + ## to reset the session, so the connection is closed rather than leaking + ## transaction state to the next borrower. var traceCtx: TraceContext if pool.config.tracer != nil and pool.config.tracer.onPoolReleaseStart != nil: traceCtx =