Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions async_postgres/pg_connection.nim
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,11 @@ type
colOids*: seq[int32] ## Per-column type OIDs for RowData
lruNode*: DoublyLinkedNode[string] ## Embedded LRU list node

PgPoolOwner* = ref object of RootObj
## Opaque base for pool-ownership back-references on `PgConnection`.
## The concrete type is `PgPool` (defined in `pg_pool`); this base lives
## here to avoid a circular import. Consumers should not subclass this.

PgConnection* = ref object
## A single PostgreSQL connection with buffered I/O and statement caching.
when hasChronos:
Expand Down Expand Up @@ -198,6 +203,11 @@ type
hstoreOid: int32 ## Dynamic OID for hstore extension type; 0 if not available
hstoreArrayOid: int32 ## Dynamic OID for hstore[] array; 0 if not available
tracer: PgTracer ## Inherited from ConnConfig on connect
ownerPool*: PgPoolOwner
## Owning pool back-reference. Set when this connection is managed by
## a `PgPool` (or a pool inside `PgPoolCluster`); `nil` for standalone
## connections created via `connect`. Used by `release(conn)` to route
## the connection back to the correct pool.

QueryResult* = object
## Result of a query: field descriptions, row data, and command tag.
Expand Down
65 changes: 45 additions & 20 deletions async_postgres/pg_pool.nim
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ type
execFut: Future[CommandResult] ## Non-nil for popExec
queryFut: Future[QueryResult] ## Non-nil for popQuery

PgPool* = ref object ## Connection pool that manages a set of PostgreSQL connections.
PgPool* = ref object of PgPoolOwner
## Connection pool that manages a set of PostgreSQL connections.
config: PoolConfig
idle: Deque[PooledConn]
active: int
Expand Down Expand Up @@ -274,6 +275,7 @@ proc maintenanceLoop(pool: PgPool) {.async.} =
break
try:
let conn = await connect(pool.config.connConfig).wait(replenishTimeout)
conn.ownerPool = pool
pool.metrics.createCount.inc
pool.idle.addLast(PooledConn(conn: conn, lastUsedAt: now))
except CatchableError:
Expand Down Expand Up @@ -301,6 +303,7 @@ proc newPool*(config: PoolConfig): Future[PgPool] {.async.} =
pool.cachedNow = Moment.now()
for i in 0 ..< cfg.minSize:
let conn = await connect(cfg.connConfig)
conn.ownerPool = pool
pool.metrics.createCount.inc
pool.idle.addLast(PooledConn(conn: conn, lastUsedAt: pool.cachedNow))
except CatchableError as e:
Expand All @@ -312,10 +315,11 @@ proc newPool*(config: PoolConfig): Future[PgPool] {.async.} =
pool.maintenanceTask = maintenanceLoop(pool)
return pool

proc release*(pool: PgPool, conn: PgConnection) =
## Return a connection to the pool. If the connection is broken or in a
## transaction, it is closed instead. If waiters are queued, the connection
## is handed directly to the next waiter.
proc releaseImpl(pool: PgPool, conn: PgConnection) =
## Implementation of `release(conn)`; called once the owning pool is known.
## Returns the connection to the pool. If the connection is broken or in
## a transaction, it is closed instead. If waiters are queued, the
## connection is handed directly to the next waiter.
##
## Discard criteria (`conn.state != csReady`):
## - A timed-out request reaches us via `invalidateOnTimeout` with
Expand Down Expand Up @@ -358,6 +362,26 @@ proc release*(pool: PgPool, conn: PgConnection) =
TracePoolReleaseEndData(wasClosed: wasClosed, handedToWaiter: handedToWaiter),
)

