diff --git a/async_postgres/pg_connection.nim b/async_postgres/pg_connection.nim index 6ef2d0e..9802c65 100644 --- a/async_postgres/pg_connection.nim +++ b/async_postgres/pg_connection.nim @@ -276,6 +276,14 @@ type wasClosed*: bool ## true if connection was closed instead of returned to pool handedToWaiter*: bool ## true if connection was given directly to a waiting acquirer + TracePoolCloseErrorData* = object + ## Data passed to the pool close-error hook. Fired when a pool-initiated + ## `conn.close()` raises — these errors are otherwise swallowed because + ## close runs from non-async cleanup paths and fire-and-forget tasks, + ## making leaks hard to observe without tracing. + conn*: PgConnection + err*: ref CatchableError + PgTracer* = ref object ## Tracing hooks for async-postgres operations. ## Set only the callbacks you need; nil callbacks are skipped with zero overhead. @@ -319,6 +327,7 @@ type proc(data: TracePoolReleaseStartData): TraceContext {.gcsafe, raises: [].} onPoolReleaseEnd*: proc(ctx: TraceContext, data: TracePoolReleaseEndData) {.gcsafe, raises: [].} + onPoolCloseError*: proc(data: TracePoolCloseErrorData) {.gcsafe, raises: [].} # Public API: read-only getters diff --git a/async_postgres/pg_pool.nim b/async_postgres/pg_pool.nim index c5acaec..0fe7b4e 100644 --- a/async_postgres/pg_pool.nim +++ b/async_postgres/pg_pool.nim @@ -162,6 +162,21 @@ proc metrics*(pool: PgPool): PoolMetrics = ## Cumulative pool metrics. pool.metrics +proc reportCloseError(pool: PgPool, conn: PgConnection, err: ref CatchableError) = + ## Route a swallowed pool-initiated close error to the tracer. The pool + ## cannot propagate these errors to a caller (close runs from synchronous + ## cleanup paths and fire-and-forget tasks), so tracing is the only signal + ## operators have for leak detection. + if pool.config.tracer != nil and pool.config.tracer.onPoolCloseError != nil: + pool.config.tracer.onPoolCloseError(TracePoolCloseErrorData(conn: conn, err: err)) + +proc tracedClose(pool: PgPool, conn: PgConnection) {.async.} = + ## Close `conn`, reporting any close error via `reportCloseError`. + try: + await conn.close() + except CatchableError as e: + pool.reportCloseError(conn, e) + proc closeNoWait(pool: PgPool, conn: PgConnection) = ## Schedule connection close without waiting. For use in non-async contexts ## (e.g. `release()` is synchronous). The spawned task is tracked in @@ -171,14 +186,12 @@ proc closeNoWait(pool: PgPool, conn: PgConnection) = ## Note on asyncdispatch: a close scheduled here may race with an inflight ## request future that the previous timeout could not cancel (see ## `invalidateOnTimeout`). That future will observe a closed fd and fail - ## quietly — we rely on `asyncSpawn` swallowing the resulting CatchableError - ## so it never surfaces. The connection is not reused either way. + ## quietly — `tracedClose` catches the error and routes it to the + ## `onPoolCloseError` tracer hook (nil when unconfigured). The connection + ## is not reused either way. pool.metrics.closeCount.inc proc doClose() {.async.} = - try: - await conn.close() - except CatchableError: - discard + await pool.tracedClose(conn) # Prune only once the seq grows past the threshold so the sweep is amortized # instead of O(n) on every call. Uses swap-remove (constant-time delete that @@ -207,10 +220,7 @@ proc resetSession*(pool: PgPool, conn: PgConnection) {.async.} = discard await conn.simpleExec(pool.config.resetQuery) conn.clearStmtCache() except CatchableError: - try: - await conn.close() - except CatchableError: - discard + await pool.tracedClose(conn) proc maintenanceLoop(pool: PgPool) {.async.} = while not pool.closed: @@ -227,20 +237,14 @@ proc maintenanceLoop(pool: PgPool) {.async.} = # Always close broken or in-transaction connections (unusable) if pc.conn.state != csReady or pc.conn.txStatus != tsIdle: pool.metrics.closeCount.inc - try: - await pc.conn.close() - except CatchableError: - discard + await pool.tracedClose(pc.conn) continue # Always close max-lifetime-exceeded connections (acquire rejects them anyway) if pool.config.maxLifetime > ZeroDuration and now - pc.conn.createdAt > pool.config.maxLifetime: pool.metrics.closeCount.inc - try: - await pc.conn.close() - except CatchableError: - discard + await pool.tracedClose(pc.conn) continue # Idle timeout respects minSize @@ -249,10 +253,7 @@ proc maintenanceLoop(pool: PgPool) {.async.} = let totalCount = remaining.len + pool.idle.len + pool.active if totalCount >= pool.config.minSize: pool.metrics.closeCount.inc - try: - await pc.conn.close() - except CatchableError: - discard + await pool.tracedClose(pc.conn) continue remaining.addLast(pc) @@ -305,10 +306,7 @@ proc newPool*(config: PoolConfig): Future[PgPool] {.async.} = except CatchableError as e: while pool.idle.len > 0: let pc = pool.idle.popFirst() - try: - await pc.conn.close() - except CatchableError: - discard + await pool.tracedClose(pc.conn) raise e pool.maintenanceTask = maintenanceLoop(pool) @@ -379,18 +377,12 @@ proc acquireImpl(pool: PgPool): Future[AcquireResult] {.async.} = let pc = pool.idle.popFirst() if pc.conn.state != csReady: pool.metrics.closeCount.inc - try: - await pc.conn.close() - except CatchableError: - discard + await pool.tracedClose(pc.conn) continue if pool.config.maxLifetime > ZeroDuration and pool.cachedNow - pc.conn.createdAt > pool.config.maxLifetime: pool.metrics.closeCount.inc - try: - await pc.conn.close() - except CatchableError: - discard + await pool.tracedClose(pc.conn) continue # Health check: ping connections that have been idle too long if pool.config.healthCheckTimeout > ZeroDuration and @@ -399,10 +391,7 @@ proc acquireImpl(pool: PgPool): Future[AcquireResult] {.async.} = await pc.conn.ping(pool.config.pingTimeout) except CatchableError: pool.metrics.closeCount.inc - try: - await pc.conn.close() - except CatchableError: - discard + await pool.tracedClose(pc.conn) continue pool.active.inc recordAcquire() @@ -1061,10 +1050,7 @@ proc close*(pool: PgPool, timeout = ZeroDuration): Future[void] {.async.} = while pool.idle.len > 0: let pc = pool.idle.popFirst() pool.metrics.closeCount.inc - try: - await pc.conn.close() - except CatchableError: - discard + await pool.tracedClose(pc.conn) # Wait for any fire-and-forget closes spawned via closeNoWait so the server # observes Terminate and fds are released before this proc returns. A late diff --git a/tests/test_tracing.nim b/tests/test_tracing.nim index 31d805b..9306e0c 100644 --- a/tests/test_tracing.nim +++ b/tests/test_tracing.nim @@ -110,6 +110,10 @@ type wasClosed: bool handedToWaiter: bool + PoolCloseErrorRec = object + hasConn: bool + errMsg: string + TraceLog = ref object connectStarts: seq[ConnectStartRec] connectEnds: seq[ConnectEndRec] @@ -125,6 +129,7 @@ type poolAcquireEnds: seq[PoolAcquireEndRec] poolReleaseStarts: seq[PoolReleaseStartRec] poolReleaseEnds: seq[PoolReleaseEndRec] + poolCloseErrors: seq[PoolCloseErrorRec] proc newTraceLog(): TraceLog = TraceLog() @@ -249,6 +254,13 @@ proc buildTracer(log: TraceLog): PgTracer = ) ) + tracer.onPoolCloseError = proc(data: TracePoolCloseErrorData) {.gcsafe, raises: [].} = + log.poolCloseErrors.add( + PoolCloseErrorRec( + hasConn: data.conn != nil, errMsg: (if data.err != nil: data.err.msg else: "") + ) + ) + return tracer proc tracedConfig(tracer: PgTracer): ConnConfig = @@ -637,6 +649,90 @@ suite "Tracing: pool acquire/release": waitFor t() +suite "Tracing: pool close errors": + test "onPoolCloseError reports swallowed close failures": + proc t() {.async.} = + let log = newTraceLog() + let tracer = buildTracer(log) + var poolCfg = initPoolConfig(tracedConfig(tracer), minSize = 0, maxSize = 1) + poolCfg.tracer = tracer + let pool = await newPool(poolCfg) + + let conn = await pool.acquire() + + # Drive the hook through reportCloseError — the single chokepoint that + # tracedClose funnels errors through. Inducing a real conn.close() failure + # is impractical in tests: close() is idempotent and its only raising path + # is inside sendMsg, which it already swallows internally. + let err = newException(PgError, "simulated close failure") + pool.reportCloseError(conn, err) + + doAssert log.poolCloseErrors.len == 1 + doAssert log.poolCloseErrors[0].hasConn + doAssert log.poolCloseErrors[0].errMsg == "simulated close failure" + + pool.release(conn) + await pool.close() + + waitFor t() + + test "nil onPoolCloseError hook is a no-op": + proc t() {.async.} = + # Build a tracer that sets OTHER hooks but leaves onPoolCloseError nil. + let tracer = PgTracer() + var poolCfg = initPoolConfig(plainConfig(), minSize = 0, maxSize = 1) + poolCfg.tracer = tracer + let pool = await newPool(poolCfg) + + let conn = await pool.acquire() + # Must not raise even though the hook is nil. + pool.reportCloseError(conn, newException(PgError, "ignored")) + + pool.release(conn) + await pool.close() + + waitFor t() + + test "tracedClose on healthy connection does not fire hook": + proc t() {.async.} = + let log = newTraceLog() + let tracer = buildTracer(log) + var poolCfg = initPoolConfig(tracedConfig(tracer), minSize = 0, maxSize = 1) + poolCfg.tracer = tracer + let pool = await newPool(poolCfg) + + # Use a stand-alone connection so pool.active/idle counters stay + # consistent — tracedClose only touches the tracer, not pool state. + let conn = await connect(tracedConfig(tracer)) + await pool.tracedClose(conn) + + doAssert log.poolCloseErrors.len == 0 + + await pool.close() + + waitFor t() + + test "tracedClose on already-closed connection is a no-op": + proc t() {.async.} = + let log = newTraceLog() + let tracer = buildTracer(log) + var poolCfg = initPoolConfig(tracedConfig(tracer), minSize = 0, maxSize = 1) + poolCfg.tracer = tracer + let pool = await newPool(poolCfg) + + # conn.close() is documented as idempotent. tracedClose against an + # already-closed conn exercises the real close path end-to-end and must + # neither raise nor fire the error hook. + let conn = await connect(tracedConfig(tracer)) + await conn.close() + await pool.tracedClose(conn) + + doAssert log.poolCloseErrors.len == 0 + + await pool.close() + + waitFor t() + suite "Tracing: queryEach": test "onQueryStart and onQueryEnd are called with rowCount": proc t() {.async.} =