Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 34 additions & 1 deletion async_postgres/pg_pool.nim
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@ import std/[deques, macros, options]

import async_backend, pg_protocol, pg_connection, pg_types, pg_client

const closePruneThreshold = 16
## Sweep `pool.pendingCloses` for finished futures only once its length
## reaches this threshold, so the O(n) prune is amortized across calls.

type
PoolConfig* = object
## Configuration for the connection pool. Create via `initPoolConfig`.
Expand Down Expand Up @@ -79,6 +83,8 @@ type
metrics: PoolMetrics
pendingOps: Deque[PendingPoolOp] ## Queue for implicit pipeline batching
dispatchScheduled: bool ## Whether a dispatch callback is pending
pendingCloses: seq[Future[void]]
## Fire-and-forget close tasks spawned by closeNoWait, awaited on pool.close()

proc initPoolConfig*(
connConfig: ConnConfig,
Expand Down Expand Up @@ -158,14 +164,32 @@ proc metrics*(pool: PgPool): PoolMetrics =

proc closeNoWait(pool: PgPool, conn: PgConnection) =
## Schedule connection close without waiting. For use in non-async contexts.
## The spawned task is tracked in `pool.pendingCloses` so `pool.close()` can
## await its completion for graceful shutdown.
pool.metrics.closeCount.inc
proc doClose() {.async.} =
try:
await conn.close()
except CatchableError:
discard

asyncSpawn doClose()
# Prune only once the seq grows past the threshold so the sweep is amortized
# instead of O(n) on every call. Uses swap-remove (constant-time delete that
# reorders) since order among pending closes is irrelevant.
if pool.pendingCloses.len >= closePruneThreshold:
var n = pool.pendingCloses.len
var i = 0
while i < n:
if pool.pendingCloses[i].finished:
pool.pendingCloses[i] = pool.pendingCloses[n - 1]
dec n
else:
inc i
pool.pendingCloses.setLen(n)

let fut = doClose()
pool.pendingCloses.add(fut)
asyncSpawn fut

proc resetSession*(pool: PgPool, conn: PgConnection) {.async.} =
## Execute the configured reset query on a connection before returning it
Expand Down Expand Up @@ -1017,3 +1041,12 @@ proc close*(pool: PgPool, timeout = ZeroDuration): Future[void] {.async.} =
await pc.conn.close()
except CatchableError:
discard

# Wait for any fire-and-forget closes spawned via closeNoWait so the server
# observes Terminate and fds are released before this proc returns. A late
# release() from another task may push more entries while we await, so loop
# with snapshot-and-clear to avoid discarding unfinished futures.
while pool.pendingCloses.len > 0:
let pending = pool.pendingCloses
pool.pendingCloses.setLen(0)
await allFutures(pending)
31 changes: 31 additions & 0 deletions tests/test_pool.nim
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,37 @@ suite "Pool close":
check pool.closed
check pool.active == 1

test "release of broken conn tracks pending close":
let pool = makePool()
pool.active = 1
let conn = mockConn(csClosed)
pool.release(conn)
check pool.pendingCloses.len == 1

test "close awaits pending closeNoWait tasks":
let pool = makePool()
pool.active = 2
pool.release(mockConn(csClosed))
pool.release(mockConn(csClosed))

waitFor pool.close()
check pool.closed
check pool.pendingCloses.len == 0

test "closeNoWait prunes finished futures once threshold is reached":
let pool = makePool()
# Inject pre-finished dummies up to the prune threshold so the next
# closeNoWait deterministically triggers the sweep regardless of timing.
for _ in 0 ..< closePruneThreshold:
let f = newFuture[void]("dummy")
f.complete()
pool.pendingCloses.add(f)
pool.active = 1
pool.release(mockConn(csClosed))
# All finished dummies were swept, leaving only the newly spawned close.
check pool.pendingCloses.len == 1
waitFor pool.close()

suite "Pool active count tracking":
test "release then acquire roundtrip":
let pool = makePool()
Expand Down
Loading