Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 25 additions & 4 deletions async_postgres/async_backend.nim
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,19 @@ elif hasAsyncDispatch:
proc wait*[T](fut: Future[T], timeout: Duration): Future[T] {.async.} =
## Wait for a future with a timeout. Raises AsyncTimeoutError on timeout.
## API-compatible with chronos Future.wait().
## Note: asyncdispatch has no cancellation, so the inner future keeps running
## after timeout. We add a callback to suppress unhandled exception warnings.
##
## .. warning::
## asyncdispatch has no cancellation. When a timeout fires, the inner
## future keeps running in the background until its I/O completes. For
## this reason **the underlying connection MUST NOT be reused after an
## AsyncTimeoutError** — a late write from the suspended future would
## corrupt the protocol stream of whoever reuses it next. The pg_client
## layer forces `csClosed` on timeout under asyncdispatch; see
## `invalidateOnTimeout` in pg_client.nim. chronos is not affected because
## its futures are actually cancelled.
##
## Note: we add a no-op callback to suppress unhandled exception warnings
## when the suspended inner future eventually fails.
let ms = toMilliseconds(timeout)
let completed = await withTimeout(fut, ms)
if not completed:
Expand All @@ -173,7 +184,15 @@ elif hasAsyncDispatch:

proc cancelAndWait*(fut: Future[void]): Future[void] {.async.} =
## Cancel a future and wait for completion.
## asyncdispatch has no real cancellation; this is a best-effort no-op.
##
## .. warning::
## On asyncdispatch this is a **no-op** — the future is neither cancelled
## nor awaited. asyncdispatch has no cancellation primitive. Callers must
## not assume the future has stopped: any buffer it holds via
## `unsafeAddr` remains live, and any socket write it scheduled will
## still complete. Do not reuse the affected resource (socket, buffer)
## after calling this under asyncdispatch. chronos cancels the future
## properly.
discard

proc asyncSpawn*(fut: Future[void]) =
Expand Down Expand Up @@ -217,7 +236,9 @@ elif hasAsyncDispatch:
asyncdispatch.sleepAsync(ms)

proc cancelTimer*(fut: Future[void]) =
## No-op: asyncdispatch timers complete harmlessly and are GC'd.
## No-op under asyncdispatch: timers cannot be cancelled, but they complete
## harmlessly and are garbage-collected. Provided for API parity with
## chronos, which does cancel the timer via `cancelSoon`.
discard

proc registerFdReader*(fd: cint, cb: proc() {.gcsafe, raises: [].}) =
Expand Down
92 changes: 23 additions & 69 deletions async_postgres/pg_client.nim
Original file line number Diff line number Diff line change
Expand Up @@ -353,9 +353,7 @@ proc exec(
timeout
)
except AsyncTimeoutError:
conn.cancelNoWait()
conn.state = csClosed
raise newException(PgTimeoutError, "Exec timed out")
conn.invalidateOnTimeout("Exec timed out")
else:
return await execImpl(conn, sql, params, paramOids, paramFormats)

