Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 67 additions & 36 deletions async_postgres/pg_client.nim
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,10 @@ type
inlineRanges: seq[tuple[off: int32, len: int32]]
inlineOids: seq[int32]
inlineFormats: seq[int16]
autoReset*: bool
## When true, `execute`/`executeIsolated` call `reset()` in a `finally`
## block so the Pipeline can be safely reused without leaking state from
## the previous run. Default: false (backward-compatible).

IsolatedPipelineResults* = object
## Results from `executeIsolated`: per-op error isolation via per-query SYNC.
Expand Down Expand Up @@ -2481,9 +2485,24 @@ proc queryInTransaction*(
)
return qr

proc newPipeline*(conn: PgConnection, autoReset: bool = false): Pipeline =
  ## Create a new pipeline for batching multiple operations into a single
  ## round trip over `conn`.
  ##
  ## When `autoReset` is true, the pipeline's queued ops and inline buffers are
  ## cleared automatically after each `execute`/`executeIsolated` call
  ## (including when they raise), making it safe to reuse the same Pipeline
  ## instance for successive batches. Default is false for backward
  ## compatibility with callers that manage `reset()` themselves.
  Pipeline(conn: conn, ops: @[], autoReset: autoReset)

proc reset*(p: Pipeline) =
  ## Drop every queued operation and empty the inline SoA buffers, returning
  ## the pipeline to a pristine state ready for a fresh batch.
  ##
  ## Calling this on an already-empty pipeline is harmless, and the underlying
  ## connection and its statement cache are left untouched. With `p.autoReset`
  ## set, `execute`/`executeIsolated` invoke this themselves (even on raise),
  ## so explicit calls only matter when `autoReset` is false.
  p.inlineFormats.setLen(0)
  p.inlineOids.setLen(0)
  p.inlineRanges.setLen(0)
  p.inlineData.setLen(0)
  p.ops.setLen(0)

