From ab30fa5fa3d0ead2aae7b90520e1c0a9edbef2b4 Mon Sep 17 00:00:00 2001 From: fox0430 Date: Wed, 15 Apr 2026 18:33:27 +0900 Subject: [PATCH] Add withTransaction macro to PgPoolCluster --- async_postgres/pg_pool_cluster.nim | 74 +++++++++++++++++++++++++++++- 1 file changed, 73 insertions(+), 1 deletion(-) diff --git a/async_postgres/pg_pool_cluster.nim b/async_postgres/pg_pool_cluster.nim index 8af5eca..d97834b 100644 --- a/async_postgres/pg_pool_cluster.nim +++ b/async_postgres/pg_pool_cluster.nim @@ -13,7 +13,7 @@ type ## - `read*` methods route to the replica pool (read-only queries). ## - `write*` methods route to the primary pool (writes, `SELECT FOR UPDATE`, etc.). ## - ## For transactions, use `cluster.primaryPool.withTransaction` directly. + ## For transactions, use `cluster.withTransaction`. primary: PgPool replica: PgPool fallback: ReplicaFallback @@ -449,6 +449,78 @@ clusterForwards("write"): timeout: Duration = ZeroDuration, ): Future[void] +macro withTransaction*(cluster: PgPoolCluster, args: varargs[untyped]): untyped = + ## Execute `body` inside a BEGIN/COMMIT transaction on the primary pool. + ## On exception, ROLLBACK is issued automatically. + ## Using `return` inside the body is a compile-time error. + ## + ## Usage: + ## cluster.withTransaction(conn): + ## conn.exec(...) + ## cluster.withTransaction(conn, seconds(5)): + ## conn.exec(...) + ## cluster.withTransaction(conn, TransactionOptions(isolation: ilSerializable)): + ## conn.exec(...) + ## cluster.withTransaction(conn, opts, seconds(5)): + ## conn.exec(...) + ## + ## **Warning:** Inside the body, use `conn.exec(...)` / `conn.query(...)` + ## directly — not `cluster.writeExec(...)` / `cluster.writeQuery(...)`. + ## Cluster methods acquire a separate connection, so those statements would + ## run outside this transaction. + var connIdent, body: NimNode + var beginSql: NimNode + var txTimeout: NimNode + case args.len + of 2: + connIdent = args[0] + body = args[1] + beginSql = newStrLitNode("BEGIN") + txTimeout = bindSym"ZeroDuration" + of 3: + connIdent = args[0] + body = args[2] + (beginSql, txTimeout) = buildTxBeginAndTimeout(args[1]) + of 4: + connIdent = args[0] + let opts = args[1] + txTimeout = args[2] + body = args[3] + beginSql = newCall(bindSym"buildBeginSql", opts) + else: + error( + "withTransaction expects (conn, body), (conn, timeout, body), (conn, opts, body), or (conn, opts, timeout, body)", + args[0], + ) + + if hasReturnStmt(body): + error( + "'return' inside withTransaction is not allowed: COMMIT/ROLLBACK would be skipped", + body, + ) + + let clusterExpr = cluster + let clusterSym = genSym(nskLet, "cluster") + let eSym = genSym(nskLet, "e") + let resetSessionSym = bindSym"resetSession" + result = quote: + let `clusterSym` = `clusterExpr` + let `connIdent` = await `clusterSym`.primary.acquire() + try: + discard await `connIdent`.simpleExec(`beginSql`, timeout = `txTimeout`) + try: + `body` + discard await `connIdent`.simpleExec("COMMIT", timeout = `txTimeout`) + except CatchableError as `eSym`: + try: + discard await `connIdent`.simpleExec("ROLLBACK", timeout = `txTimeout`) + except CatchableError: + discard + raise `eSym` + finally: + await `resetSessionSym`(`clusterSym`.primary, `connIdent`) + `clusterSym`.primary.release(`connIdent`) + template withPipeline*(cluster: PgPoolCluster, pipeline, body: untyped) = ## Create a pipeline on the primary pool. cluster.primary.withPipeline(pipeline):