Expand All @@ -381,9 +379,7 @@ proc exec*(
try:
tag = await execImpl(conn, sql, params, timeout).wait(timeout)
except AsyncTimeoutError:
conn.cancelNoWait()
conn.state = csClosed
raise newException(PgTimeoutError, "Exec timed out")
conn.invalidateOnTimeout("Exec timed out")
else:
tag = await execImpl(conn, sql, params)
return initCommandResult(tag)
Expand Down Expand Up @@ -547,9 +543,7 @@ proc exec*(
timeout
)
except AsyncTimeoutError:
conn.cancelNoWait()
conn.state = csClosed
raise newException(PgTimeoutError, "Exec timed out")
conn.invalidateOnTimeout("Exec timed out")
else:
tag = await execInlineImpl(conn, sql, data, ranges, oids, formats)
return initCommandResult(tag)
Expand Down Expand Up @@ -962,9 +956,7 @@ proc queryEach*(
count = await queryEachImpl(conn, sql, params, callback, resultFormats, timeout)
.wait(timeout)
except AsyncTimeoutError:
conn.cancelNoWait()
conn.state = csClosed
raise newException(PgTimeoutError, "queryEach timed out")
conn.invalidateOnTimeout("queryEach timed out")
else:
count = await queryEachImpl(conn, sql, params, callback, resultFormats)
return count
Expand All @@ -988,9 +980,7 @@ proc query(
)
.wait(timeout)
except AsyncTimeoutError:
conn.cancelNoWait()
conn.state = csClosed
raise newException(PgTimeoutError, "Query timed out")
conn.invalidateOnTimeout("Query timed out")
else:
return await queryImpl(conn, sql, params, paramOids, paramFormats, resultFormats)

Expand Down Expand Up @@ -1018,9 +1008,7 @@ proc query*(
try:
qr = await queryImpl(conn, sql, params, resultFormats, timeout).wait(timeout)
except AsyncTimeoutError:
conn.cancelNoWait()
conn.state = csClosed
raise newException(PgTimeoutError, "Query timed out")
conn.invalidateOnTimeout("Query timed out")
else:
qr = await queryImpl(conn, sql, params, resultFormats)
return qr
Expand Down Expand Up @@ -1120,9 +1108,7 @@ proc query*(
)
.wait(timeout)
except AsyncTimeoutError:
conn.cancelNoWait()
conn.state = csClosed
raise newException(PgTimeoutError, "Query timed out")
conn.invalidateOnTimeout("Query timed out")
else:
qr = await queryInlineImpl(conn, sql, data, ranges, oids, formats, resultFormats)
return qr
Expand Down Expand Up @@ -1347,9 +1333,7 @@ proc prepare*(
try:
stmt = await prepareImpl(conn, name, sql, timeout).wait(timeout)
except AsyncTimeoutError:
conn.cancelNoWait()
conn.state = csClosed
raise newException(PgTimeoutError, "Prepare timed out")
conn.invalidateOnTimeout("Prepare timed out")
else:
stmt = await prepareImpl(conn, name, sql)
return stmt
Expand Down Expand Up @@ -1448,9 +1432,7 @@ proc execute*(
try:
qr = await executeImpl(stmt, params, resultFormats, timeout).wait(timeout)
except AsyncTimeoutError:
stmt.conn.cancelNoWait()
stmt.conn.state = csClosed
raise newException(PgTimeoutError, "Execute timed out")
stmt.conn.invalidateOnTimeout("Execute timed out")
else:
qr = await executeImpl(stmt, params, resultFormats)
return qr
Expand Down Expand Up @@ -1498,9 +1480,7 @@ proc close*(
try:
await closeImpl(stmt, timeout).wait(timeout)
except AsyncTimeoutError:
stmt.conn.cancelNoWait()
stmt.conn.state = csClosed
raise newException(PgTimeoutError, "Statement close timed out")
stmt.conn.invalidateOnTimeout("Statement close timed out")
else:
await closeImpl(stmt)

Expand Down Expand Up @@ -1608,9 +1588,7 @@ proc copyIn*(
try:
tag = await copyInRawImpl(conn, sql, data, timeout).wait(timeout)
except AsyncTimeoutError:
conn.cancelNoWait()
conn.state = csClosed
raise newException(PgTimeoutError, "COPY IN timed out")
conn.invalidateOnTimeout("COPY IN timed out")
else:
tag = await copyInRawImpl(conn, sql, data)
return initCommandResult(tag)
Expand Down Expand Up @@ -1777,9 +1755,7 @@ proc copyInStream*(
try:
info = await copyInStreamImpl(conn, sql, callback, timeout).wait(timeout)
except AsyncTimeoutError:
conn.cancelNoWait()
conn.state = csClosed
raise newException(PgTimeoutError, "COPY IN stream timed out")
conn.invalidateOnTimeout("COPY IN stream timed out")
else:
info = await copyInStreamImpl(conn, sql, callback)
return info
Expand Down Expand Up @@ -1841,9 +1817,7 @@ proc copyOut*(
try:
cr = await copyOutImpl(conn, sql, timeout).wait(timeout)
except AsyncTimeoutError:
conn.cancelNoWait()
conn.state = csClosed
raise newException(PgTimeoutError, "COPY OUT timed out")
conn.invalidateOnTimeout("COPY OUT timed out")
else:
cr = await copyOutImpl(conn, sql)
return cr
Expand Down Expand Up @@ -1928,9 +1902,7 @@ proc copyOutStream*(
try:
info = await copyOutStreamImpl(conn, sql, callback, timeout).wait(timeout)
except AsyncTimeoutError:
conn.cancelNoWait()
conn.state = csClosed
raise newException(PgTimeoutError, "COPY OUT stream timed out")
conn.invalidateOnTimeout("COPY OUT stream timed out")
else:
info = await copyOutStreamImpl(conn, sql, callback)
return info
Expand Down Expand Up @@ -2217,9 +2189,7 @@ proc execInTransaction(
)
.wait(timeout)
except AsyncTimeoutError:
conn.cancelNoWait()
conn.state = csClosed
raise newException(PgTimeoutError, "execInTransaction timed out")
conn.invalidateOnTimeout("execInTransaction timed out")
else:
tag = await execInTransactionImpl(
conn, "BEGIN", sql, params, paramOids, paramFormats, timeout
Expand Down Expand Up @@ -2263,9 +2233,7 @@ proc execInTransaction*(
)
.wait(timeout)
except AsyncTimeoutError:
conn.cancelNoWait()
conn.state = csClosed
raise newException(PgTimeoutError, "execInTransaction timed out")
conn.invalidateOnTimeout("execInTransaction timed out")
else:
tag =
await execInTransactionImpl(conn, beginSql, sql, values, oids, formats, timeout)
Expand Down Expand Up @@ -2376,9 +2344,7 @@ proc queryInTransaction(
)
.wait(timeout)
except AsyncTimeoutError:
conn.cancelNoWait()
conn.state = csClosed
raise newException(PgTimeoutError, "queryInTransaction timed out")
conn.invalidateOnTimeout("queryInTransaction timed out")
else:
qr = await queryInTransactionImpl(
conn, "BEGIN", sql, params, paramOids, paramFormats, resultFormats, timeout
Expand Down Expand Up @@ -2426,9 +2392,7 @@ proc queryInTransaction*(
)
.wait(timeout)
except AsyncTimeoutError:
conn.cancelNoWait()
conn.state = csClosed
raise newException(PgTimeoutError, "queryInTransaction timed out")
conn.invalidateOnTimeout("queryInTransaction timed out")
else:
qr = await queryInTransactionImpl(
conn, beginSql, sql, values, oids, formats, resultFormats, timeout
Expand Down Expand Up @@ -2770,9 +2734,7 @@ proc execute*(
try:
results = await executeImpl(p, timeout).wait(timeout)
except AsyncTimeoutError:
p.conn.cancelNoWait()
p.conn.state = csClosed
raise newException(PgTimeoutError, "Pipeline execute timed out")
p.conn.invalidateOnTimeout("Pipeline execute timed out")
else:
results = await executeImpl(p, timeout)
return results
Expand Down Expand Up @@ -3022,9 +2984,7 @@ proc executeIsolated*(
try:
ir = await executeIsolatedImpl(p, timeout).wait(timeout)
except AsyncTimeoutError:
p.conn.cancelNoWait()
p.conn.state = csClosed
raise newException(PgTimeoutError, "Pipeline executeIsolated timed out")
p.conn.invalidateOnTimeout("Pipeline executeIsolated timed out")
else:
ir = await executeIsolatedImpl(p, timeout)
return ir
Expand Down Expand Up @@ -3135,9 +3095,7 @@ proc openCursor(
)
.wait(timeout)
except AsyncTimeoutError:
conn.cancelNoWait()
conn.state = csClosed
raise newException(PgTimeoutError, "Cursor open timed out")
conn.invalidateOnTimeout("Cursor open timed out")
else:
result = await openCursorImpl(
conn, sql, params, paramOids, paramFormats, resultFormats, chunkSize
Expand Down Expand Up @@ -3228,9 +3186,7 @@ proc fetchNext*(cursor: Cursor): Future[seq[Row]] {.async.} =
try:
return await fetchNextImpl(cursor, cursor.timeout).wait(cursor.timeout)
except AsyncTimeoutError:
cursor.conn.cancelNoWait()
cursor.conn.state = csClosed
raise newException(PgTimeoutError, "Cursor fetch timed out")
cursor.conn.invalidateOnTimeout("Cursor fetch timed out")
else:
return await fetchNextImpl(cursor)

Expand Down Expand Up @@ -3276,9 +3232,7 @@ proc close*(cursor: Cursor): Future[void] {.async.} =
try:
await closeCursorImpl(cursor, cursor.timeout).wait(cursor.timeout)
except AsyncTimeoutError:
cursor.conn.cancelNoWait()
cursor.conn.state = csClosed
raise newException(PgTimeoutError, "Cursor close timed out")
cursor.conn.invalidateOnTimeout("Cursor close timed out")
else:
await closeCursorImpl(cursor)

Expand Down
26 changes: 20 additions & 6 deletions async_postgres/pg_connection.nim
Original file line number Diff line number Diff line change
Expand Up @@ -1508,6 +1508,24 @@ proc cancelNoWait*(conn: PgConnection) =

asyncSpawn doCancel()

proc invalidateOnTimeout*(conn: PgConnection, reason: string) =
## Timeout recovery for a connection whose last request may have left the
## protocol out of sync. Schedules a best-effort CancelRequest via
## `cancelNoWait`, marks the connection `csClosed` so it cannot be reused,
## and raises `PgTimeoutError` with `reason`.
##
## Under asyncdispatch this is the **only** safe recovery path: the inner
## future keeps running in the background after `wait()` fires, and may
## still write to the socket. Reusing the connection would interleave its
## stale write with a new request and corrupt the protocol stream. chronos
## cancels the inner future properly, but we still invalidate unconditionally
## — the server may have processed the request partially and the cached
## session state (prepared statements, portals, transaction status) is no
## longer reliable.
conn.cancelNoWait()
conn.state = csClosed
raise newException(PgTimeoutError, reason)

proc simpleExec*(
conn: PgConnection, sql: string, timeout: Duration = ZeroDuration
): Future[CommandResult] {.async.} =
Expand All @@ -1527,9 +1545,7 @@ proc simpleExec*(
try:
tag = await simpleExecImpl(conn, sql, timeout).wait(timeout)
except AsyncTimeoutError:
conn.cancelNoWait()
conn.state = csClosed
raise newException(PgTimeoutError, "simpleExec timed out")
conn.invalidateOnTimeout("simpleExec timed out")
else:
tag = await simpleExecImpl(conn, sql)
return initCommandResult(tag)
Expand Down Expand Up @@ -1577,9 +1593,7 @@ proc ping*(conn: PgConnection, timeout = ZeroDuration): Future[void] =
try:
await perform().wait(timeout)
except AsyncTimeoutError:
conn.cancelNoWait()
conn.state = csClosed
raise newException(PgTimeoutError, "Ping timed out")
conn.invalidateOnTimeout("Ping timed out")

withTimeout()
else:
Expand Down
23 changes: 20 additions & 3 deletions async_postgres/pg_pool.nim
Original file line number Diff line number Diff line change
Expand Up @@ -163,9 +163,16 @@ proc metrics*(pool: PgPool): PoolMetrics =
pool.metrics

proc closeNoWait(pool: PgPool, conn: PgConnection) =
## Schedule connection close without waiting. For use in non-async contexts.
## The spawned task is tracked in `pool.pendingCloses` so `pool.close()` can
## await its completion for graceful shutdown.
## Schedule connection close without waiting. For use in non-async contexts
## (e.g. `release()` is synchronous). The spawned task is tracked in
## `pool.pendingCloses` so `pool.close()` can await its completion for
## graceful shutdown.
##
## Note on asyncdispatch: a close scheduled here may race with an inflight
## request future that the previous timeout could not cancel (see
## `invalidateOnTimeout`). That future will observe a closed fd and fail
## quietly — we rely on `asyncSpawn` swallowing the resulting CatchableError
## so it never surfaces. The connection is not reused either way.
pool.metrics.closeCount.inc
proc doClose() {.async.} =
try:
Expand Down Expand Up @@ -311,6 +318,16 @@ proc release*(pool: PgPool, conn: PgConnection) =
## Return a connection to the pool. If the connection is broken or in a
## transaction, it is closed instead. If waiters are queued, the connection
## is handed directly to the next waiter.
##
## Discard criteria (`conn.state != csReady`):
## - A timed-out request reaches us via `invalidateOnTimeout` with
## `state = csClosed`. Under asyncdispatch this is load-bearing: the
## inner future is still alive and may write to the socket, so the
## connection MUST be retired from the pool.
## - Any listening/replication/COPY state is also not reusable.
## Transaction-in-progress (`txStatus != tsIdle`) is treated as failure
## to reset the session, so the connection is closed rather than leaking
## transaction state to the next borrower.
var traceCtx: TraceContext
if pool.config.tracer != nil and pool.config.tracer.onPoolReleaseStart != nil:
traceCtx =
Expand Down
Loading