diff --git a/async_postgres/pg_pool.nim b/async_postgres/pg_pool.nim index 0ef9107..1ec2342 100644 --- a/async_postgres/pg_pool.nim +++ b/async_postgres/pg_pool.nim @@ -2,6 +2,10 @@ import std/[deques, macros, options] import async_backend, pg_protocol, pg_connection, pg_types, pg_client +const closePruneThreshold = 16 + ## Sweep `pool.pendingCloses` for finished futures only once its length + ## reaches this threshold, so the O(n) prune is amortized across calls. + type PoolConfig* = object ## Configuration for the connection pool. Create via `initPoolConfig`. @@ -79,6 +83,8 @@ type metrics: PoolMetrics pendingOps: Deque[PendingPoolOp] ## Queue for implicit pipeline batching dispatchScheduled: bool ## Whether a dispatch callback is pending + pendingCloses: seq[Future[void]] + ## Fire-and-forget close tasks spawned by closeNoWait, awaited on pool.close() proc initPoolConfig*( connConfig: ConnConfig, @@ -158,6 +164,8 @@ proc metrics*(pool: PgPool): PoolMetrics = proc closeNoWait(pool: PgPool, conn: PgConnection) = ## Schedule connection close without waiting. For use in non-async contexts. + ## The spawned task is tracked in `pool.pendingCloses` so `pool.close()` can + ## await its completion for graceful shutdown. pool.metrics.closeCount.inc proc doClose() {.async.} = try: @@ -165,7 +173,23 @@ proc closeNoWait(pool: PgPool, conn: PgConnection) = except CatchableError: discard - asyncSpawn doClose() + # Prune only once the seq grows past the threshold so the sweep is amortized + # instead of O(n) on every call. Uses swap-remove (constant-time delete that + # reorders) since order among pending closes is irrelevant. + if pool.pendingCloses.len >= closePruneThreshold: + var n = pool.pendingCloses.len + var i = 0 + while i < n: + if pool.pendingCloses[i].finished: + pool.pendingCloses[i] = pool.pendingCloses[n - 1] + dec n + else: + inc i + pool.pendingCloses.setLen(n) + + let fut = doClose() + pool.pendingCloses.add(fut) + asyncSpawn fut proc resetSession*(pool: PgPool, conn: PgConnection) {.async.} = ## Execute the configured reset query on a connection before returning it @@ -1017,3 +1041,12 @@ proc close*(pool: PgPool, timeout = ZeroDuration): Future[void] {.async.} = await pc.conn.close() except CatchableError: discard + + # Wait for any fire-and-forget closes spawned via closeNoWait so the server + # observes Terminate and fds are released before this proc returns. A late + # release() from another task may push more entries while we await, so loop + # with snapshot-and-clear to avoid discarding unfinished futures. + while pool.pendingCloses.len > 0: + let pending = pool.pendingCloses + pool.pendingCloses.setLen(0) + await allFutures(pending) diff --git a/tests/test_pool.nim b/tests/test_pool.nim index cdf4b0a..2775cd8 100644 --- a/tests/test_pool.nim +++ b/tests/test_pool.nim @@ -410,6 +410,37 @@ suite "Pool close": check pool.closed check pool.active == 1 + test "release of broken conn tracks pending close": + let pool = makePool() + pool.active = 1 + let conn = mockConn(csClosed) + pool.release(conn) + check pool.pendingCloses.len == 1 + + test "close awaits pending closeNoWait tasks": + let pool = makePool() + pool.active = 2 + pool.release(mockConn(csClosed)) + pool.release(mockConn(csClosed)) + + waitFor pool.close() + check pool.closed + check pool.pendingCloses.len == 0 + + test "closeNoWait prunes finished futures once threshold is reached": + let pool = makePool() + # Inject pre-finished dummies up to the prune threshold so the next + # closeNoWait deterministically triggers the sweep regardless of timing. + for _ in 0 ..< closePruneThreshold: + let f = newFuture[void]("dummy") + f.complete() + pool.pendingCloses.add(f) + pool.active = 1 + pool.release(mockConn(csClosed)) + # All finished dummies were swept, leaving only the newly spawned close. + check pool.pendingCloses.len == 1 + waitFor pool.close() + suite "Pool active count tracking": test "release then acquire roundtrip": let pool = makePool()