Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 65 additions & 39 deletions async_postgres/pg_pool.nim
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,29 @@ proc newPool*(config: PoolConfig): Future[PgPool] {.async.} =
pool.maintenanceTask = maintenanceLoop(pool)
return pool

proc releaseCore(
pool: PgPool, conn: PgConnection
): tuple[wasClosed, handedToWaiter: bool] =
## Core release logic shared by the traced and non-traced paths of
## `releaseImpl`. Returns flags describing the disposition of `conn` so
## the caller can report them to the tracer.
if pool.closed or conn.state != csReady or conn.txStatus != tsIdle:
if pool.active > 0:
pool.active.dec
pool.closeNoWait(conn)
return (true, false)
while pool.waiters.len > 0:
let waiter = pool.waiters.popFirst()
if waiter.cancelled:
continue
pool.waiterCount.dec
waiter.fut.complete(conn)
return (false, true)
if pool.active > 0:
pool.active.dec
pool.idle.addLast(PooledConn(conn: conn, lastUsedAt: pool.cachedNow))
return (false, false)

proc releaseImpl(pool: PgPool, conn: PgConnection) =
## Implementation of `release(conn)`; called once the owning pool is known.
## Returns the connection to the pool. If the connection is broken or in
Expand All @@ -330,34 +353,19 @@ proc releaseImpl(pool: PgPool, conn: PgConnection) =
## Transaction-in-progress (`txStatus != tsIdle`) is treated as failure
## to reset the session, so the connection is closed rather than leaking
## transaction state to the next borrower.
let tracer = pool.config.tracer
if tracer == nil:
discard pool.releaseCore(conn)
return

var traceCtx: TraceContext
if pool.config.tracer != nil and pool.config.tracer.onPoolReleaseStart != nil:
traceCtx =
pool.config.tracer.onPoolReleaseStart(TracePoolReleaseStartData(conn: conn))
if tracer.onPoolReleaseStart != nil:
traceCtx = tracer.onPoolReleaseStart(TracePoolReleaseStartData(conn: conn))

var wasClosed = false
var handedToWaiter = false
if pool.closed or conn.state != csReady or conn.txStatus != tsIdle:
if pool.active > 0:
pool.active.dec
pool.closeNoWait(conn)
wasClosed = true
else:
block dispatch:
while pool.waiters.len > 0:
let waiter = pool.waiters.popFirst()
if waiter.cancelled:
continue
pool.waiterCount.dec
waiter.fut.complete(conn)
handedToWaiter = true
break dispatch
if pool.active > 0:
pool.active.dec
pool.idle.addLast(PooledConn(conn: conn, lastUsedAt: pool.cachedNow))
let (wasClosed, handedToWaiter) = pool.releaseCore(conn)

