Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 16 additions & 9 deletions async_postgres/pg_pool.nim
Original file line number Diff line number Diff line change
Expand Up @@ -490,6 +490,20 @@ proc failPendingOp(op: PendingPoolOp, e: ref CatchableError) =
if not op.queryFut.finished:
op.queryFut.fail(e)

proc failAllPending(pool: PgPool, e: ref CatchableError) {.raises: [].} =
## Fail every queued op with `e`. Marked `raises: []` so the compiler
## proves the loop cannot leak into an `asyncSpawn`ed caller — any future
## change to `failPendingOp` or the underlying `Future.fail` that could
## raise will be caught here at compile time. `Exception` is used (not
## `CatchableError`) because asyncdispatch's `Future.fail` has inferred
## effect `Exception` via its callback chain.
try:
while pool.pendingOps.len > 0:
let op = pool.pendingOps.popFirst()
failPendingOp(op, e)
except Exception:
discard

proc executeBatch(
pool: PgPool, conn: PgConnection, batch: seq[PendingPoolOp]
): Future[void] {.async.} =
Expand Down Expand Up @@ -611,9 +625,7 @@ proc scheduleDispatch(pool: PgPool) {.gcsafe, raises: [].} =
await pool.dispatchBatchImpl()
except CatchableError as e:
# Fail any ops still in the queue so their futures don't hang forever.
while pool.pendingOps.len > 0:
let op = pool.pendingOps.popFirst()
failPendingOp(op, e)
pool.failAllPending(e)
# Re-schedule if there are remaining ops
if pool.pendingOps.len > 0:
pool.scheduleDispatch()
Expand All @@ -625,12 +637,7 @@ proc scheduleDispatch(pool: PgPool) {.gcsafe, raises: [].} =
# asyncSpawn should not raise in practice, but the compiler cannot
# prove it. Fail any pending ops so their futures do not hang.
let err = newException(PgError, "Pipeline dispatch failed: " & e.msg)
try:
while p.pendingOps.len > 0:
let op = p.pendingOps.popFirst()
failPendingOp(op, err)
except Exception:
discard
p.failAllPending(err)
p.dispatchScheduled = false

try:
Expand Down
Loading