diff --git a/async_postgres/pg_pool.nim b/async_postgres/pg_pool.nim index 7efe184..c5d7cd9 100644 --- a/async_postgres/pg_pool.nim +++ b/async_postgres/pg_pool.nim @@ -315,6 +315,29 @@ proc newPool*(config: PoolConfig): Future[PgPool] {.async.} = pool.maintenanceTask = maintenanceLoop(pool) return pool +proc releaseCore( + pool: PgPool, conn: PgConnection +): tuple[wasClosed, handedToWaiter: bool] = + ## Core release logic shared by the traced and non-traced paths of + ## `releaseImpl`. Returns flags describing the disposition of `conn` so + ## the caller can report them to the tracer. + if pool.closed or conn.state != csReady or conn.txStatus != tsIdle: + if pool.active > 0: + pool.active.dec + pool.closeNoWait(conn) + return (true, false) + while pool.waiters.len > 0: + let waiter = pool.waiters.popFirst() + if waiter.cancelled: + continue + pool.waiterCount.dec + waiter.fut.complete(conn) + return (false, true) + if pool.active > 0: + pool.active.dec + pool.idle.addLast(PooledConn(conn: conn, lastUsedAt: pool.cachedNow)) + return (false, false) + proc releaseImpl(pool: PgPool, conn: PgConnection) = ## Implementation of `release(conn)`; called once the owning pool is known. ## Returns the connection to the pool. If the connection is broken or in @@ -330,34 +353,19 @@ proc releaseImpl(pool: PgPool, conn: PgConnection) = ## Transaction-in-progress (`txStatus != tsIdle`) is treated as failure ## to reset the session, so the connection is closed rather than leaking ## transaction state to the next borrower. + let tracer = pool.config.tracer + if tracer == nil: + discard pool.releaseCore(conn) + return + var traceCtx: TraceContext - if pool.config.tracer != nil and pool.config.tracer.onPoolReleaseStart != nil: - traceCtx = - pool.config.tracer.onPoolReleaseStart(TracePoolReleaseStartData(conn: conn)) + if tracer.onPoolReleaseStart != nil: + traceCtx = tracer.onPoolReleaseStart(TracePoolReleaseStartData(conn: conn)) - var wasClosed = false - var handedToWaiter = false - if pool.closed or conn.state != csReady or conn.txStatus != tsIdle: - if pool.active > 0: - pool.active.dec - pool.closeNoWait(conn) - wasClosed = true - else: - block dispatch: - while pool.waiters.len > 0: - let waiter = pool.waiters.popFirst() - if waiter.cancelled: - continue - pool.waiterCount.dec - waiter.fut.complete(conn) - handedToWaiter = true - break dispatch - if pool.active > 0: - pool.active.dec - pool.idle.addLast(PooledConn(conn: conn, lastUsedAt: pool.cachedNow)) + let (wasClosed, handedToWaiter) = pool.releaseCore(conn) - if pool.config.tracer != nil and pool.config.tracer.onPoolReleaseEnd != nil: - pool.config.tracer.onPoolReleaseEnd( + if tracer.onPoolReleaseEnd != nil: + tracer.onPoolReleaseEnd( traceCtx, TracePoolReleaseEndData(wasClosed: wasClosed, handedToWaiter: handedToWaiter), ) @@ -469,6 +477,10 @@ proc acquire*(pool: PgPool): Future[PgConnection] {.async.} = ## Acquire a connection from the pool. Tries idle connections first (with ## health checks), creates a new one if under `maxSize`, or waits for a ## release. Raises `PgPoolError` on timeout or if the pool is closed. + if pool.config.tracer == nil: + let ar = await pool.acquireImpl() + return ar.conn + var ar: AcquireResult withTracing( pool.config.tracer, @@ -491,7 +503,8 @@ template withConnection*(pool: PgPool, conn, body: untyped) = try: body finally: - await pool.resetSession(conn) + if pool.config.resetQuery.len > 0: + await pool.resetSession(conn) conn.release() proc failPendingOp(op: PendingPoolOp, e: ref CatchableError) = @@ -555,7 +568,8 @@ proc executeBatch( for op in batch: failPendingOp(op, e) finally: - await pool.resetSession(conn) + if pool.config.resetQuery.len > 0: + await pool.resetSession(conn) conn.release() proc dispatchBatchImpl(pool: PgPool) {.async.} = @@ -588,7 +602,8 @@ proc dispatchBatchImpl(pool: PgPool) {.async.} = ) op.queryFut.complete(r) finally: - await pool.resetSession(conn) + if pool.config.resetQuery.len > 0: + await pool.resetSession(conn) conn.release() except CatchableError as e: failPendingOp(op, e) @@ -620,7 +635,8 @@ proc dispatchBatchImpl(pool: PgPool) {.async.} = var batchFuts: seq[Future[void]] for ci in 0 ..< conns.len: if connOps[ci].len == 0: - await pool.resetSession(conns[ci]) + if pool.config.resetQuery.len > 0: + await pool.resetSession(conns[ci]) conns[ci].release() continue batchFuts.add(executeBatch(pool, conns[ci], connOps[ci])) @@ -681,7 +697,8 @@ proc exec*( try: return await conn.exec(sql, params, timeout = timeout) finally: - await pool.resetSession(conn) + if pool.config.resetQuery.len > 0: + await pool.resetSession(conn) conn.release() proc query*( @@ -712,7 +729,8 @@ proc query*( try: return await conn.query(sql, params, resultFormat = resultFormat, timeout = timeout) finally: - await pool.resetSession(conn) + if pool.config.resetQuery.len > 0: + await pool.resetSession(conn) conn.release() proc queryEach*( @@ -732,7 +750,8 @@ proc queryEach*( try: return await conn.queryEach(sql, params, callback, resultFormat, timeout) finally: - await pool.resetSession(conn) + if pool.config.resetQuery.len > 0: + await pool.resetSession(conn) conn.release() proc queryRowOpt*( @@ -915,7 +934,8 @@ proc simpleQuery*(pool: PgPool, sql: string): Future[seq[QueryResult]] {.async.} try: return await conn.simpleQuery(sql) finally: - await pool.resetSession(conn) + if pool.config.resetQuery.len > 0: + await pool.resetSession(conn) conn.release() proc simpleExec*( @@ -928,7 +948,8 @@ proc simpleExec*( try: return await conn.simpleExec(sql, timeout) finally: - await pool.resetSession(conn) + if pool.config.resetQuery.len > 0: + await pool.resetSession(conn) conn.release() proc execInTransaction*( @@ -942,7 +963,8 @@ proc execInTransaction*( try: return await conn.execInTransaction(sql, params, timeout) finally: - await pool.resetSession(conn) + if pool.config.resetQuery.len > 0: + await pool.resetSession(conn) conn.release() proc queryInTransaction*( @@ -957,7 +979,8 @@ proc queryInTransaction*( try: return await conn.queryInTransaction(sql, params, resultFormat, timeout) finally: - await pool.resetSession(conn) + if pool.config.resetQuery.len > 0: + await pool.resetSession(conn) conn.release() proc notify*( @@ -971,7 +994,8 @@ proc notify*( try: await conn.notify(channel, payload, timeout) finally: - await pool.resetSession(conn) + if pool.config.resetQuery.len > 0: + await pool.resetSession(conn) conn.release() macro withTransaction*(pool: PgPool, args: varargs[untyped]): untyped = @@ -1042,7 +1066,8 @@ macro withTransaction*(pool: PgPool, args: varargs[untyped]): untyped = discard raise `eSym` finally: - await `resetSessionSym`(`poolSym`, `connIdent`) + if `poolSym`.config.resetQuery.len > 0: + await `resetSessionSym`(`poolSym`, `connIdent`) `connIdent`.release() template withPipeline*(pool: PgPool, pipeline, body: untyped) = @@ -1053,7 +1078,8 @@ template withPipeline*(pool: PgPool, pipeline, body: untyped) = let pipeline = newPipeline(conn) body finally: - await pool.resetSession(conn) + if pool.config.resetQuery.len > 0: + await pool.resetSession(conn) conn.release() proc close*(pool: PgPool, timeout = ZeroDuration): Future[void] {.async.} =