if pool.config.tracer != nil and pool.config.tracer.onPoolReleaseEnd != nil:
pool.config.tracer.onPoolReleaseEnd(
if tracer.onPoolReleaseEnd != nil:
tracer.onPoolReleaseEnd(
traceCtx,
TracePoolReleaseEndData(wasClosed: wasClosed, handedToWaiter: handedToWaiter),
)
Expand Down Expand Up @@ -469,6 +477,10 @@ proc acquire*(pool: PgPool): Future[PgConnection] {.async.} =
## Acquire a connection from the pool. Tries idle connections first (with
## health checks), creates a new one if under `maxSize`, or waits for a
## release. Raises `PgPoolError` on timeout or if the pool is closed.
if pool.config.tracer == nil:
let ar = await pool.acquireImpl()
return ar.conn

var ar: AcquireResult
withTracing(
pool.config.tracer,
Expand All @@ -491,7 +503,8 @@ template withConnection*(pool: PgPool, conn, body: untyped) =
try:
body
finally:
await pool.resetSession(conn)
if pool.config.resetQuery.len > 0:
await pool.resetSession(conn)
conn.release()

proc failPendingOp(op: PendingPoolOp, e: ref CatchableError) =
Expand Down Expand Up @@ -555,7 +568,8 @@ proc executeBatch(
for op in batch:
failPendingOp(op, e)
finally:
await pool.resetSession(conn)
if pool.config.resetQuery.len > 0:
await pool.resetSession(conn)
conn.release()

proc dispatchBatchImpl(pool: PgPool) {.async.} =
Expand Down Expand Up @@ -588,7 +602,8 @@ proc dispatchBatchImpl(pool: PgPool) {.async.} =
)
op.queryFut.complete(r)
finally:
await pool.resetSession(conn)
if pool.config.resetQuery.len > 0:
await pool.resetSession(conn)
conn.release()
except CatchableError as e:
failPendingOp(op, e)
Expand Down Expand Up @@ -620,7 +635,8 @@ proc dispatchBatchImpl(pool: PgPool) {.async.} =
var batchFuts: seq[Future[void]]
for ci in 0 ..< conns.len:
if connOps[ci].len == 0:
await pool.resetSession(conns[ci])
if pool.config.resetQuery.len > 0:
await pool.resetSession(conns[ci])
conns[ci].release()
continue
batchFuts.add(executeBatch(pool, conns[ci], connOps[ci]))
Expand Down Expand Up @@ -681,7 +697,8 @@ proc exec*(
try:
return await conn.exec(sql, params, timeout = timeout)
finally:
await pool.resetSession(conn)
if pool.config.resetQuery.len > 0:
await pool.resetSession(conn)
conn.release()

proc query*(
Expand Down Expand Up @@ -712,7 +729,8 @@ proc query*(
try:
return await conn.query(sql, params, resultFormat = resultFormat, timeout = timeout)
finally:
await pool.resetSession(conn)
if pool.config.resetQuery.len > 0:
await pool.resetSession(conn)
conn.release()

proc queryEach*(
Expand All @@ -732,7 +750,8 @@ proc queryEach*(
try:
return await conn.queryEach(sql, params, callback, resultFormat, timeout)
finally:
await pool.resetSession(conn)
if pool.config.resetQuery.len > 0:
await pool.resetSession(conn)
conn.release()

proc queryRowOpt*(
Expand Down Expand Up @@ -915,7 +934,8 @@ proc simpleQuery*(pool: PgPool, sql: string): Future[seq[QueryResult]] {.async.}
try:
return await conn.simpleQuery(sql)
finally:
await pool.resetSession(conn)
if pool.config.resetQuery.len > 0:
await pool.resetSession(conn)
conn.release()

proc simpleExec*(
Expand All @@ -928,7 +948,8 @@ proc simpleExec*(
try:
return await conn.simpleExec(sql, timeout)
finally:
await pool.resetSession(conn)
if pool.config.resetQuery.len > 0:
await pool.resetSession(conn)
conn.release()

proc execInTransaction*(
Expand All @@ -942,7 +963,8 @@ proc execInTransaction*(
try:
return await conn.execInTransaction(sql, params, timeout)
finally:
await pool.resetSession(conn)
if pool.config.resetQuery.len > 0:
await pool.resetSession(conn)
conn.release()

proc queryInTransaction*(
Expand All @@ -957,7 +979,8 @@ proc queryInTransaction*(
try:
return await conn.queryInTransaction(sql, params, resultFormat, timeout)
finally:
await pool.resetSession(conn)
if pool.config.resetQuery.len > 0:
await pool.resetSession(conn)
conn.release()

proc notify*(
Expand All @@ -971,7 +994,8 @@ proc notify*(
try:
await conn.notify(channel, payload, timeout)
finally:
await pool.resetSession(conn)
if pool.config.resetQuery.len > 0:
await pool.resetSession(conn)
conn.release()

macro withTransaction*(pool: PgPool, args: varargs[untyped]): untyped =
Expand Down Expand Up @@ -1042,7 +1066,8 @@ macro withTransaction*(pool: PgPool, args: varargs[untyped]): untyped =
discard
raise `eSym`
finally:
await `resetSessionSym`(`poolSym`, `connIdent`)
if `poolSym`.config.resetQuery.len > 0:
await `resetSessionSym`(`poolSym`, `connIdent`)
`connIdent`.release()

template withPipeline*(pool: PgPool, pipeline, body: untyped) =
Expand All @@ -1053,7 +1078,8 @@ template withPipeline*(pool: PgPool, pipeline, body: untyped) =
let pipeline = newPipeline(conn)
body
finally:
await pool.resetSession(conn)
if pool.config.resetQuery.len > 0:
await pool.resetSession(conn)
conn.release()

proc close*(pool: PgPool, timeout = ZeroDuration): Future[void] {.async.} =
Expand Down