Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions async_postgres/pg_connection.nim
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,14 @@ type
wasClosed*: bool ## true if connection was closed instead of returned to pool
handedToWaiter*: bool ## true if connection was given directly to a waiting acquirer

TracePoolCloseErrorData* = object
## Data passed to the pool close-error hook. Fired when a pool-initiated
## `conn.close()` raises — these errors are otherwise swallowed because
## close runs from non-async cleanup paths and fire-and-forget tasks,
## making leaks hard to observe without tracing.
conn*: PgConnection
err*: ref CatchableError

PgTracer* = ref object
## Tracing hooks for async-postgres operations.
## Set only the callbacks you need; nil callbacks are skipped with zero overhead.
Expand Down Expand Up @@ -319,6 +327,7 @@ type
proc(data: TracePoolReleaseStartData): TraceContext {.gcsafe, raises: [].}
onPoolReleaseEnd*:
proc(ctx: TraceContext, data: TracePoolReleaseEndData) {.gcsafe, raises: [].}
onPoolCloseError*: proc(data: TracePoolCloseErrorData) {.gcsafe, raises: [].}

# Public API: read-only getters

Expand Down
70 changes: 28 additions & 42 deletions async_postgres/pg_pool.nim
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,21 @@ proc metrics*(pool: PgPool): PoolMetrics =
## Cumulative pool metrics.
pool.metrics

proc reportCloseError(pool: PgPool, conn: PgConnection, err: ref CatchableError) =
## Route a swallowed pool-initiated close error to the tracer. The pool
## cannot propagate these errors to a caller (close runs from synchronous
## cleanup paths and fire-and-forget tasks), so tracing is the only signal
## operators have for leak detection.
if pool.config.tracer != nil and pool.config.tracer.onPoolCloseError != nil:
pool.config.tracer.onPoolCloseError(TracePoolCloseErrorData(conn: conn, err: err))

proc tracedClose(pool: PgPool, conn: PgConnection) {.async.} =
## Close `conn`, reporting any close error via `reportCloseError`.
try:
await conn.close()
except CatchableError as e:
pool.reportCloseError(conn, e)

proc closeNoWait(pool: PgPool, conn: PgConnection) =
## Schedule connection close without waiting. For use in non-async contexts
## (e.g. `release()` is synchronous). The spawned task is tracked in
Expand All @@ -171,14 +186,12 @@ proc closeNoWait(pool: PgPool, conn: PgConnection) =
## Note on asyncdispatch: a close scheduled here may race with an inflight
## request future that the previous timeout could not cancel (see
## `invalidateOnTimeout`). That future will observe a closed fd and fail
## quietly — we rely on `asyncSpawn` swallowing the resulting CatchableError
## so it never surfaces. The connection is not reused either way.
## quietly — `tracedClose` catches the error and routes it to the
## `onPoolCloseError` tracer hook (nil when unconfigured). The connection
## is not reused either way.
pool.metrics.closeCount.inc
proc doClose() {.async.} =
try:
await conn.close()
except CatchableError:
discard
await pool.tracedClose(conn)

# Prune only once the seq grows past the threshold so the sweep is amortized
# instead of O(n) on every call. Uses swap-remove (constant-time delete that
Expand Down Expand Up @@ -207,10 +220,7 @@ proc resetSession*(pool: PgPool, conn: PgConnection) {.async.} =
discard await conn.simpleExec(pool.config.resetQuery)
conn.clearStmtCache()
except CatchableError:
try:
await conn.close()
except CatchableError:
discard
await pool.tracedClose(conn)

proc maintenanceLoop(pool: PgPool) {.async.} =
while not pool.closed:
Expand All @@ -227,20 +237,14 @@ proc maintenanceLoop(pool: PgPool) {.async.} =
# Always close broken or in-transaction connections (unusable)
if pc.conn.state != csReady or pc.conn.txStatus != tsIdle:
pool.metrics.closeCount.inc
try:
await pc.conn.close()
except CatchableError:
discard
await pool.tracedClose(pc.conn)
continue

# Always close max-lifetime-exceeded connections (acquire rejects them anyway)
if pool.config.maxLifetime > ZeroDuration and
now - pc.conn.createdAt > pool.config.maxLifetime:
pool.metrics.closeCount.inc
try:
await pc.conn.close()
except CatchableError:
discard
await pool.tracedClose(pc.conn)
continue

# Idle timeout respects minSize
Expand All @@ -249,10 +253,7 @@ proc maintenanceLoop(pool: PgPool) {.async.} =
let totalCount = remaining.len + pool.idle.len + pool.active
if totalCount >= pool.config.minSize:
pool.metrics.closeCount.inc
try:
await pc.conn.close()
except CatchableError:
discard
await pool.tracedClose(pc.conn)
continue

remaining.addLast(pc)
Expand Down Expand Up @@ -305,10 +306,7 @@ proc newPool*(config: PoolConfig): Future[PgPool] {.async.} =
except CatchableError as e:
while pool.idle.len > 0:
let pc = pool.idle.popFirst()
try:
await pc.conn.close()
except CatchableError:
discard
await pool.tracedClose(pc.conn)
raise e