proc appendInline(
p: Pipeline, params: openArray[PgParamInline]
Expand Down Expand Up @@ -2801,24 +2820,30 @@ proc execute*(
): Future[seq[PipelineResult]] {.async.} =
  ## Execute all queued pipeline operations in a single round trip.
  ## On timeout, the connection is marked csClosed (protocol out of sync).
  ## When `p.autoReset` is true, the pipeline is reset on exit (including on
  ## raise) so it can be safely reused.
  var results: seq[PipelineResult]
  try:
    # Empty batch: nothing to send. Still routed through the `finally` below
    # so autoReset semantics are uniform for every exit path.
    if p.ops.len == 0:
      return @[]
    withConnTracing(
      p.conn,
      onPipelineStart,
      onPipelineEnd,
      TracePipelineStartData(opCount: p.ops.len),
      TracePipelineEndData(),
    ):
      if timeout > ZeroDuration:
        try:
          # `wait` enforces the deadline; once it fires the wire protocol is
          # out of sync, so the connection must be invalidated, not reused.
          results = await executeImpl(p, timeout).wait(timeout)
        except AsyncTimeoutError:
          p.conn.invalidateOnTimeout("Pipeline execute timed out")
      else:
        results = await executeImpl(p, timeout)
  finally:
    # Runs on success, early return, and raise alike.
    if p.autoReset:
      p.reset()
  return results

proc executeIsolatedImpl(
Expand Down Expand Up @@ -3055,24 +3080,30 @@ proc executeIsolated*(
## Each operation gets its own SYNC message, so a failed operation does not
## abort subsequent ones. Returns results and per-op errors.
## On timeout, the connection is marked csClosed (protocol out of sync).
## When `p.autoReset` is true, the pipeline is reset on exit (including on
## raise) so it can be safely reused.
var ir: IsolatedPipelineResults
try:
  # Empty batch short-circuits, but still passes through the `finally`
  # below so autoReset behaves identically on every exit path.
  if p.ops.len == 0:
    return IsolatedPipelineResults(results: @[], errors: @[])
  withConnTracing(
    p.conn,
    onPipelineStart,
    onPipelineEnd,
    TracePipelineStartData(opCount: p.ops.len),
    TracePipelineEndData(),
  ):
    if timeout > ZeroDuration:
      try:
        # Deadline enforced by `wait`; on expiry the protocol stream is out
        # of sync and the connection is invalidated rather than reused.
        ir = await executeIsolatedImpl(p, timeout).wait(timeout)
      except AsyncTimeoutError:
        p.conn.invalidateOnTimeout("Pipeline executeIsolated timed out")
    else:
      ir = await executeIsolatedImpl(p, timeout)
finally:
  # Runs on success, early return, and raise alike.
  if p.autoReset:
    p.reset()
return ir

proc openCursorImpl(
Expand Down
122 changes: 122 additions & 0 deletions tests/test_e2e.nim
Original file line number Diff line number Diff line change
Expand Up @@ -6273,6 +6273,128 @@ suite "E2E: execInTransaction / queryInTransaction":

waitFor t()

test "pipeline: reset clears queued ops and allows reuse":
  proc body() {.async.} =
    let conn = await connect(plainConfig())
    let pipe = newPipeline(conn)

    # First batch: two parameterized queries in one round trip.
    pipe.addQuery("SELECT $1::int4", @[toPgParam(1'i32)])
    pipe.addQuery("SELECT $1::int4", @[toPgParam(2'i32)])
    let firstRun = await pipe.execute()
    doAssert firstRun.len == 2

    # A manual reset drops the queued ops; executing now yields nothing
    # instead of replaying the previous batch.
    pipe.reset()
    let emptyRun = await pipe.execute()
    doAssert emptyRun.len == 0

    # The same instance happily accepts and runs a brand-new batch.
    pipe.addQuery("SELECT $1::text", @[toPgParam("reused")])
    let secondRun = await pipe.execute()
    doAssert secondRun.len == 1
    doAssert secondRun[0].queryResult.rows[0].getStr(0) == "reused"

    await conn.close()

  waitFor body()

test "pipeline: reset on empty pipeline is a no-op":
  proc body() {.async.} =
    let conn = await connect(plainConfig())
    let pipe = newPipeline(conn)

    # Resetting a never-used pipeline (twice, even) must not blow up...
    pipe.reset()
    pipe.reset()
    # ...and executing the still-empty pipeline yields an empty result set.
    let outcome = await pipe.execute()
    doAssert outcome.len == 0

    await conn.close()

  waitFor body()

test "pipeline: autoReset clears state after execute":
  proc body() {.async.} =
    let conn = await connect(plainConfig())
    let pipe = newPipeline(conn, autoReset = true)

    # Run a two-query batch on an autoReset pipeline.
    pipe.addQuery("SELECT $1::int4", @[toPgParam(10'i32)])
    pipe.addQuery("SELECT $1::int4", @[toPgParam(20'i32)])
    let firstRun = await pipe.execute()
    doAssert firstRun.len == 2

    # autoReset already emptied the queue, so a second execute() returns
    # nothing rather than replaying the batch.
    let drained = await pipe.execute()
    doAssert drained.len == 0

    # The very same instance is safe to reuse for a fresh batch.
    pipe.addQuery("SELECT $1::text", @[toPgParam("auto")])
    let secondRun = await pipe.execute()
    doAssert secondRun.len == 1
    doAssert secondRun[0].queryResult.rows[0].getStr(0) == "auto"

    await conn.close()

  waitFor body()

test "pipeline: autoReset clears state after executeIsolated (incl. on error)":
  proc body() {.async.} =
    let conn = await connect(plainConfig())
    let pipe = newPipeline(conn, autoReset = true)

    # Middle op targets a missing table; per-op SYNC isolates the failure
    # so the surrounding ops still complete.
    pipe.addQuery("SELECT 1::int4")
    pipe.addQuery("SELECT * FROM __definitely_missing_table__")
    pipe.addQuery("SELECT 2::int4")
    let isolated = await pipe.executeIsolated()
    doAssert isolated.results.len == 3
    doAssert isolated.errors.len == 3
    doAssert isolated.errors[0] == nil
    doAssert isolated.errors[1] != nil
    doAssert isolated.errors[2] == nil

    # autoReset emptied the queue, so a follow-up run does nothing.
    let drained = await pipe.executeIsolated()
    doAssert drained.results.len == 0
    doAssert drained.errors.len == 0

    await conn.close()

  waitFor body()

test "pipeline: autoReset clears state when execute raises":
  # execute() uses a single SYNC, so a failing op aborts the batch and the
  # await re-raises. Confirms the finally-path reset runs on raise.
  proc body() {.async.} =
    let conn = await connect(plainConfig())
    let pipe = newPipeline(conn, autoReset = true)

    pipe.addExec("SELECT 1")
    pipe.addExec("INVALID SQL THAT WILL FAIL")
    pipe.addExec("SELECT 2")
    var sawPgError = false
    try:
      discard await pipe.execute()
    except PgError:
      sawPgError = true
    doAssert sawPgError

    # The error must not have wedged the connection.
    doAssert conn.state == csReady

    # Even though execute() raised, the finally-block reset emptied the
    # queue, so a follow-up run does nothing.
    let drained = await pipe.execute()
    doAssert drained.len == 0

    # And the instance still works for a brand-new batch.
    pipe.addQuery("SELECT $1::int4", @[toPgParam(42'i32)])
    let fresh = await pipe.execute()
    doAssert fresh.len == 1
    doAssert fresh[0].queryResult.rows[0].getStr(0) == "42"

    await conn.close()

  waitFor body()

test "pipeline: pool withPipeline":
proc t() {.async.} =
let pool =
Expand Down