diff --git a/async_postgres/pg_pool.nim b/async_postgres/pg_pool.nim index b8cb2d7..6b25f85 100644 --- a/async_postgres/pg_pool.nim +++ b/async_postgres/pg_pool.nim @@ -39,6 +39,13 @@ type fut: Future[PgConnection] cancelled: bool + PoolMetrics* = object ## Cumulative pool statistics. + acquireCount*: int64 ## Total successful acquires + acquireDuration*: Duration ## Total time spent waiting in acquire + timeoutCount*: int64 ## Number of acquire timeouts + createCount*: int64 ## Number of new connections created + closeCount*: int64 ## Number of connections closed/discarded + PgPool* = ref object ## Connection pool that manages a set of PostgreSQL connections. config: PoolConfig idle: Deque[PooledConn] @@ -49,6 +56,7 @@ type maintenanceTask: Future[void] cachedNow: Moment ## Updated on acquire(); reused by release() to avoid extra syscalls + metrics: PoolMetrics proc initPoolConfig*( connConfig: ConnConfig, @@ -117,8 +125,13 @@ proc isClosed*(pool: PgPool): bool = ## Whether the pool has been closed. pool.closed -proc closeNoWait(conn: PgConnection) = +proc metrics*(pool: PgPool): PoolMetrics = + ## Cumulative pool metrics. + pool.metrics + +proc closeNoWait(pool: PgPool, conn: PgConnection) = ## Schedule connection close without waiting. For use in non-async contexts. + pool.metrics.closeCount.inc proc doClose() {.async.} = try: await conn.close() @@ -156,6 +169,7 @@ proc maintenanceLoop(pool: PgPool) {.async.} = # Always close broken or in-transaction connections (unusable) if pc.conn.state != csReady or pc.conn.txStatus != tsIdle: + pool.metrics.closeCount.inc try: await pc.conn.close() except CatchableError: @@ -165,6 +179,7 @@ proc maintenanceLoop(pool: PgPool) {.async.} = # Always close max-lifetime-exceeded connections (acquire rejects them anyway) if pool.config.maxLifetime > ZeroDuration and now - pc.conn.createdAt > pool.config.maxLifetime: + pool.metrics.closeCount.inc try: await pc.conn.close() except CatchableError: @@ -176,6 +191,7 @@ proc maintenanceLoop(pool: PgPool) {.async.} = now - pc.lastUsedAt > pool.config.idleTimeout: let totalCount = remaining.len + pool.idle.len + pool.active if totalCount >= pool.config.minSize: + pool.metrics.closeCount.inc try: await pc.conn.close() except CatchableError: @@ -200,6 +216,7 @@ proc maintenanceLoop(pool: PgPool) {.async.} = break try: let conn = await connect(pool.config.connConfig).wait(replenishTimeout) + pool.metrics.createCount.inc pool.idle.addLast(PooledConn(conn: conn, lastUsedAt: now)) except CatchableError: break # best-effort, retry next interval @@ -224,6 +241,7 @@ proc newPool*(config: PoolConfig): Future[PgPool] {.async.} = pool.cachedNow = Moment.now() for i in 0 ..< cfg.minSize: let conn = await connect(cfg.connConfig) + pool.metrics.createCount.inc pool.idle.addLast(PooledConn(conn: conn, lastUsedAt: pool.cachedNow)) except CatchableError as e: while pool.idle.len > 0: @@ -244,7 +262,7 @@ proc release*(pool: PgPool, conn: PgConnection) = if pool.closed or conn.state != csReady or conn.txStatus != tsIdle: if pool.active > 0: pool.active.dec - conn.closeNoWait() + pool.closeNoWait(conn) return while pool.waiters.len > 0: @@ -266,11 +284,18 @@ proc acquire*(pool: PgPool): Future[PgConnection] {.async.} = raise newException(PgPoolError, "Pool is closed") pool.cachedNow = Moment.now() + let acquireStart = pool.cachedNow + + template recordAcquire() = + pool.metrics.acquireCount.inc + pool.metrics.acquireDuration = + pool.metrics.acquireDuration + (Moment.now() - acquireStart) # Try to get an idle connection while pool.idle.len > 0: let pc = pool.idle.popFirst() if pc.conn.state != csReady: + pool.metrics.closeCount.inc try: await pc.conn.close() except CatchableError: @@ -278,6 +303,7 @@ proc acquire*(pool: PgPool): Future[PgConnection] {.async.} = continue if pool.config.maxLifetime > ZeroDuration and pool.cachedNow - pc.conn.createdAt > pool.config.maxLifetime: + pool.metrics.closeCount.inc try: await pc.conn.close() except CatchableError: @@ -289,19 +315,24 @@ proc acquire*(pool: PgPool): Future[PgConnection] {.async.} = try: await pc.conn.ping(pool.config.pingTimeout) except CatchableError: + pool.metrics.closeCount.inc try: await pc.conn.close() except CatchableError: discard continue pool.active.inc + recordAcquire() return pc.conn # No idle connections; create new if under limit if pool.active < pool.config.maxSize: pool.active.inc try: - return await connect(pool.config.connConfig) + let conn = await connect(pool.config.connConfig) + pool.metrics.createCount.inc + recordAcquire() + return conn except CatchableError as e: pool.active.dec raise e @@ -318,10 +349,13 @@ proc acquire*(pool: PgPool): Future[PgConnection] {.async.} = pool.waiterCount.inc if pool.config.acquireTimeout > ZeroDuration: try: - return await fut.wait(pool.config.acquireTimeout) + let conn = await fut.wait(pool.config.acquireTimeout) + recordAcquire() + return conn except AsyncTimeoutError: waiter.cancelled = true pool.waiterCount.dec + pool.metrics.timeoutCount.inc # In single-threaded async, no preemption occurs between completed() # and read(), so this sequence is race-free. If release() completed # the future just before the timeout fired, return the connection @@ -330,7 +364,9 @@ proc acquire*(pool: PgPool): Future[PgConnection] {.async.} = pool.release(fut.read()) raise newException(PgPoolError, "Pool acquire timeout") else: - return await fut + let conn = await fut + recordAcquire() + return conn template withConnection*(pool: PgPool, conn, body: untyped) = ## Acquire a connection, execute `body`, then release it back to the pool. @@ -713,6 +749,7 @@ proc close*(pool: PgPool, timeout = ZeroDuration): Future[void] {.async.} = # Close all idle connections while pool.idle.len > 0: let pc = pool.idle.popFirst() + pool.metrics.closeCount.inc try: await pc.conn.close() except CatchableError: diff --git a/tests/test_pool.nim b/tests/test_pool.nim index b26a355..b185b4c 100644 --- a/tests/test_pool.nim +++ b/tests/test_pool.nim @@ -1065,3 +1065,151 @@ suite "Error type granularity": except PgError: caught = true check caught + +suite "Pool metrics": + test "initial metrics are zero": + let pool = makePool() + let m = pool.metrics + check m.acquireCount == 0 + check m.acquireDuration == ZeroDuration + check m.timeoutCount == 0 + check m.createCount == 0 + check m.closeCount == 0 + + test "acquire from idle increments acquireCount": + let pool = makePool() + let conn = mockConn() + pool.idle.addLast(conn.toPooled()) + discard waitFor pool.acquire() + check pool.metrics.acquireCount == 1 + + test "acquire tracks acquireDuration": + let pool = makePool() + let conn = mockConn() + pool.idle.addLast(conn.toPooled()) + discard waitFor pool.acquire() + check pool.metrics.acquireDuration >= ZeroDuration + + test "acquire skipping broken connections increments closeCount": + let pool = makePool() + let broken = mockConn(csClosed) + let good = mockConn() + pool.idle.addLast(broken.toPooled()) + pool.idle.addLast(good.toPooled()) + discard waitFor pool.acquire() + check pool.metrics.closeCount == 1 + check pool.metrics.acquireCount == 1 + + test "release broken connection increments closeCount": + let pool = makePool() + pool.active = 1 + let conn = mockConn(csClosed) + pool.release(conn) + check pool.metrics.closeCount == 1 + + test "release to closed pool increments closeCount": + let pool = makePool() + pool.active = 1 + pool.closed = true + let conn = mockConn() + pool.release(conn) + check pool.metrics.closeCount == 1 + + test "close draining idle connections increments closeCount": + let pool = makePool() + let conn1 = mockConn(csClosed) + let conn2 = mockConn(csClosed) + pool.idle.addLast(conn1.toPooled()) + pool.idle.addLast(conn2.toPooled()) + waitFor pool.close() + check pool.metrics.closeCount == 2 + + test "acquire timeout increments timeoutCount": + proc t() {.async.} = + let pool = makePool(maxSize = 1) + pool.config.acquireTimeout = milliseconds(50) + pool.active = 1 + + try: + discard await pool.acquire() + except PgError: + discard + + doAssert pool.metrics.timeoutCount == 1 + doAssert pool.metrics.acquireCount == 0 + + waitFor t() + + test "multiple acquires accumulate metrics": + let pool = makePool() + for i in 0 ..< 3: + let conn = mockConn() + pool.idle.addLast(conn.toPooled()) + discard waitFor pool.acquire() + pool.active.dec + pool.idle.addLast(conn.toPooled()) + check pool.metrics.acquireCount == 3 + + test "waiter transfer increments acquireCount": + let pool = makePool(maxSize = 1) + pool.active = 1 + + let acquireFut = pool.acquire() + check not acquireFut.finished + + let conn = mockConn() + pool.release(conn) + discard waitFor acquireFut + check pool.metrics.acquireCount == 1 + + test "acquire skipping maxLifetime-expired connections increments closeCount": + let pool = makePool() + pool.config.maxLifetime = seconds(1) + let expired = mockConn() + expired.createdAt = Moment.now() - seconds(5) + expired.state = csClosed + let good = mockConn() + pool.idle.addLast(expired.toPooled()) + pool.idle.addLast(good.toPooled()) + discard waitFor pool.acquire() + check pool.metrics.closeCount == 1 + check pool.metrics.acquireCount == 1 + + test "acquireDuration accumulates across multiple acquires": + let pool = makePool() + for i in 0 ..< 3: + let conn = mockConn() + pool.idle.addLast(conn.toPooled()) + discard waitFor pool.acquire() + pool.active.dec + check pool.metrics.acquireCount == 3 + check pool.metrics.acquireDuration >= ZeroDuration + + test "waiter transfer tracks acquireDuration": + let pool = makePool(maxSize = 1) + pool.active = 1 + + let acquireFut = pool.acquire() + check not acquireFut.finished + + let conn = mockConn() + pool.release(conn) + discard waitFor acquireFut + check pool.metrics.acquireCount == 1 + check pool.metrics.acquireDuration >= ZeroDuration + + test "acquire timeout does not increment createCount": + proc t() {.async.} = + let pool = makePool(maxSize = 1) + pool.config.acquireTimeout = milliseconds(50) + pool.active = 1 + + try: + discard await pool.acquire() + except PgError: + discard + + doAssert pool.metrics.createCount == 0 + doAssert pool.metrics.timeoutCount == 1 + + waitFor t()