diff --git a/async_postgres/pg_client.nim b/async_postgres/pg_client.nim index 22b3f87..90a1995 100644 --- a/async_postgres/pg_client.nim +++ b/async_postgres/pg_client.nim @@ -163,6 +163,10 @@ type inlineRanges: seq[tuple[off: int32, len: int32]] inlineOids: seq[int32] inlineFormats: seq[int16] + autoReset*: bool + ## When true, `execute`/`executeIsolated` call `reset()` in a `finally` + ## block so the Pipeline can be safely reused without leaking state from + ## the previous run. Default: false (backward-compatible). IsolatedPipelineResults* = object ## Results from `executeIsolated`: per-op error isolation via per-query SYNC. @@ -2481,9 +2485,24 @@ proc queryInTransaction*( ) return qr -proc newPipeline*(conn: PgConnection): Pipeline = +proc newPipeline*(conn: PgConnection, autoReset: bool = false): Pipeline = ## Create a new pipeline for batching multiple operations into a single round trip. - Pipeline(conn: conn, ops: @[]) + ## When `autoReset` is true, the pipeline's queued ops and inline buffers are + ## cleared automatically after each `execute`/`executeIsolated` call, making + ## it safe to reuse the same Pipeline instance. + Pipeline(conn: conn, ops: @[], autoReset: autoReset) + +proc reset*(p: Pipeline) = + ## Clear all queued ops and inline SoA buffers. Safe to call at any time, + ## including while the pipeline is empty. Does not affect the underlying + ## connection or its statement cache. When `p.autoReset` is true, + ## `execute`/`executeIsolated` call this automatically (including on raise), + ## so manual calls are only needed when `autoReset` is false. + p.ops.setLen(0) + p.inlineData.setLen(0) + p.inlineRanges.setLen(0) + p.inlineOids.setLen(0) + p.inlineFormats.setLen(0) proc appendInline( p: Pipeline, params: openArray[PgParamInline] @@ -2801,24 +2820,30 @@ proc execute*( ): Future[seq[PipelineResult]] {.async.} = ## Execute all queued pipeline operations in a single round trip. ## On timeout, the connection is marked csClosed (protocol out of sync). - if p.ops.len == 0: - return @[] + ## When `p.autoReset` is true, the pipeline is reset on exit (including on + ## raise) so it can be safely reused. var results: seq[PipelineResult] - withConnTracing( - p.conn, - onPipelineStart, - onPipelineEnd, - TracePipelineStartData(opCount: p.ops.len), - TracePipelineEndData, - TracePipelineEndData(), - ): - if timeout > ZeroDuration: - try: - results = await executeImpl(p, timeout).wait(timeout) - except AsyncTimeoutError: - p.conn.invalidateOnTimeout("Pipeline execute timed out") - else: - results = await executeImpl(p, timeout) + try: + if p.ops.len == 0: + return @[] + withConnTracing( + p.conn, + onPipelineStart, + onPipelineEnd, + TracePipelineStartData(opCount: p.ops.len), + TracePipelineEndData, + TracePipelineEndData(), + ): + if timeout > ZeroDuration: + try: + results = await executeImpl(p, timeout).wait(timeout) + except AsyncTimeoutError: + p.conn.invalidateOnTimeout("Pipeline execute timed out") + else: + results = await executeImpl(p, timeout) + finally: + if p.autoReset: + p.reset() return results proc executeIsolatedImpl( @@ -3055,24 +3080,30 @@ proc executeIsolated*( ## Each operation gets its own SYNC message, so a failed operation does not ## abort subsequent ones. Returns results and per-op errors. ## On timeout, the connection is marked csClosed (protocol out of sync). - if p.ops.len == 0: - return IsolatedPipelineResults(results: @[], errors: @[]) + ## When `p.autoReset` is true, the pipeline is reset on exit (including on + ## raise) so it can be safely reused. var ir: IsolatedPipelineResults - withConnTracing( - p.conn, - onPipelineStart, - onPipelineEnd, - TracePipelineStartData(opCount: p.ops.len), - TracePipelineEndData, - TracePipelineEndData(), - ): - if timeout > ZeroDuration: - try: - ir = await executeIsolatedImpl(p, timeout).wait(timeout) - except AsyncTimeoutError: - p.conn.invalidateOnTimeout("Pipeline executeIsolated timed out") - else: - ir = await executeIsolatedImpl(p, timeout) + try: + if p.ops.len == 0: + return IsolatedPipelineResults(results: @[], errors: @[]) + withConnTracing( + p.conn, + onPipelineStart, + onPipelineEnd, + TracePipelineStartData(opCount: p.ops.len), + TracePipelineEndData, + TracePipelineEndData(), + ): + if timeout > ZeroDuration: + try: + ir = await executeIsolatedImpl(p, timeout).wait(timeout) + except AsyncTimeoutError: + p.conn.invalidateOnTimeout("Pipeline executeIsolated timed out") + else: + ir = await executeIsolatedImpl(p, timeout) + finally: + if p.autoReset: + p.reset() return ir proc openCursorImpl( diff --git a/tests/test_e2e.nim b/tests/test_e2e.nim index 027d00d..8ee5c70 100644 --- a/tests/test_e2e.nim +++ b/tests/test_e2e.nim @@ -6273,6 +6273,128 @@ suite "E2E: execInTransaction / queryInTransaction": waitFor t() + test "pipeline: reset clears queued ops and allows reuse": + proc t() {.async.} = + let conn = await connect(plainConfig()) + + let p = newPipeline(conn) + p.addQuery("SELECT $1::int4", @[toPgParam(1'i32)]) + p.addQuery("SELECT $1::int4", @[toPgParam(2'i32)]) + let r1 = await p.execute() + doAssert r1.len == 2 + + # Without reset, queued ops would be executed again. + p.reset() + let r0 = await p.execute() + doAssert r0.len == 0 + + # Reused instance is healthy for a brand-new batch. + p.addQuery("SELECT $1::text", @[toPgParam("reused")]) + let r2 = await p.execute() + doAssert r2.len == 1 + doAssert r2[0].queryResult.rows[0].getStr(0) == "reused" + + await conn.close() + + waitFor t() + + test "pipeline: reset on empty pipeline is a no-op": + proc t() {.async.} = + let conn = await connect(plainConfig()) + + let p = newPipeline(conn) + p.reset() + p.reset() + let results = await p.execute() + doAssert results.len == 0 + + await conn.close() + + waitFor t() + + test "pipeline: autoReset clears state after execute": + proc t() {.async.} = + let conn = await connect(plainConfig()) + + let p = newPipeline(conn, autoReset = true) + p.addQuery("SELECT $1::int4", @[toPgParam(10'i32)]) + p.addQuery("SELECT $1::int4", @[toPgParam(20'i32)]) + let r1 = await p.execute() + doAssert r1.len == 2 + + # After the first execute(), autoReset has cleared queued ops, so + # calling execute() again produces an empty result without replaying. + let r0 = await p.execute() + doAssert r0.len == 0 + + # Same instance is safe to reuse for a new batch. + p.addQuery("SELECT $1::text", @[toPgParam("auto")]) + let r2 = await p.execute() + doAssert r2.len == 1 + doAssert r2[0].queryResult.rows[0].getStr(0) == "auto" + + await conn.close() + + waitFor t() + + test "pipeline: autoReset clears state after executeIsolated (incl. on error)": + proc t() {.async.} = + let conn = await connect(plainConfig()) + + let p = newPipeline(conn, autoReset = true) + p.addQuery("SELECT 1::int4") + p.addQuery("SELECT * FROM __definitely_missing_table__") + p.addQuery("SELECT 2::int4") + let ir = await p.executeIsolated() + doAssert ir.results.len == 3 + doAssert ir.errors.len == 3 + doAssert ir.errors[0] == nil + doAssert ir.errors[1] != nil + doAssert ir.errors[2] == nil + + # After executeIsolated, autoReset should have cleared queued ops. + let ir0 = await p.executeIsolated() + doAssert ir0.results.len == 0 + doAssert ir0.errors.len == 0 + + await conn.close() + + waitFor t() + + test "pipeline: autoReset clears state when execute raises": + # execute() uses a single SYNC, so a failing op aborts the batch and the + # await re-raises. Confirms the finally-path reset runs on raise. + proc t() {.async.} = + let conn = await connect(plainConfig()) + + let p = newPipeline(conn, autoReset = true) + p.addExec("SELECT 1") + p.addExec("INVALID SQL THAT WILL FAIL") + p.addExec("SELECT 2") + var gotError = false + try: + discard await p.execute() + except PgError: + gotError = true + doAssert gotError + + # Connection should still be usable after the error. + doAssert conn.state == csReady + + # autoReset must have cleared queued ops even though execute() raised. + let r0 = await p.execute() + doAssert r0.len == 0 + + # Reused instance is healthy for a brand-new batch. + p.addQuery("SELECT $1::int4", @[toPgParam(42'i32)]) + let r1 = await p.execute() + doAssert r1.len == 1 + doAssert r1[0].queryResult.rows[0].getStr(0) == "42" + + await conn.close() + + waitFor t() + test "pipeline: pool withPipeline": proc t() {.async.} = let pool =