diff --git a/async_postgres/pg_client.nim b/async_postgres/pg_client.nim index f36e645..644d768 100644 --- a/async_postgres/pg_client.nim +++ b/async_postgres/pg_client.nim @@ -92,6 +92,11 @@ type conn: PgConnection ops: seq[PipelineOp] + IsolatedPipelineResults* = object + ## Results from `executeIsolated`: per-op error isolation via per-query SYNC. + results*: seq[PipelineResult] + errors*: seq[ref CatchableError] ## errors[i] is nil if ops[i] succeeded + proc buildBeginSql*(opts: TransactionOptions): string = ## Build a BEGIN SQL statement with the specified transaction options ## (isolation level, access mode, deferrable mode). @@ -2392,6 +2397,208 @@ proc execute*( results = await executeImpl(p, timeout) return results +proc executeIsolatedImpl( + p: Pipeline, timeout: Duration = ZeroDuration +): Future[IsolatedPipelineResults] {.async.} = + ## Execute pipeline ops with per-query SYNC for error isolation. + ## Each op gets its own ReadyForQuery; a failed op does not abort others. + let conn = p.conn + conn.checkReady() + conn.state = csBusy + + # Send Phase (same as executeImpl but SYNC per op) + conn.sendBuf.setLen(0) + var cachedStmts: seq[CachedStmt] + var hasCachedStmts = false + var pendingCacheAdds = 0 + var defaultFormats: seq[int16] + + for i in 0 ..< p.ops.len: + let formats = + if p.ops[i].paramFormats.len > 0: + p.ops[i].paramFormats + else: + let needed = p.ops[i].params.len + if defaultFormats.len != needed: + defaultFormats = newSeq[int16](needed) + defaultFormats + + let cached = conn.lookupStmtCache(p.ops[i].sql) + p.ops[i].cacheHit = cached != nil + p.ops[i].cacheMiss = false + + if cached != nil: + p.ops[i].stmtName = cached.name + if p.ops[i].kind == pokQuery: + if not hasCachedStmts: + cachedStmts = newSeq[CachedStmt](p.ops.len) + hasCachedStmts = true + cachedStmts[i] = cached[] + var effectiveResultFormats: seq[int16] + if p.ops[i].kind == pokQuery: + effectiveResultFormats = + if p.ops[i].resultFormats.len == 0: + cached.resultFormats + else: + p.ops[i].resultFormats + p.ops[i].resultFormats = effectiveResultFormats + conn.sendBuf.addBind( + "", cached.name, formats, p.ops[i].params, effectiveResultFormats + ) + conn.sendBuf.addExecute("", 0) + elif conn.stmtCacheCapacity > 0: + p.ops[i].cacheMiss = true + p.ops[i].stmtName = conn.nextStmtName() + if conn.stmtCache.len + pendingCacheAdds >= conn.stmtCacheCapacity and + conn.stmtCache.len > 0: + let evicted = conn.evictStmtCache() + conn.sendBuf.addClose(dkStatement, evicted.name) + inc pendingCacheAdds + conn.sendBuf.addParse(p.ops[i].stmtName, p.ops[i].sql, p.ops[i].paramOids) + conn.sendBuf.addDescribe(dkStatement, p.ops[i].stmtName) + conn.sendBuf.addBind( + "", p.ops[i].stmtName, formats, p.ops[i].params, p.ops[i].resultFormats + ) + conn.sendBuf.addExecute("", 0) + else: + conn.sendBuf.addParse("", p.ops[i].sql, p.ops[i].paramOids) + conn.sendBuf.addBind("", "", formats, p.ops[i].params, p.ops[i].resultFormats) + if p.ops[i].kind == pokQuery: + conn.sendBuf.addDescribe(dkPortal, "") + conn.sendBuf.addExecute("", 0) + + conn.sendBuf.addSync() # Per-op SYNC for error isolation + + await conn.sendBufMsg() + + # Receive Phase (per-op ReadyForQuery) + var results = newSeq[PipelineResult](p.ops.len) + var errors = newSeq[ref CatchableError](p.ops.len) + + # Initialize query results + for i in 0 ..< p.ops.len: + if p.ops[i].kind == pokQuery: + results[i] = PipelineResult(kind: prkQuery) + if p.ops[i].cacheHit: + let c = cachedStmts[i] + results[i].queryResult.fields = c.fields + if p.ops[i].resultFormats.len > 0 
and c.colFmts.len > 0: + for j in 0 ..< results[i].queryResult.fields.len: + results[i].queryResult.fields[j].formatCode = c.colFmts[j] + if results[i].queryResult.fields.len > 0: + results[i].queryResult.data = + newRowData(int16(results[i].queryResult.fields.len), c.colFmts, c.colOids) + else: + results[i] = PipelineResult(kind: prkExec) + + for opIdx in 0 ..< p.ops.len: + var opError: ref PgQueryError + var cachedFields: seq[FieldDescription] + + block opRecv: + while true: + var rowData: RowData = nil + var rowCount: ptr int32 = nil + if p.ops[opIdx].kind == pokQuery: + rowData = results[opIdx].queryResult.data + rowCount = addr results[opIdx].queryResult.rowCount + + while (let opt = conn.nextMessage(rowData, rowCount); opt.isSome): + let msg = opt.get + case msg.kind + of bmkParseComplete, bmkBindComplete, bmkCloseComplete: + discard + of bmkParameterDescription: + discard + of bmkRowDescription: + if p.ops[opIdx].kind == pokQuery: + if p.ops[opIdx].cacheMiss: + cachedFields = msg.fields + results[opIdx].queryResult.fields = msg.fields + var cf: seq[int16] + var co: seq[int32] + if p.ops[opIdx].resultFormats.len > 0: + cf = newSeq[int16](msg.fields.len) + co = newSeq[int32](msg.fields.len) + for j in 0 ..< msg.fields.len: + co[j] = msg.fields[j].typeOid + if p.ops[opIdx].resultFormats.len == 1: + results[opIdx].queryResult.fields[j].formatCode = + p.ops[opIdx].resultFormats[0] + cf[j] = p.ops[opIdx].resultFormats[0] + elif j < p.ops[opIdx].resultFormats.len: + results[opIdx].queryResult.fields[j].formatCode = + p.ops[opIdx].resultFormats[j] + cf[j] = p.ops[opIdx].resultFormats[j] + results[opIdx].queryResult.data = + newRowData(int16(msg.fields.len), cf, co) + rowData = results[opIdx].queryResult.data + rowCount = addr results[opIdx].queryResult.rowCount + else: + results[opIdx].queryResult.fields = msg.fields + results[opIdx].queryResult.data = newRowData(int16(msg.fields.len)) + rowData = results[opIdx].queryResult.data + rowCount = addr results[opIdx].queryResult.rowCount + of bmkNoData: + discard + of bmkCommandComplete: + if p.ops[opIdx].kind == pokExec: + results[opIdx].commandResult = initCommandResult(msg.commandTag) + else: + results[opIdx].queryResult.commandTag = msg.commandTag + of bmkEmptyQueryResponse: + discard + of bmkErrorResponse: + if opError == nil: + opError = newPgQueryError(msg.errorFields) + of bmkReadyForQuery: + conn.txStatus = msg.txStatus + if opError != nil: + if opError.sqlState == "26000" and p.ops[opIdx].cacheHit: + conn.removeStmtCache(p.ops[opIdx].sql) + errors[opIdx] = opError + elif p.ops[opIdx].cacheMiss: + conn.addStmtCache( + p.ops[opIdx].sql, + CachedStmt(name: p.ops[opIdx].stmtName, fields: cachedFields), + ) + break opRecv + else: + discard + await conn.fillRecvBuf(timeout) + + conn.state = csReady + return IsolatedPipelineResults(results: results, errors: errors) + +proc executeIsolated*( + p: Pipeline, timeout: Duration = ZeroDuration +): Future[IsolatedPipelineResults] {.async.} = + ## Execute all queued pipeline operations with per-query error isolation. + ## Each operation gets its own SYNC message, so a failed operation does not + ## abort subsequent ones. Returns results and per-op errors. + ## On timeout, the connection is marked csClosed (protocol out of sync). 
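+  ##
+  ## A minimal usage sketch (the table name `t` is illustrative; `conn` is
+  ## assumed to be an open `PgConnection`, as in the e2e tests):
+  ##
+  ## .. code-block:: nim
+  ##   let p = newPipeline(conn)
+  ##   p.addExec("INSERT INTO t (val) VALUES ($1)", @[toPgParam("a")])
+  ##   p.addQuery("SELECT 42::int4")
+  ##   let ir = await p.executeIsolated()
+  ##   for i in 0 ..< ir.results.len:
+  ##     if ir.errors[i] != nil:
+  ##       echo "op ", i, " failed: ", ir.errors[i].msg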
+ if p.ops.len == 0: + return IsolatedPipelineResults(results: @[], errors: @[]) + var ir: IsolatedPipelineResults + withConnTracing( + p.conn, + onPipelineStart, + onPipelineEnd, + TracePipelineStartData(opCount: p.ops.len), + TracePipelineEndData, + TracePipelineEndData(), + ): + if timeout > ZeroDuration: + try: + ir = await executeIsolatedImpl(p, timeout).wait(timeout) + except AsyncTimeoutError: + p.conn.cancelNoWait() + p.conn.state = csClosed + raise newException(PgTimeoutError, "Pipeline executeIsolated timed out") + else: + ir = await executeIsolatedImpl(p, timeout) + return ir + proc openCursorImpl( conn: PgConnection, sql: string, diff --git a/async_postgres/pg_pool.nim b/async_postgres/pg_pool.nim index e921cec..1557d19 100644 --- a/async_postgres/pg_pool.nim +++ b/async_postgres/pg_pool.nim @@ -30,6 +30,14 @@ type ## "RESET ALL" (reset session parameters only). ## On failure, the connection is discarded. tracer*: PgTracer ## Optional tracer for pool-level hooks (acquire/release) + pipelined*: bool + ## Enable implicit query batching for pool.exec/query (default false). + ## When enabled, concurrent calls within the same event loop tick are + ## batched into a single TCP write per connection using per-query SYNC + ## for error isolation. + maxPipelineSize*: int + ## Max operations per pipeline batch per connection (default 0=unlimited). + ## Only used when `pipelined` is true. PooledConn = object ## An idle connection held by the pool with its last-used timestamp. @@ -47,6 +55,19 @@ type createCount*: int64 ## Number of new connections created closeCount*: int64 ## Number of connections closed/discarded + PendingOpKind = enum + popExec + popQuery + + PendingPoolOp = ref object + kind: PendingOpKind + sql: string + params: seq[PgParam] + resultFormat: ResultFormat ## Only used for popQuery + timeout: Duration + execFut: Future[CommandResult] ## Non-nil for popExec + queryFut: Future[QueryResult] ## Non-nil for popQuery + PgPool* = ref object ## Connection pool that manages a set of PostgreSQL connections. config: PoolConfig idle: Deque[PooledConn] @@ -58,6 +79,8 @@ type cachedNow: Moment ## Updated on acquire(); reused by release() to avoid extra syscalls metrics: PoolMetrics + pendingOps: Deque[PendingPoolOp] ## Queue for implicit pipeline batching + dispatchScheduled: bool ## Whether a dispatch callback is pending proc initPoolConfig*( connConfig: ConnConfig, @@ -71,10 +94,13 @@ proc initPoolConfig*( acquireTimeout = seconds(30), maxWaiters = -1, resetQuery = "", + pipelined = false, + maxPipelineSize = 0, ): PoolConfig = ## Create a pool configuration with sensible defaults. ## `minSize` idle connections are maintained; up to `maxSize` total. ## Set `resetQuery` to clean session state on release (e.g. "DISCARD ALL" for PgBouncer). + ## Set `pipelined` to true to enable implicit query batching for `pool.exec`/`pool.query`. ## ## Raises `ValueError` if parameters are invalid. 
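+  ##
+  ## A configuration sketch for a pipelined pool (`cfg` is assumed to be an
+  ## existing `ConnConfig`; the sizes are illustrative):
+  ##
+  ## .. code-block:: nim
+  ##   let poolCfg = initPoolConfig(cfg, minSize = 1, maxSize = 4,
+  ##                                pipelined = true, maxPipelineSize = 16)
+  ##   let pool = await newPool(poolCfg)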
if minSize < 0: @@ -100,6 +126,8 @@ proc initPoolConfig*( acquireTimeout: acquireTimeout, maxWaiters: maxWaiters, resetQuery: resetQuery, + pipelined: pipelined, + maxPipelineSize: maxPipelineSize, ) proc poolConfig*(pool: PgPool): PoolConfig = @@ -236,6 +264,8 @@ proc newPool*(config: PoolConfig): Future[PgPool] {.async.} = waiters: initDeque[Waiter](), waiterCount: 0, closed: false, + pendingOps: initDeque[PendingPoolOp](), + dispatchScheduled: false, ) try: @@ -412,6 +442,164 @@ template withConnection*(pool: PgPool, conn, body: untyped) = await pool.resetSession(conn) pool.release(conn) +proc failPendingOp(op: PendingPoolOp, e: ref CatchableError) = + ## Fail a pending op's future if not already finished. + case op.kind + of popExec: + if not op.execFut.finished: + op.execFut.fail(e) + of popQuery: + if not op.queryFut.finished: + op.queryFut.fail(e) + +proc executeBatch( + pool: PgPool, conn: PgConnection, batch: seq[PendingPoolOp] +): Future[void] {.async.} = + ## Execute a batch of pending operations on a single connection via pipeline. + let batchTimeout = block: + var t = ZeroDuration + for op in batch: + if op.timeout > t: + t = op.timeout + t + try: + let pipeline = newPipeline(conn) + for op in batch: + case op.kind + of popExec: + pipeline.addExec(op.sql, op.params) + of popQuery: + pipeline.addQuery(op.sql, op.params, op.resultFormat) + let ir = await pipeline.executeIsolated(batchTimeout) + for i in 0 ..< batch.len: + let op = batch[i] + if ir.errors[i] != nil: + case op.kind + of popExec: + op.execFut.fail(ir.errors[i]) + of popQuery: + op.queryFut.fail(ir.errors[i]) + else: + case op.kind + of popExec: + op.execFut.complete(ir.results[i].commandResult) + of popQuery: + op.queryFut.complete(ir.results[i].queryResult) + except CatchableError as e: + for op in batch: + failPendingOp(op, e) + finally: + await pool.resetSession(conn) + pool.release(conn) + +proc dispatchBatchImpl(pool: PgPool) {.async.} = + ## Drain the pending ops queue and execute them via pipelined connections. + pool.dispatchScheduled = false + if pool.pendingOps.len == 0 or pool.closed: + return + + # Drain queue (respect maxPipelineSize) + var ops: seq[PendingPoolOp] + let maxOps = pool.config.maxPipelineSize + while pool.pendingOps.len > 0: + if maxOps > 0 and ops.len >= maxOps: + break + ops.add(pool.pendingOps.popFirst()) + + # Fast path: single op, skip pipeline overhead + if ops.len == 1: + let op = ops[0] + try: + let conn = await pool.acquire() + try: + case op.kind + of popExec: + let r = await conn.exec(op.sql, op.params, timeout = op.timeout) + op.execFut.complete(r) + of popQuery: + let r = await conn.query( + op.sql, op.params, resultFormat = op.resultFormat, timeout = op.timeout + ) + op.queryFut.complete(r) + finally: + await pool.resetSession(conn) + pool.release(conn) + except CatchableError as e: + failPendingOp(op, e) + return + + # Multi-op path: acquire connections and distribute. + # Limit to at most half the pool to avoid starving other users. 
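+  # If an acquire fails partway through, the loop below falls back to whatever
+  # connections were obtained so far; the ops are then spread round-robin
+  # across them.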
+ var conns: seq[PgConnection] + let maxConns = min(ops.len, max(1, pool.config.maxSize div 2)) + for i in 0 ..< maxConns: + try: + let conn = await pool.acquire() + conns.add(conn) + except CatchableError: + break + + if conns.len == 0: + let err = newException(PgPoolError, "Failed to acquire connection for batch") + for op in ops: + failPendingOp(op, err) + return + + # Distribute ops round-robin across connections + var connOps = newSeq[seq[PendingPoolOp]](conns.len) + for i in 0 ..< ops.len: + connOps[i mod conns.len].add(ops[i]) + + # Execute each connection's batch in parallel + var batchFuts: seq[Future[void]] + for ci in 0 ..< conns.len: + if connOps[ci].len == 0: + await pool.resetSession(conns[ci]) + pool.release(conns[ci]) + continue + batchFuts.add(executeBatch(pool, conns[ci], connOps[ci])) + + await allFutures(batchFuts) + +proc scheduleDispatch(pool: PgPool) {.gcsafe, raises: [].} = + ## Schedule a batch dispatch on the next event loop tick. + if pool.dispatchScheduled: + return + pool.dispatchScheduled = true + let p = pool + proc cb() {.gcsafe, raises: [].} = + proc run(pool: PgPool) {.async.} = + try: + await pool.dispatchBatchImpl() + except CatchableError as e: + # Fail any ops still in the queue so their futures don't hang forever. + while pool.pendingOps.len > 0: + let op = pool.pendingOps.popFirst() + failPendingOp(op, e) + # Re-schedule if there are remaining ops + if pool.pendingOps.len > 0: + pool.scheduleDispatch() + + {.gcsafe.}: + try: + asyncSpawn p.run() + except Exception as e: + # asyncSpawn should not raise in practice, but the compiler cannot + # prove it. Fail any pending ops so their futures do not hang. + let err = newException(PgError, "Pipeline dispatch failed: " & e.msg) + try: + while p.pendingOps.len > 0: + let op = p.pendingOps.popFirst() + failPendingOp(op, err) + except Exception: + discard + p.dispatchScheduled = false + + try: + scheduleSoon(cb) + except CatchableError: + pool.dispatchScheduled = false + proc exec*( pool: PgPool, sql: string, @@ -419,6 +607,17 @@ proc exec*( timeout: Duration = ZeroDuration, ): Future[CommandResult] {.async.} = ## Execute a statement with typed parameters using a pooled connection. + ## When `pipelined` is enabled, the operation is batched with other concurrent + ## calls and sent in a single TCP write. + if pool.config.pipelined: + let fut = newFuture[CommandResult]("PgPool.exec.pipelined") + pool.pendingOps.addLast( + PendingPoolOp( + kind: popExec, sql: sql, params: params, timeout: timeout, execFut: fut + ) + ) + pool.scheduleDispatch() + return await fut let conn = await pool.acquire() try: return await conn.exec(sql, params, timeout = timeout) @@ -434,6 +633,22 @@ proc query*( timeout: Duration = ZeroDuration, ): Future[QueryResult] {.async.} = ## Execute a query with typed parameters using a pooled connection. + ## When `pipelined` is enabled, the operation is batched with other concurrent + ## calls and sent in a single TCP write. 
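+  ##
+  ## A sketch of the batching behaviour (assumes a pool created with
+  ## `pipelined = true`; futures started in the same event loop tick share
+  ## one pipeline):
+  ##
+  ## .. code-block:: nim
+  ##   let f1 = pool.query("SELECT 1::int4")
+  ##   let f2 = pool.query("SELECT 2::int4")
+  ##   # Both ops go out in a single write on the next tick.
+  ##   echo (await f1).rows[0].getStr(0)
+  ##   echo (await f2).rows[0].getStr(0)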
+ if pool.config.pipelined: + let fut = newFuture[QueryResult]("PgPool.query.pipelined") + pool.pendingOps.addLast( + PendingPoolOp( + kind: popQuery, + sql: sql, + params: params, + resultFormat: resultFormat, + timeout: timeout, + queryFut: fut, + ) + ) + pool.scheduleDispatch() + return await fut let conn = await pool.acquire() try: return await conn.query(sql, params, resultFormat = resultFormat, timeout = timeout) @@ -465,12 +680,12 @@ proc queryOne*( timeout: Duration = ZeroDuration, ): Future[Option[Row]] {.async.} = ## Execute a query and return the first row, or `none` if no rows. - let conn = await pool.acquire() - try: - return await conn.queryOne(sql, params, resultFormat, timeout) - finally: - await pool.resetSession(conn) - pool.release(conn) + let qr = await pool.query(sql, params, resultFormat, timeout) + if qr.rowCount > 0: + if qr.fields.len > 0 and qr.data.fields.len == 0: + qr.data.fields = qr.fields + return some(Row(data: qr.data, rowIdx: 0)) + return none(Row) proc queryValue*( pool: PgPool, @@ -480,12 +695,13 @@ proc queryValue*( ): Future[string] {.async.} = ## Execute a query and return the first column of the first row as a string. ## Raises `PgError` if no rows or the value is NULL. - let conn = await pool.acquire() - try: - return await conn.queryValue(sql, params, timeout = timeout) - finally: - await pool.resetSession(conn) - pool.release(conn) + let qr = await pool.query(sql, params, timeout = timeout) + if qr.rowCount == 0: + raise newException(PgError, "Query returned no rows") + let row = Row(data: qr.data, rowIdx: 0) + if row.isNull(0): + raise newException(PgError, "Query returned NULL") + return row.getStr(0) proc queryValue*[T]( pool: PgPool, @@ -496,12 +712,13 @@ proc queryValue*[T]( ): Future[T] {.async.} = ## Execute a query and return the first column of the first row as `T`. ## Raises `PgError` if no rows or the value is NULL. - let conn = await pool.acquire() - try: - return await conn.queryValue(T, sql, params, timeout = timeout) - finally: - await pool.resetSession(conn) - pool.release(conn) + let qr = await pool.query(sql, params, timeout = timeout) + if qr.rowCount == 0: + raise newException(PgError, "Query returned no rows") + let row = Row(data: qr.data, rowIdx: 0) + if row.isNull(0): + raise newException(PgError, "Query returned NULL") + return row.get(0, T) proc queryValueOpt*( pool: PgPool, @@ -511,12 +728,13 @@ proc queryValueOpt*( ): Future[Option[string]] {.async.} = ## Execute a query and return the first column of the first row as a string. ## Returns `none` if no rows or the value is NULL. - let conn = await pool.acquire() - try: - return await conn.queryValueOpt(sql, params, timeout = timeout) - finally: - await pool.resetSession(conn) - pool.release(conn) + let qr = await pool.query(sql, params, timeout = timeout) + if qr.rowCount == 0: + return none(string) + let row = Row(data: qr.data, rowIdx: 0) + if row.isNull(0): + return none(string) + return some(row.getStr(0)) proc queryValueOpt*[T]( pool: PgPool, @@ -527,12 +745,13 @@ proc queryValueOpt*[T]( ): Future[Option[T]] {.async.} = ## Execute a query and return the first column of the first row as `T`. ## Returns `none` if no rows or the value is NULL. 
- let conn = await pool.acquire() - try: - return await conn.queryValueOpt(T, sql, params, timeout = timeout) - finally: - await pool.resetSession(conn) - pool.release(conn) + let qr = await pool.query(sql, params, timeout = timeout) + if qr.rowCount == 0: + return none(T) + let row = Row(data: qr.data, rowIdx: 0) + if row.isNull(0): + return none(T) + return some(row.get(0, T)) proc queryValueOrDefault*( pool: PgPool, @@ -543,12 +762,13 @@ proc queryValueOrDefault*( ): Future[string] {.async.} = ## Execute a query and return the first column of the first row as a string. ## Returns `default` if no rows or the value is NULL. - let conn = await pool.acquire() - try: - return await conn.queryValueOrDefault(sql, params, default, timeout) - finally: - await pool.resetSession(conn) - pool.release(conn) + let qr = await pool.query(sql, params, timeout = timeout) + if qr.rowCount == 0: + return default + let row = Row(data: qr.data, rowIdx: 0) + if row.isNull(0): + return default + return row.getStr(0) proc queryValueOrDefault*[T]( pool: PgPool, @@ -560,12 +780,13 @@ proc queryValueOrDefault*[T]( ): Future[T] {.async.} = ## Execute a query and return the first column of the first row as `T`. ## Returns `default` if no rows or the value is NULL. - let conn = await pool.acquire() - try: - return await conn.queryValueOrDefault(T, sql, params, default, timeout) - finally: - await pool.resetSession(conn) - pool.release(conn) + let qr = await pool.query(sql, params, timeout = timeout) + if qr.rowCount == 0: + return default + let row = Row(data: qr.data, rowIdx: 0) + if row.isNull(0): + return default + return row.get(0, T) proc queryExists*( pool: PgPool, @@ -574,12 +795,8 @@ proc queryExists*( timeout: Duration = ZeroDuration, ): Future[bool] {.async.} = ## Execute a query and return whether any rows exist. - let conn = await pool.acquire() - try: - return await conn.queryExists(sql, params, timeout) - finally: - await pool.resetSession(conn) - pool.release(conn) + let qr = await pool.query(sql, params, timeout = timeout) + return qr.rowCount > 0 proc queryColumn*( pool: PgPool, @@ -588,12 +805,13 @@ proc queryColumn*( timeout: Duration = ZeroDuration, ): Future[seq[string]] {.async.} = ## Execute a query and return the first column of all rows as strings. - let conn = await pool.acquire() - try: - return await conn.queryColumn(sql, params, timeout) - finally: - await pool.resetSession(conn) - pool.release(conn) + ## Raises PgTypeError if any value is NULL. + let qr = await pool.query(sql, params, timeout = timeout) + for i in 0 ..< qr.rowCount: + let row = Row(data: qr.data, rowIdx: i) + if row.isNull(0): + raise newException(PgTypeError, "NULL value in column") + result.add(row.getStr(0)) proc simpleQuery*(pool: PgPool, sql: string): Future[seq[QueryResult]] {.async.} = ## Execute one or more SQL statements via simple query protocol using a pooled connection. 
@@ -762,6 +980,13 @@ proc close*(pool: PgPool, timeout = ZeroDuration): Future[void] {.async.} = waiter.fut.fail(newException(PgError, "Pool closed")) pool.waiterCount = 0 + # Fail all pending pipeline ops + pool.dispatchScheduled = false + let closeErr = newException(PgError, "Pool closed") + while pool.pendingOps.len > 0: + let op = pool.pendingOps.popFirst() + failPendingOp(op, closeErr) + # Wait for active connections to drain if timeout > ZeroDuration and pool.active > 0: let deadline = Moment.now() + timeout diff --git a/tests/test_e2e.nim b/tests/test_e2e.nim index 4b8ce3c..80f705b 100644 --- a/tests/test_e2e.nim +++ b/tests/test_e2e.nim @@ -5530,6 +5530,401 @@ suite "E2E: execInTransaction / queryInTransaction": waitFor t() + test "pipeline: executeIsolated basic": + proc t() {.async.} = + let conn = await connect(plainConfig()) + discard await conn.exec("DROP TABLE IF EXISTS test_pipe_iso") + discard + await conn.exec("CREATE TABLE test_pipe_iso (id serial PRIMARY KEY, val text)") + + let p = newPipeline(conn) + p.addExec("INSERT INTO test_pipe_iso (val) VALUES ($1)", @[toPgParam("a")]) + p.addQuery("SELECT 42::int4") + p.addExec("INSERT INTO test_pipe_iso (val) VALUES ($1)", @[toPgParam("b")]) + let ir = await p.executeIsolated() + doAssert ir.results.len == 3 + doAssert ir.errors.len == 3 + for i in 0 ..< 3: + doAssert ir.errors[i] == nil + doAssert ir.results[0].kind == prkExec + doAssert ir.results[0].commandResult == "INSERT 0 1" + doAssert ir.results[1].kind == prkQuery + doAssert ir.results[1].queryResult.rows[0].getStr(0) == "42" + doAssert ir.results[2].kind == prkExec + + let qr = await conn.query("SELECT val FROM test_pipe_iso ORDER BY id") + doAssert qr.rowCount == 2 + doAssert qr.rows[0].getStr(0) == "a" + doAssert qr.rows[1].getStr(0) == "b" + + discard await conn.exec("DROP TABLE test_pipe_iso") + await conn.close() + + waitFor t() + + test "pipeline: executeIsolated error does not abort subsequent ops": + proc t() {.async.} = + let conn = await connect(plainConfig()) + discard await conn.exec("DROP TABLE IF EXISTS test_pipe_iso2") + discard await conn.exec( + "CREATE TABLE test_pipe_iso2 (id serial PRIMARY KEY, val text NOT NULL)" + ) + + let p = newPipeline(conn) + p.addExec("INSERT INTO test_pipe_iso2 (val) VALUES ($1)", @[toPgParam("ok")]) + # This will fail: NULL violates NOT NULL constraint + p.addExec("INSERT INTO test_pipe_iso2 (val) VALUES (NULL)") + p.addExec("INSERT INTO test_pipe_iso2 (val) VALUES ($1)", @[toPgParam("also_ok")]) + let ir = await p.executeIsolated() + doAssert ir.results.len == 3 + doAssert ir.errors[0] == nil + doAssert ir.errors[1] != nil + doAssert ir.errors[2] == nil # NOT aborted, unlike execute() + + # Connection should still be usable + doAssert conn.state == csReady + let qr = await conn.query("SELECT val FROM test_pipe_iso2 ORDER BY id") + doAssert qr.rowCount == 2 + doAssert qr.rows[0].getStr(0) == "ok" + doAssert qr.rows[1].getStr(0) == "also_ok" + + discard await conn.exec("DROP TABLE test_pipe_iso2") + await conn.close() + + waitFor t() + + test "pipeline: executeIsolated empty": + proc t() {.async.} = + let conn = await connect(plainConfig()) + let p = newPipeline(conn) + let ir = await p.executeIsolated() + doAssert ir.results.len == 0 + doAssert ir.errors.len == 0 + await conn.close() + + waitFor t() + + test "pipeline: executeIsolated with cache hit": + proc t() {.async.} = + let conn = await connect(plainConfig()) + + # Warm the cache + discard await conn.query("SELECT $1::text", @[toPgParam("warm")]) + + let p = 
newPipeline(conn) + p.addQuery("SELECT $1::text", @[toPgParam("cached")]) + p.addQuery("SELECT $1::int4", @[toPgParam(99'i32)]) + let ir = await p.executeIsolated() + doAssert ir.errors[0] == nil + doAssert ir.errors[1] == nil + doAssert ir.results[0].queryResult.rows[0].getStr(0) == "cached" + doAssert ir.results[1].queryResult.rows[0].getStr(0) == "99" + + await conn.close() + + waitFor t() + + test "pipeline: executeIsolated timeout": + proc t() {.async.} = + let conn = await connect(plainConfig()) + let p = newPipeline(conn) + p.addQuery("SELECT pg_sleep(10)") + var caught = false + try: + discard await p.executeIsolated(timeout = milliseconds(100)) + except PgTimeoutError: + caught = true + doAssert caught + doAssert conn.state == csClosed + + waitFor t() + + test "pipeline: executeIsolated cache eviction": + proc t() {.async.} = + let conn = await connect(plainConfig()) + conn.stmtCacheCapacity = 2 + + # Warm cache with 2 entries + discard await conn.query("SELECT 1") + discard await conn.query("SELECT 2") + doAssert conn.stmtCache.len == 2 + + # Pipeline 2 new queries via executeIsolated => evicts both old entries + let p = newPipeline(conn) + p.addQuery("SELECT 100::int4") + p.addQuery("SELECT 200::int4") + let ir = await p.executeIsolated() + doAssert ir.errors[0] == nil + doAssert ir.errors[1] == nil + doAssert ir.results[0].queryResult.rows[0].getStr(0) == "100" + doAssert ir.results[1].queryResult.rows[0].getStr(0) == "200" + + doAssert conn.stmtCache.len == 2 + doAssert conn.stmtCache.hasKey("SELECT 100::int4") + doAssert conn.stmtCache.hasKey("SELECT 200::int4") + + await conn.close() + + waitFor t() + +suite "E2E: Pipelined Pool": + test "pipelined pool: basic exec and query": + proc t() {.async.} = + let pool = await newPool( + PoolConfig(connConfig: plainConfig(), minSize: 1, maxSize: 3, pipelined: true) + ) + discard await pool.exec("DROP TABLE IF EXISTS test_pp") + discard await pool.exec("CREATE TABLE test_pp (id serial PRIMARY KEY, val text)") + + discard + await pool.exec("INSERT INTO test_pp (val) VALUES ($1)", @[toPgParam("hi")]) + let qr = await pool.query("SELECT val FROM test_pp") + doAssert qr.rowCount == 1 + doAssert qr.rows[0].getStr(0) == "hi" + + discard await pool.exec("DROP TABLE test_pp") + await pool.close() + + waitFor t() + + test "pipelined pool: concurrent ops are batched": + proc t() {.async.} = + let pool = await newPool( + PoolConfig(connConfig: plainConfig(), minSize: 1, maxSize: 3, pipelined: true) + ) + + # Fire multiple queries concurrently within the same tick + let f1 = pool.query("SELECT 1::int4") + let f2 = pool.query("SELECT 2::int4") + let f3 = pool.query("SELECT 3::int4") + let r1 = await f1 + let r2 = await f2 + let r3 = await f3 + doAssert r1.rows[0].getStr(0) == "1" + doAssert r2.rows[0].getStr(0) == "2" + doAssert r3.rows[0].getStr(0) == "3" + + await pool.close() + + waitFor t() + + test "pipelined pool: error isolation between ops": + proc t() {.async.} = + let pool = await newPool( + PoolConfig(connConfig: plainConfig(), minSize: 1, maxSize: 3, pipelined: true) + ) + + let f1 = pool.query("SELECT 1::int4") + let f2 = pool.query("INVALID SQL HERE") + let f3 = pool.query("SELECT 3::int4") + + let r1 = await f1 + doAssert r1.rows[0].getStr(0) == "1" + + var gotError = false + try: + discard await f2 + except PgError: + gotError = true + doAssert gotError + + let r3 = await f3 + doAssert r3.rows[0].getStr(0) == "3" + + await pool.close() + + waitFor t() + + test "pipelined pool: exec error isolation": + proc t() {.async.} = + let 
pool = await newPool( + PoolConfig(connConfig: plainConfig(), minSize: 1, maxSize: 3, pipelined: true) + ) + discard await pool.exec("DROP TABLE IF EXISTS test_pp_exec_err") + discard await pool.exec("CREATE TABLE test_pp_exec_err (id int PRIMARY KEY)") + discard await pool.exec("INSERT INTO test_pp_exec_err VALUES (1)") + + # Fire concurrent ops: one exec will violate PK, others should succeed + let f1 = pool.query("SELECT 1::int4") + let f2 = pool.exec("INSERT INTO test_pp_exec_err VALUES (1)") # duplicate PK + let f3 = pool.query("SELECT 3::int4") + + let r1 = await f1 + doAssert r1.rows[0].getStr(0) == "1" + + var gotError = false + try: + discard await f2 + except PgError: + gotError = true + doAssert gotError + + let r3 = await f3 + doAssert r3.rows[0].getStr(0) == "3" + + discard await pool.exec("DROP TABLE test_pp_exec_err") + await pool.close() + + waitFor t() + + test "pipelined pool: queryOne works": + proc t() {.async.} = + let pool = await newPool( + PoolConfig(connConfig: plainConfig(), minSize: 1, maxSize: 3, pipelined: true) + ) + let row = await pool.queryOne("SELECT 42::int4 AS answer") + doAssert row.isSome + doAssert row.get.getStr(0) == "42" + + let empty = await pool.queryOne("SELECT 1 WHERE false") + doAssert empty.isNone + + await pool.close() + + waitFor t() + + test "pipelined pool: queryValue works": + proc t() {.async.} = + let pool = await newPool( + PoolConfig(connConfig: plainConfig(), minSize: 1, maxSize: 3, pipelined: true) + ) + let v = await pool.queryValue("SELECT 'hello'::text") + doAssert v == "hello" + + await pool.close() + + waitFor t() + + test "pipelined pool: queryValueOpt works": + proc t() {.async.} = + let pool = await newPool( + PoolConfig(connConfig: plainConfig(), minSize: 1, maxSize: 3, pipelined: true) + ) + let opt = await pool.queryValueOpt("SELECT 'found'::text") + doAssert opt.isSome + doAssert opt.get == "found" + + let optNone = await pool.queryValueOpt("SELECT 1 WHERE false") + doAssert optNone.isNone + + await pool.close() + + waitFor t() + + test "pipelined pool: queryValueOrDefault works": + proc t() {.async.} = + let pool = await newPool( + PoolConfig(connConfig: plainConfig(), minSize: 1, maxSize: 3, pipelined: true) + ) + let v = await pool.queryValueOrDefault("SELECT 'val'::text", default = "fb") + doAssert v == "val" + + let def = + await pool.queryValueOrDefault("SELECT 1 WHERE false", default = "fallback") + doAssert def == "fallback" + + await pool.close() + + waitFor t() + + test "pipelined pool: queryExists works": + proc t() {.async.} = + let pool = await newPool( + PoolConfig(connConfig: plainConfig(), minSize: 1, maxSize: 3, pipelined: true) + ) + let exists = await pool.queryExists("SELECT 1") + doAssert exists + + let notExists = await pool.queryExists("SELECT 1 WHERE false") + doAssert not notExists + + await pool.close() + + waitFor t() + + test "pipelined pool: queryColumn works": + proc t() {.async.} = + let pool = await newPool( + PoolConfig(connConfig: plainConfig(), minSize: 1, maxSize: 3, pipelined: true) + ) + let col = await pool.queryColumn( + "SELECT v::text FROM (VALUES ('a'), ('b'), ('c')) AS t(v)" + ) + doAssert col.len == 3 + doAssert col == @["a", "b", "c"] + + await pool.close() + + waitFor t() + + test "pipelined pool: close fails pending ops": + proc t() {.async.} = + let pool = await newPool( + PoolConfig(connConfig: plainConfig(), minSize: 1, maxSize: 3, pipelined: true) + ) + # Queue an op but close immediately + let fut = pool.query("SELECT 1::int4") + await pool.close() + + var gotError = 
false + try: + discard await fut + except PgError: + gotError = true + doAssert gotError + + waitFor t() + + test "pipelined pool: maxPipelineSize limits batch": + proc t() {.async.} = + let pool = await newPool( + PoolConfig( + connConfig: plainConfig(), + minSize: 1, + maxSize: 3, + pipelined: true, + maxPipelineSize: 2, + ) + ) + + # Fire 4 queries concurrently; maxPipelineSize=2 means batches of 2 + let f1 = pool.query("SELECT 10::int4") + let f2 = pool.query("SELECT 20::int4") + let f3 = pool.query("SELECT 30::int4") + let f4 = pool.query("SELECT 40::int4") + doAssert (await f1).rows[0].getStr(0) == "10" + doAssert (await f2).rows[0].getStr(0) == "20" + doAssert (await f3).rows[0].getStr(0) == "30" + doAssert (await f4).rows[0].getStr(0) == "40" + + await pool.close() + + waitFor t() + + test "pipelined pool: high concurrency distributes across connections": + proc t() {.async.} = + let pool = await newPool( + PoolConfig(connConfig: plainConfig(), minSize: 2, maxSize: 4, pipelined: true) + ) + + # Fire 6 concurrent queries -- more than maxSize + let f1 = pool.query("SELECT 1::int4") + let f2 = pool.query("SELECT 2::int4") + let f3 = pool.query("SELECT 3::int4") + let f4 = pool.query("SELECT 4::int4") + let f5 = pool.query("SELECT 5::int4") + let f6 = pool.query("SELECT 6::int4") + doAssert (await f1).rows[0].getStr(0) == "1" + doAssert (await f2).rows[0].getStr(0) == "2" + doAssert (await f3).rows[0].getStr(0) == "3" + doAssert (await f4).rows[0].getStr(0) == "4" + doAssert (await f5).rows[0].getStr(0) == "5" + doAssert (await f6).rows[0].getStr(0) == "6" + + await pool.close() + + waitFor t() + suite "E2E: queryDirect / execDirect": test "queryDirect with int32 param": proc t() {.async.} = diff --git a/tests/test_pool.nim b/tests/test_pool.nim index b185b4c..cdf4b0a 100644 --- a/tests/test_pool.nim +++ b/tests/test_pool.nim @@ -394,7 +394,6 @@ suite "Pool close": # Simulate a connection being released after a short delay proc releaseAfter(pool: PgPool) {.async.} = await sleepAsync(milliseconds(20)) - let conn = mockConn(csClosed) pool.active.dec let releaseFut = releaseAfter(pool)