proc release*(conn: PgConnection) =
## Return a connection to its owning pool. If the connection is broken or
## in a transaction, it is closed instead; if waiters are queued, it is
## handed directly to the next waiter.
##
## The owning pool is tracked on `conn.ownerPool`, set automatically when
## the connection is acquired from a `PgPool` (including pools inside a
## `PgPoolCluster`). For standalone connections created with `connect`
## this field is `nil` and calling `release` raises `PgError` — use
## `conn.close()` instead.
##
## `withConnection`, `withReadConnection`, `withWriteConnection`,
## `withPipeline`, and `withTransaction` call this automatically; direct
## callers only need it when they manage `acquire`/`release` manually.
if conn.ownerPool == nil:
raise newException(
PgError, "release() called on a standalone connection; use conn.close() instead"
)
PgPool(conn.ownerPool).releaseImpl(conn)

type AcquireResult = tuple[conn: PgConnection, wasCreated: bool]

proc acquireImpl(pool: PgPool): Future[AcquireResult] {.async.} =
Expand Down Expand Up @@ -402,6 +426,7 @@ proc acquireImpl(pool: PgPool): Future[AcquireResult] {.async.} =
pool.active.inc
try:
let conn = await connect(pool.config.connConfig)
conn.ownerPool = pool
pool.metrics.createCount.inc
recordAcquire()
return (conn, true)
Expand Down Expand Up @@ -433,7 +458,7 @@ proc acquireImpl(pool: PgPool): Future[AcquireResult] {.async.} =
# the future just before the timeout fired, return the connection
# to the pool instead of leaking it.
if fut.completed():
pool.release(fut.read())
fut.read().release()
raise newException(PgPoolError, "Pool acquire timeout")
else:
let conn = await fut
Expand Down Expand Up @@ -467,7 +492,7 @@ template withConnection*(pool: PgPool, conn, body: untyped) =
body
finally:
await pool.resetSession(conn)
pool.release(conn)
conn.release()

