Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 73 additions & 1 deletion async_postgres/pg_pool_cluster.nim
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ type
## - `read*` methods route to the replica pool (read-only queries).
## - `write*` methods route to the primary pool (writes, `SELECT FOR UPDATE`, etc.).
##
## For transactions, use `cluster.primaryPool.withTransaction` directly.
## For transactions, use `cluster.withTransaction`.
primary: PgPool
replica: PgPool
fallback: ReplicaFallback
Expand Down Expand Up @@ -449,6 +449,78 @@ clusterForwards("write"):
timeout: Duration = ZeroDuration,
): Future[void]

macro withTransaction*(cluster: PgPoolCluster, args: varargs[untyped]): untyped =
## Execute `body` inside a BEGIN/COMMIT transaction on the primary pool.
## On exception, ROLLBACK is issued automatically.
## Using `return` inside the body is a compile-time error.
##
## Usage:
## cluster.withTransaction(conn):
## conn.exec(...)
## cluster.withTransaction(conn, seconds(5)):
## conn.exec(...)
## cluster.withTransaction(conn, TransactionOptions(isolation: ilSerializable)):
## conn.exec(...)
## cluster.withTransaction(conn, opts, seconds(5)):
## conn.exec(...)
##
## **Warning:** Inside the body, use `conn.exec(...)` / `conn.query(...)`
## directly — not `cluster.writeExec(...)` / `cluster.writeQuery(...)`.
## Cluster methods acquire a separate connection, so those statements would
## run outside this transaction.
var connIdent, body: NimNode
var beginSql: NimNode
var txTimeout: NimNode
case args.len
of 2:
connIdent = args[0]
body = args[1]
beginSql = newStrLitNode("BEGIN")
txTimeout = bindSym"ZeroDuration"
of 3:
connIdent = args[0]
body = args[2]
(beginSql, txTimeout) = buildTxBeginAndTimeout(args[1])
of 4:
connIdent = args[0]
let opts = args[1]
txTimeout = args[2]
body = args[3]
beginSql = newCall(bindSym"buildBeginSql", opts)
else:
error(
"withTransaction expects (conn, body), (conn, timeout, body), (conn, opts, body), or (conn, opts, timeout, body)",
args[0],
)

if hasReturnStmt(body):
error(
"'return' inside withTransaction is not allowed: COMMIT/ROLLBACK would be skipped",
body,
)

let clusterExpr = cluster
let clusterSym = genSym(nskLet, "cluster")
let eSym = genSym(nskLet, "e")
let resetSessionSym = bindSym"resetSession"
result = quote:
let `clusterSym` = `clusterExpr`
let `connIdent` = await `clusterSym`.primary.acquire()
try:
discard await `connIdent`.simpleExec(`beginSql`, timeout = `txTimeout`)
try:
`body`
discard await `connIdent`.simpleExec("COMMIT", timeout = `txTimeout`)
except CatchableError as `eSym`:
try:
discard await `connIdent`.simpleExec("ROLLBACK", timeout = `txTimeout`)
except CatchableError:
discard
raise `eSym`
finally:
await `resetSessionSym`(`clusterSym`.primary, `connIdent`)
`clusterSym`.primary.release(`connIdent`)

template withPipeline*(cluster: PgPoolCluster, pipeline, body: untyped) =
## Create a pipeline on the primary pool.
cluster.primary.withPipeline(pipeline):
Expand Down
Loading