pool.maintenanceTask = maintenanceLoop(pool)
Expand Down Expand Up @@ -379,18 +377,12 @@ proc acquireImpl(pool: PgPool): Future[AcquireResult] {.async.} =
let pc = pool.idle.popFirst()
if pc.conn.state != csReady:
pool.metrics.closeCount.inc
try:
await pc.conn.close()
except CatchableError:
discard
await pool.tracedClose(pc.conn)
continue
if pool.config.maxLifetime > ZeroDuration and
pool.cachedNow - pc.conn.createdAt > pool.config.maxLifetime:
pool.metrics.closeCount.inc
try:
await pc.conn.close()
except CatchableError:
discard
await pool.tracedClose(pc.conn)
continue
# Health check: ping connections that have been idle too long
if pool.config.healthCheckTimeout > ZeroDuration and
Expand All @@ -399,10 +391,7 @@ proc acquireImpl(pool: PgPool): Future[AcquireResult] {.async.} =
await pc.conn.ping(pool.config.pingTimeout)
except CatchableError:
pool.metrics.closeCount.inc
try:
await pc.conn.close()
except CatchableError:
discard
await pool.tracedClose(pc.conn)
continue
pool.active.inc
recordAcquire()
Expand Down Expand Up @@ -1061,10 +1050,7 @@ proc close*(pool: PgPool, timeout = ZeroDuration): Future[void] {.async.} =
while pool.idle.len > 0:
let pc = pool.idle.popFirst()
pool.metrics.closeCount.inc
try:
await pc.conn.close()
except CatchableError:
discard
await pool.tracedClose(pc.conn)

# Wait for any fire-and-forget closes spawned via closeNoWait so the server
# observes Terminate and fds are released before this proc returns. A late
Expand Down
96 changes: 96 additions & 0 deletions tests/test_tracing.nim
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,10 @@ type
wasClosed: bool
handedToWaiter: bool

PoolCloseErrorRec = object
hasConn: bool
errMsg: string

TraceLog = ref object
connectStarts: seq[ConnectStartRec]
connectEnds: seq[ConnectEndRec]
Expand All @@ -125,6 +129,7 @@ type
poolAcquireEnds: seq[PoolAcquireEndRec]
poolReleaseStarts: seq[PoolReleaseStartRec]
poolReleaseEnds: seq[PoolReleaseEndRec]
poolCloseErrors: seq[PoolCloseErrorRec]

proc newTraceLog(): TraceLog =
TraceLog()
Expand Down Expand Up @@ -249,6 +254,13 @@ proc buildTracer(log: TraceLog): PgTracer =
)
)

tracer.onPoolCloseError = proc(data: TracePoolCloseErrorData) {.gcsafe, raises: [].} =
log.poolCloseErrors.add(
PoolCloseErrorRec(
hasConn: data.conn != nil, errMsg: (if data.err != nil: data.err.msg else: "")
)
)

return tracer

proc tracedConfig(tracer: PgTracer): ConnConfig =
Expand Down Expand Up @@ -637,6 +649,90 @@ suite "Tracing: pool acquire/release":

waitFor t()

suite "Tracing: pool close errors":
test "onPoolCloseError reports swallowed close failures":
proc t() {.async.} =
let log = newTraceLog()
let tracer = buildTracer(log)
var poolCfg = initPoolConfig(tracedConfig(tracer), minSize = 0, maxSize = 1)
poolCfg.tracer = tracer
let pool = await newPool(poolCfg)

let conn = await pool.acquire()

# Drive the hook through reportCloseError — the single chokepoint that
# tracedClose funnels errors through. Inducing a real conn.close() failure
# is impractical in tests: close() is idempotent and its only raising path
# is inside sendMsg, which it already swallows internally.
let err = newException(PgError, "simulated close failure")
pool.reportCloseError(conn, err)

doAssert log.poolCloseErrors.len == 1
doAssert log.poolCloseErrors[0].hasConn
doAssert log.poolCloseErrors[0].errMsg == "simulated close failure"

pool.release(conn)
await pool.close()

waitFor t()

test "nil onPoolCloseError hook is a no-op":
proc t() {.async.} =
# Build a tracer that sets OTHER hooks but leaves onPoolCloseError nil.
let tracer = PgTracer()
var poolCfg = initPoolConfig(plainConfig(), minSize = 0, maxSize = 1)
poolCfg.tracer = tracer
let pool = await newPool(poolCfg)

let conn = await pool.acquire()
# Must not raise even though the hook is nil.
pool.reportCloseError(conn, newException(PgError, "ignored"))

pool.release(conn)
await pool.close()

waitFor t()

test "tracedClose on healthy connection does not fire hook":
proc t() {.async.} =
let log = newTraceLog()
let tracer = buildTracer(log)
var poolCfg = initPoolConfig(tracedConfig(tracer), minSize = 0, maxSize = 1)
poolCfg.tracer = tracer
let pool = await newPool(poolCfg)

# Use a stand-alone connection so pool.active/idle counters stay
# consistent — tracedClose only touches the tracer, not pool state.
let conn = await connect(tracedConfig(tracer))
await pool.tracedClose(conn)

doAssert log.poolCloseErrors.len == 0

await pool.close()

waitFor t()

test "tracedClose on already-closed connection is a no-op":
proc t() {.async.} =
let log = newTraceLog()
let tracer = buildTracer(log)
var poolCfg = initPoolConfig(tracedConfig(tracer), minSize = 0, maxSize = 1)
poolCfg.tracer = tracer
let pool = await newPool(poolCfg)

# conn.close() is documented as idempotent. tracedClose against an
# already-closed conn exercises the real close path end-to-end and must
# neither raise nor fire the error hook.
let conn = await connect(tracedConfig(tracer))
await conn.close()
await pool.tracedClose(conn)

doAssert log.poolCloseErrors.len == 0

await pool.close()

waitFor t()

suite "Tracing: queryEach":
test "onQueryStart and onQueryEnd are called with rowCount":
proc t() {.async.} =
Expand Down
Loading