proc failPendingOp(op: PendingPoolOp, e: ref CatchableError) =
## Fail a pending op's future if not already finished.
Expand Down Expand Up @@ -531,7 +556,7 @@ proc executeBatch(
failPendingOp(op, e)
finally:
await pool.resetSession(conn)
pool.release(conn)
conn.release()

proc dispatchBatchImpl(pool: PgPool) {.async.} =
## Drain the pending ops queue and execute them via pipelined connections.
Expand Down Expand Up @@ -564,7 +589,7 @@ proc dispatchBatchImpl(pool: PgPool) {.async.} =
op.queryFut.complete(r)
finally:
await pool.resetSession(conn)
pool.release(conn)
conn.release()
except CatchableError as e:
failPendingOp(op, e)
return
Expand Down Expand Up @@ -596,7 +621,7 @@ proc dispatchBatchImpl(pool: PgPool) {.async.} =
for ci in 0 ..< conns.len:
if connOps[ci].len == 0:
await pool.resetSession(conns[ci])
pool.release(conns[ci])
conns[ci].release()
continue
batchFuts.add(executeBatch(pool, conns[ci], connOps[ci]))

Expand Down Expand Up @@ -657,7 +682,7 @@ proc exec*(
return await conn.exec(sql, params, timeout = timeout)
finally:
await pool.resetSession(conn)
pool.release(conn)
conn.release()

proc query*(
pool: PgPool,
Expand Down Expand Up @@ -688,7 +713,7 @@ proc query*(
return await conn.query(sql, params, resultFormat = resultFormat, timeout = timeout)
finally:
await pool.resetSession(conn)
pool.release(conn)
conn.release()

proc queryEach*(
pool: PgPool,
Expand All @@ -708,7 +733,7 @@ proc queryEach*(
return await conn.queryEach(sql, params, callback, resultFormat, timeout)
finally:
await pool.resetSession(conn)
pool.release(conn)
conn.release()

proc queryRowOpt*(
pool: PgPool,
Expand Down Expand Up @@ -873,7 +898,7 @@ proc simpleQuery*(pool: PgPool, sql: string): Future[seq[QueryResult]] {.async.}
return await conn.simpleQuery(sql)
finally:
await pool.resetSession(conn)
pool.release(conn)
conn.release()

proc simpleExec*(
pool: PgPool, sql: string, timeout: Duration = ZeroDuration
Expand All @@ -885,7 +910,7 @@ proc simpleExec*(
return await conn.simpleExec(sql, timeout)
finally:
await pool.resetSession(conn)
pool.release(conn)
conn.release()

proc execInTransaction*(
pool: PgPool,
Expand All @@ -899,7 +924,7 @@ proc execInTransaction*(
return await conn.execInTransaction(sql, params, timeout)
finally:
await pool.resetSession(conn)
pool.release(conn)
conn.release()

proc queryInTransaction*(
pool: PgPool,
Expand All @@ -914,7 +939,7 @@ proc queryInTransaction*(
return await conn.queryInTransaction(sql, params, resultFormat, timeout)
finally:
await pool.resetSession(conn)
pool.release(conn)
conn.release()

proc notify*(
pool: PgPool,
Expand All @@ -928,7 +953,7 @@ proc notify*(
await conn.notify(channel, payload, timeout)
finally:
await pool.resetSession(conn)
pool.release(conn)
conn.release()

macro withTransaction*(pool: PgPool, args: varargs[untyped]): untyped =
## Execute `body` inside a BEGIN/COMMIT transaction using a pooled connection.
Expand Down Expand Up @@ -999,7 +1024,7 @@ macro withTransaction*(pool: PgPool, args: varargs[untyped]): untyped =
raise `eSym`
finally:
await `resetSessionSym`(`poolSym`, `connIdent`)
`poolSym`.release(`connIdent`)
`connIdent`.release()

template withPipeline*(pool: PgPool, pipeline, body: untyped) =
## Acquire a connection, create a Pipeline, execute body, then release.
Expand All @@ -1010,7 +1035,7 @@ template withPipeline*(pool: PgPool, pipeline, body: untyped) =
body
finally:
await pool.resetSession(conn)
pool.release(conn)
conn.release()

proc close*(pool: PgPool, timeout = ZeroDuration): Future[void] {.async.} =
## Close the pool: stop the maintenance loop, cancel all waiters, and close
Expand Down
10 changes: 5 additions & 5 deletions async_postgres/pg_pool_cluster.nim
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ template withReadConnection*(cluster: PgPoolCluster, conn, body: untyped) =
body
finally:
await connPool.resetSession(conn)
connPool.release(conn)
conn.release()

template withWriteConnection*(cluster: PgPoolCluster, conn, body: untyped) =
## Acquire a write connection from the primary pool, execute `body`, then release.
Expand All @@ -122,7 +122,7 @@ template withWriteConnection*(cluster: PgPoolCluster, conn, body: untyped) =
body
finally:
await cluster.primary.resetSession(conn)
cluster.primary.release(conn)
conn.release()

# Macro to generate cluster forwarding procs from compact declarations.
# Each entry is a bodiless `proc` whose name starts with "read" or "write".
Expand Down Expand Up @@ -185,7 +185,7 @@ macro clusterForwards(mode: static[string], body: untyped): untyped =
ident"await",
newCall(newDotExpr(ident"pool", ident"resetSession"), ident"conn"),
),
newCall(newDotExpr(ident"pool", ident"release"), ident"conn"),
newCall(newDotExpr(ident"conn", ident"release")),
)
else:
let primary = newDotExpr(ident"cluster", ident"primary")
Expand All @@ -200,7 +200,7 @@ macro clusterForwards(mode: static[string], body: untyped): untyped =
ident"await",
newCall(newDotExpr(primary.copyNimTree(), ident"resetSession"), ident"conn"),
),
newCall(newDotExpr(primary.copyNimTree(), ident"release"), ident"conn"),
newCall(newDotExpr(ident"conn", ident"release")),
)

let tryFinally =
Expand Down Expand Up @@ -523,7 +523,7 @@ macro withTransaction*(cluster: PgPoolCluster, args: varargs[untyped]): untyped
raise `eSym`
finally:
await `resetSessionSym`(`clusterSym`.primary, `connIdent`)
`clusterSym`.primary.release(`connIdent`)
`connIdent`.release()

template withPipeline*(cluster: PgPoolCluster, pipeline, body: untyped) =
## Create a pipeline on the primary pool.
Expand Down
Loading