Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 42 additions & 5 deletions async_postgres/pg_pool.nim
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,13 @@ type
fut: Future[PgConnection]
cancelled: bool

PoolMetrics* = object ## Cumulative pool statistics.
acquireCount*: int64 ## Total successful acquires
acquireDuration*: Duration ## Total time spent waiting in acquire
timeoutCount*: int64 ## Number of acquire timeouts
createCount*: int64 ## Number of new connections created
closeCount*: int64 ## Number of connections closed/discarded

PgPool* = ref object ## Connection pool that manages a set of PostgreSQL connections.
config: PoolConfig
idle: Deque[PooledConn]
Expand All @@ -49,6 +56,7 @@ type
maintenanceTask: Future[void]
cachedNow: Moment
## Updated on acquire(); reused by release() to avoid extra syscalls
metrics: PoolMetrics

proc initPoolConfig*(
connConfig: ConnConfig,
Expand Down Expand Up @@ -117,8 +125,13 @@ proc isClosed*(pool: PgPool): bool =
## Whether the pool has been closed.
pool.closed

proc closeNoWait(conn: PgConnection) =
proc metrics*(pool: PgPool): PoolMetrics =
## Cumulative pool metrics.
pool.metrics

proc closeNoWait(pool: PgPool, conn: PgConnection) =
## Schedule connection close without waiting. For use in non-async contexts.
pool.metrics.closeCount.inc
proc doClose() {.async.} =
try:
await conn.close()
Expand Down Expand Up @@ -156,6 +169,7 @@ proc maintenanceLoop(pool: PgPool) {.async.} =

# Always close broken or in-transaction connections (unusable)
if pc.conn.state != csReady or pc.conn.txStatus != tsIdle:
pool.metrics.closeCount.inc
try:
await pc.conn.close()
except CatchableError:
Expand All @@ -165,6 +179,7 @@ proc maintenanceLoop(pool: PgPool) {.async.} =
# Always close max-lifetime-exceeded connections (acquire rejects them anyway)
if pool.config.maxLifetime > ZeroDuration and
now - pc.conn.createdAt > pool.config.maxLifetime:
pool.metrics.closeCount.inc
try:
await pc.conn.close()
except CatchableError:
Expand All @@ -176,6 +191,7 @@ proc maintenanceLoop(pool: PgPool) {.async.} =
now - pc.lastUsedAt > pool.config.idleTimeout:
let totalCount = remaining.len + pool.idle.len + pool.active
if totalCount >= pool.config.minSize:
pool.metrics.closeCount.inc
try:
await pc.conn.close()
except CatchableError:
Expand All @@ -200,6 +216,7 @@ proc maintenanceLoop(pool: PgPool) {.async.} =
break
try:
let conn = await connect(pool.config.connConfig).wait(replenishTimeout)
pool.metrics.createCount.inc
pool.idle.addLast(PooledConn(conn: conn, lastUsedAt: now))
except CatchableError:
break # best-effort, retry next interval
Expand All @@ -224,6 +241,7 @@ proc newPool*(config: PoolConfig): Future[PgPool] {.async.} =
pool.cachedNow = Moment.now()
for i in 0 ..< cfg.minSize:
let conn = await connect(cfg.connConfig)
pool.metrics.createCount.inc
pool.idle.addLast(PooledConn(conn: conn, lastUsedAt: pool.cachedNow))
except CatchableError as e:
while pool.idle.len > 0:
Expand All @@ -244,7 +262,7 @@ proc release*(pool: PgPool, conn: PgConnection) =
if pool.closed or conn.state != csReady or conn.txStatus != tsIdle:
if pool.active > 0:
pool.active.dec
conn.closeNoWait()
pool.closeNoWait(conn)
return

while pool.waiters.len > 0:
Expand All @@ -266,18 +284,26 @@ proc acquire*(pool: PgPool): Future[PgConnection] {.async.} =
raise newException(PgPoolError, "Pool is closed")

pool.cachedNow = Moment.now()
let acquireStart = pool.cachedNow

template recordAcquire() =
pool.metrics.acquireCount.inc
pool.metrics.acquireDuration =
pool.metrics.acquireDuration + (Moment.now() - acquireStart)

# Try to get an idle connection
while pool.idle.len > 0:
let pc = pool.idle.popFirst()
if pc.conn.state != csReady:
pool.metrics.closeCount.inc
try:
await pc.conn.close()
except CatchableError:
discard
continue
if pool.config.maxLifetime > ZeroDuration and
pool.cachedNow - pc.conn.createdAt > pool.config.maxLifetime:
pool.metrics.closeCount.inc
try:
await pc.conn.close()
except CatchableError:
Expand All @@ -289,19 +315,24 @@ proc acquire*(pool: PgPool): Future[PgConnection] {.async.} =
try:
await pc.conn.ping(pool.config.pingTimeout)
except CatchableError:
pool.metrics.closeCount.inc
try:
await pc.conn.close()
except CatchableError:
discard
continue
pool.active.inc
recordAcquire()
return pc.conn

# No idle connections; create new if under limit
if pool.active < pool.config.maxSize:
pool.active.inc
try:
return await connect(pool.config.connConfig)
let conn = await connect(pool.config.connConfig)
pool.metrics.createCount.inc
recordAcquire()
return conn
except CatchableError as e:
pool.active.dec
raise e
Expand All @@ -318,10 +349,13 @@ proc acquire*(pool: PgPool): Future[PgConnection] {.async.} =
pool.waiterCount.inc
if pool.config.acquireTimeout > ZeroDuration:
try:
return await fut.wait(pool.config.acquireTimeout)
let conn = await fut.wait(pool.config.acquireTimeout)
recordAcquire()
return conn
except AsyncTimeoutError:
waiter.cancelled = true
pool.waiterCount.dec
pool.metrics.timeoutCount.inc
# In single-threaded async, no preemption occurs between completed()
# and read(), so this sequence is race-free. If release() completed
# the future just before the timeout fired, return the connection
Expand All @@ -330,7 +364,9 @@ proc acquire*(pool: PgPool): Future[PgConnection] {.async.} =
pool.release(fut.read())
raise newException(PgPoolError, "Pool acquire timeout")
else:
return await fut
let conn = await fut
recordAcquire()
return conn

template withConnection*(pool: PgPool, conn, body: untyped) =
## Acquire a connection, execute `body`, then release it back to the pool.
Expand Down Expand Up @@ -713,6 +749,7 @@ proc close*(pool: PgPool, timeout = ZeroDuration): Future[void] {.async.} =
# Close all idle connections
while pool.idle.len > 0:
let pc = pool.idle.popFirst()
pool.metrics.closeCount.inc
try:
await pc.conn.close()
except CatchableError:
Expand Down
148 changes: 148 additions & 0 deletions tests/test_pool.nim
Original file line number Diff line number Diff line change
Expand Up @@ -1065,3 +1065,151 @@ suite "Error type granularity":
except PgError:
caught = true
check caught

suite "Pool metrics":
test "initial metrics are zero":
let pool = makePool()
let m = pool.metrics
check m.acquireCount == 0
check m.acquireDuration == ZeroDuration
check m.timeoutCount == 0
check m.createCount == 0
check m.closeCount == 0

test "acquire from idle increments acquireCount":
let pool = makePool()
let conn = mockConn()
pool.idle.addLast(conn.toPooled())
discard waitFor pool.acquire()
check pool.metrics.acquireCount == 1

test "acquire tracks acquireDuration":
let pool = makePool()
let conn = mockConn()
pool.idle.addLast(conn.toPooled())
discard waitFor pool.acquire()
check pool.metrics.acquireDuration >= ZeroDuration

test "acquire skipping broken connections increments closeCount":
let pool = makePool()
let broken = mockConn(csClosed)
let good = mockConn()
pool.idle.addLast(broken.toPooled())
pool.idle.addLast(good.toPooled())
discard waitFor pool.acquire()
check pool.metrics.closeCount == 1
check pool.metrics.acquireCount == 1

test "release broken connection increments closeCount":
let pool = makePool()
pool.active = 1
let conn = mockConn(csClosed)
pool.release(conn)
check pool.metrics.closeCount == 1

test "release to closed pool increments closeCount":
let pool = makePool()
pool.active = 1
pool.closed = true
let conn = mockConn()
pool.release(conn)
check pool.metrics.closeCount == 1

test "close draining idle connections increments closeCount":
let pool = makePool()
let conn1 = mockConn(csClosed)
let conn2 = mockConn(csClosed)
pool.idle.addLast(conn1.toPooled())
pool.idle.addLast(conn2.toPooled())
waitFor pool.close()
check pool.metrics.closeCount == 2

test "acquire timeout increments timeoutCount":
proc t() {.async.} =
let pool = makePool(maxSize = 1)
pool.config.acquireTimeout = milliseconds(50)
pool.active = 1

try:
discard await pool.acquire()
except PgError:
discard

doAssert pool.metrics.timeoutCount == 1
doAssert pool.metrics.acquireCount == 0

waitFor t()

test "multiple acquires accumulate metrics":
let pool = makePool()
for i in 0 ..< 3:
let conn = mockConn()
pool.idle.addLast(conn.toPooled())
discard waitFor pool.acquire()
pool.active.dec
pool.idle.addLast(conn.toPooled())
check pool.metrics.acquireCount == 3

test "waiter transfer increments acquireCount":
let pool = makePool(maxSize = 1)
pool.active = 1

let acquireFut = pool.acquire()
check not acquireFut.finished

let conn = mockConn()
pool.release(conn)
discard waitFor acquireFut
check pool.metrics.acquireCount == 1

test "acquire skipping maxLifetime-expired connections increments closeCount":
let pool = makePool()
pool.config.maxLifetime = seconds(1)
let expired = mockConn()
expired.createdAt = Moment.now() - seconds(5)
expired.state = csClosed
let good = mockConn()
pool.idle.addLast(expired.toPooled())
pool.idle.addLast(good.toPooled())
discard waitFor pool.acquire()
check pool.metrics.closeCount == 1
check pool.metrics.acquireCount == 1

test "acquireDuration accumulates across multiple acquires":
let pool = makePool()
for i in 0 ..< 3:
let conn = mockConn()
pool.idle.addLast(conn.toPooled())
discard waitFor pool.acquire()
pool.active.dec
check pool.metrics.acquireCount == 3
check pool.metrics.acquireDuration >= ZeroDuration

test "waiter transfer tracks acquireDuration":
let pool = makePool(maxSize = 1)
pool.active = 1

let acquireFut = pool.acquire()
check not acquireFut.finished

let conn = mockConn()
pool.release(conn)
discard waitFor acquireFut
check pool.metrics.acquireCount == 1
check pool.metrics.acquireDuration >= ZeroDuration

test "acquire timeout does not increment createCount":
proc t() {.async.} =
let pool = makePool(maxSize = 1)
pool.config.acquireTimeout = milliseconds(50)
pool.active = 1

try:
discard await pool.acquire()
except PgError:
discard

doAssert pool.metrics.createCount == 0
doAssert pool.metrics.timeoutCount == 1

waitFor t()
Loading