From ce281cba27ed990bf805d15c3a119965ebfbff45 Mon Sep 17 00:00:00 2001 From: fox0430 Date: Mon, 30 Mar 2026 18:45:03 +0900 Subject: [PATCH] Add Advisory Lock --- async_postgres.nim | 5 +- async_postgres/pg_advisory_lock.nim | 508 ++++++++++++++++++++++++++++ tests/all_tests.nim | 5 +- tests/test_advisory_lock.nim | 375 ++++++++++++++++++++ 4 files changed, 889 insertions(+), 4 deletions(-) create mode 100644 async_postgres/pg_advisory_lock.nim create mode 100644 tests/test_advisory_lock.nim diff --git a/async_postgres.nim b/async_postgres.nim index cda89a1..9d3b408 100644 --- a/async_postgres.nim +++ b/async_postgres.nim @@ -47,14 +47,15 @@ ## - `pg_protocol `_ — Wire protocol encoding/decoding ## - `pg_auth `_ — MD5 and SCRAM-SHA-256 authentication ## - `pg_largeobject `_ — Large Object API for streaming binary data +## - `pg_advisory_lock `_ — Advisory lock API (session/transaction, exclusive/shared) ## - `async_backend `_ — Async framework abstraction (asyncdispatch / chronos) import async_postgres/[ async_backend, pg_protocol, pg_auth, pg_types, pg_connection, pg_client, pg_pool, - pg_pool_cluster, pg_largeobject, pg_sql, + pg_pool_cluster, pg_largeobject, pg_advisory_lock, pg_sql, ] export async_backend, pg_protocol, pg_auth, pg_types, pg_connection, pg_client, pg_pool, - pg_pool_cluster, pg_largeobject, pg_sql + pg_pool_cluster, pg_largeobject, pg_advisory_lock, pg_sql diff --git a/async_postgres/pg_advisory_lock.nim b/async_postgres/pg_advisory_lock.nim new file mode 100644 index 0000000..7884695 --- /dev/null +++ b/async_postgres/pg_advisory_lock.nim @@ -0,0 +1,508 @@ +## PostgreSQL Advisory Lock API +## +## Provides an async interface to PostgreSQL's advisory locking facility. +## Advisory locks are application-enforced locks that do not lock any actual +## table rows — they simply act on application-defined lock identifiers. +## +## Two flavours exist: +## +## - **Session-level** locks (default) — held until explicitly released or +## the session ends. +## - **Transaction-level** locks — released automatically at the end of the +## current transaction; no explicit unlock is needed. +## +## Locks can be **exclusive** (default) or **shared** (multiple sessions may +## hold the same shared lock concurrently). +## +## **Stacking:** Session-level advisory locks are stackable — if the same +## session acquires the same lock multiple times, it must be released the +## same number of times before it is truly released. The ``withAdvisoryLock`` +## templates handle acquire/release as a pair, but be careful not to nest +## them with the same key unintentionally. Transaction-level locks are not +## stackable and are always released at transaction end. +## +## Example +## ======= +## +## .. code-block:: nim +## # Session-level exclusive lock (blocking) +## await conn.advisoryLock(42'i64) +## defer: await conn.advisoryUnlock(42'i64) +## +## # Non-blocking try +## if await conn.advisoryTryLock(42'i64): +## defer: await conn.advisoryUnlock(42'i64) +## echo "acquired" +## +## # Transaction-level lock (auto-released on COMMIT/ROLLBACK) +## conn.withTransaction: +## await conn.advisoryLockXact(42'i64) +## echo "locked for this transaction" +## +## # Two-key variants +## await conn.advisoryLock(1'i32, 2'i32) +## await conn.advisoryUnlock(1'i32, 2'i32) +## +## # RAII-style convenience +## conn.withAdvisoryLock(42'i64): +## echo "lock held here" + +import std/macros + +import async_backend, pg_types, pg_connection, pg_client + +# Session-level exclusive locks + +proc advisoryLock*( + conn: PgConnection, key: int64, timeout: Duration = ZeroDuration +): Future[void] {.async.} = + ## Acquire a session-level exclusive advisory lock, blocking until available. + discard await conn.queryValue( + "SELECT pg_advisory_lock($1)", @[toPgParam(key)], timeout = timeout + ) + +proc advisoryTryLock*( + conn: PgConnection, key: int64, timeout: Duration = ZeroDuration +): Future[bool] {.async.} = + ## Try to acquire a session-level exclusive advisory lock without blocking. + ## Returns ``true`` if the lock was acquired. + return await conn.queryValue( + bool, "SELECT pg_try_advisory_lock($1)", @[toPgParam(key)], timeout = timeout + ) + +proc advisoryUnlock*( + conn: PgConnection, key: int64, timeout: Duration = ZeroDuration +): Future[bool] {.async.} = + ## Release a session-level exclusive advisory lock. + ## Returns ``true`` if the lock was held and successfully released. + return await conn.queryValue( + bool, "SELECT pg_advisory_unlock($1)", @[toPgParam(key)], timeout = timeout + ) + +# Session-level shared locks + +proc advisoryLockShared*( + conn: PgConnection, key: int64, timeout: Duration = ZeroDuration +): Future[void] {.async.} = + ## Acquire a session-level shared advisory lock, blocking until available. + discard await conn.queryValue( + "SELECT pg_advisory_lock_shared($1)", @[toPgParam(key)], timeout = timeout + ) + +proc advisoryTryLockShared*( + conn: PgConnection, key: int64, timeout: Duration = ZeroDuration +): Future[bool] {.async.} = + ## Try to acquire a session-level shared advisory lock without blocking. + ## Returns ``true`` if the lock was acquired. + return await conn.queryValue( + bool, "SELECT pg_try_advisory_lock_shared($1)", @[toPgParam(key)], timeout = timeout + ) + +proc advisoryUnlockShared*( + conn: PgConnection, key: int64, timeout: Duration = ZeroDuration +): Future[bool] {.async.} = + ## Release a session-level shared advisory lock. + ## Returns ``true`` if the lock was held and successfully released. + return await conn.queryValue( + bool, "SELECT pg_advisory_unlock_shared($1)", @[toPgParam(key)], timeout = timeout + ) + +proc advisoryUnlockAll*( + conn: PgConnection, timeout: Duration = ZeroDuration +): Future[void] {.async.} = + ## Release all session-level advisory locks held by the current session. + discard await conn.exec("SELECT pg_advisory_unlock_all()", timeout = timeout) + +# Transaction-level exclusive locks + +proc advisoryLockXact*( + conn: PgConnection, key: int64, timeout: Duration = ZeroDuration +): Future[void] {.async.} = + ## Acquire a transaction-level exclusive advisory lock, blocking until available. + ## Automatically released at end of the current transaction. + discard await conn.queryValue( + "SELECT pg_advisory_xact_lock($1)", @[toPgParam(key)], timeout = timeout + ) + +proc advisoryTryLockXact*( + conn: PgConnection, key: int64, timeout: Duration = ZeroDuration +): Future[bool] {.async.} = + ## Try to acquire a transaction-level exclusive advisory lock without blocking. + ## Returns ``true`` if the lock was acquired. + return await conn.queryValue( + bool, "SELECT pg_try_advisory_xact_lock($1)", @[toPgParam(key)], timeout = timeout + ) + +# Transaction-level shared locks + +proc advisoryLockXactShared*( + conn: PgConnection, key: int64, timeout: Duration = ZeroDuration +): Future[void] {.async.} = + ## Acquire a transaction-level shared advisory lock, blocking until available. + ## Automatically released at end of the current transaction. + discard await conn.queryValue( + "SELECT pg_advisory_xact_lock_shared($1)", @[toPgParam(key)], timeout = timeout + ) + +proc advisoryTryLockXactShared*( + conn: PgConnection, key: int64, timeout: Duration = ZeroDuration +): Future[bool] {.async.} = + ## Try to acquire a transaction-level shared advisory lock without blocking. + ## Returns ``true`` if the lock was acquired. + return await conn.queryValue( + bool, + "SELECT pg_try_advisory_xact_lock_shared($1)", + @[toPgParam(key)], + timeout = timeout, + ) + +# Two-key (int32, int32) variants — Session-level exclusive + +proc advisoryLock*( + conn: PgConnection, key1, key2: int32, timeout: Duration = ZeroDuration +): Future[void] {.async.} = + ## Acquire a session-level exclusive advisory lock using two int32 keys. + discard await conn.queryValue( + "SELECT pg_advisory_lock($1, $2)", + @[toPgParam(key1), toPgParam(key2)], + timeout = timeout, + ) + +proc advisoryTryLock*( + conn: PgConnection, key1, key2: int32, timeout: Duration = ZeroDuration +): Future[bool] {.async.} = + ## Try to acquire a session-level exclusive advisory lock (two int32 keys). + return await conn.queryValue( + bool, + "SELECT pg_try_advisory_lock($1, $2)", + @[toPgParam(key1), toPgParam(key2)], + timeout = timeout, + ) + +proc advisoryUnlock*( + conn: PgConnection, key1, key2: int32, timeout: Duration = ZeroDuration +): Future[bool] {.async.} = + ## Release a session-level exclusive advisory lock (two int32 keys). + return await conn.queryValue( + bool, + "SELECT pg_advisory_unlock($1, $2)", + @[toPgParam(key1), toPgParam(key2)], + timeout = timeout, + ) + +# Two-key (int32, int32) variants — Session-level shared + +proc advisoryLockShared*( + conn: PgConnection, key1, key2: int32, timeout: Duration = ZeroDuration +): Future[void] {.async.} = + ## Acquire a session-level shared advisory lock using two int32 keys. + discard await conn.queryValue( + "SELECT pg_advisory_lock_shared($1, $2)", + @[toPgParam(key1), toPgParam(key2)], + timeout = timeout, + ) + +proc advisoryTryLockShared*( + conn: PgConnection, key1, key2: int32, timeout: Duration = ZeroDuration +): Future[bool] {.async.} = + ## Try to acquire a session-level shared advisory lock (two int32 keys). + return await conn.queryValue( + bool, + "SELECT pg_try_advisory_lock_shared($1, $2)", + @[toPgParam(key1), toPgParam(key2)], + timeout = timeout, + ) + +proc advisoryUnlockShared*( + conn: PgConnection, key1, key2: int32, timeout: Duration = ZeroDuration +): Future[bool] {.async.} = + ## Release a session-level shared advisory lock (two int32 keys). + return await conn.queryValue( + bool, + "SELECT pg_advisory_unlock_shared($1, $2)", + @[toPgParam(key1), toPgParam(key2)], + timeout = timeout, + ) + +# Two-key (int32, int32) variants — Transaction-level exclusive + +proc advisoryLockXact*( + conn: PgConnection, key1, key2: int32, timeout: Duration = ZeroDuration +): Future[void] {.async.} = + ## Acquire a transaction-level exclusive advisory lock (two int32 keys). + discard await conn.queryValue( + "SELECT pg_advisory_xact_lock($1, $2)", + @[toPgParam(key1), toPgParam(key2)], + timeout = timeout, + ) + +proc advisoryTryLockXact*( + conn: PgConnection, key1, key2: int32, timeout: Duration = ZeroDuration +): Future[bool] {.async.} = + ## Try to acquire a transaction-level exclusive advisory lock (two int32 keys). + return await conn.queryValue( + bool, + "SELECT pg_try_advisory_xact_lock($1, $2)", + @[toPgParam(key1), toPgParam(key2)], + timeout = timeout, + ) + +# Two-key (int32, int32) variants — Transaction-level shared + +proc advisoryLockXactShared*( + conn: PgConnection, key1, key2: int32, timeout: Duration = ZeroDuration +): Future[void] {.async.} = + ## Acquire a transaction-level shared advisory lock (two int32 keys). + discard await conn.queryValue( + "SELECT pg_advisory_xact_lock_shared($1, $2)", + @[toPgParam(key1), toPgParam(key2)], + timeout = timeout, + ) + +proc advisoryTryLockXactShared*( + conn: PgConnection, key1, key2: int32, timeout: Duration = ZeroDuration +): Future[bool] {.async.} = + ## Try to acquire a transaction-level shared advisory lock (two int32 keys). + return await conn.queryValue( + bool, + "SELECT pg_try_advisory_xact_lock_shared($1, $2)", + @[toPgParam(key1), toPgParam(key2)], + timeout = timeout, + ) + +# Convenience macros — session-level +# +# These are macros (not templates) so that ``conn``, ``key`` etc. are +# evaluated exactly once via ``genSym``-bound ``let`` bindings. + +macro withAdvisoryLock*(conn: PgConnection, key: int64, body: untyped): untyped = + ## Acquire a session-level exclusive advisory lock, execute ``body``, + ## then release the lock (even on exception). + let c = genSym(nskLet, "conn") + let k = genSym(nskLet, "key") + let connExpr = conn + let keyExpr = key + result = quote: + let `c` = `connExpr` + let `k` = `keyExpr` + await `c`.advisoryLock(`k`) + try: + `body` + finally: + discard await `c`.advisoryUnlock(`k`) + +macro withAdvisoryLock*( + conn: PgConnection, key: int64, timeout: Duration, body: untyped +): untyped = + ## Acquire a session-level exclusive advisory lock with a timeout, + ## execute ``body``, then release the lock (even on exception). + let c = genSym(nskLet, "conn") + let k = genSym(nskLet, "key") + let t = genSym(nskLet, "timeout") + let connExpr = conn + let keyExpr = key + let timeoutExpr = timeout + result = quote: + let `c` = `connExpr` + let `k` = `keyExpr` + let `t` = `timeoutExpr` + await `c`.advisoryLock(`k`, timeout = `t`) + try: + `body` + finally: + discard await `c`.advisoryUnlock(`k`) + +macro withAdvisoryLock*(conn: PgConnection, key1, key2: int32, body: untyped): untyped = + ## Acquire a session-level exclusive advisory lock (two int32 keys), + ## execute ``body``, then release the lock (even on exception). + let c = genSym(nskLet, "conn") + let k1 = genSym(nskLet, "key1") + let k2 = genSym(nskLet, "key2") + let connExpr = conn + let key1Expr = key1 + let key2Expr = key2 + result = quote: + let `c` = `connExpr` + let `k1` = `key1Expr` + let `k2` = `key2Expr` + await `c`.advisoryLock(`k1`, `k2`) + try: + `body` + finally: + discard await `c`.advisoryUnlock(`k1`, `k2`) + +macro withAdvisoryLock*( + conn: PgConnection, key1, key2: int32, timeout: Duration, body: untyped +): untyped = + ## Acquire a session-level exclusive advisory lock (two int32 keys) + ## with a timeout, execute ``body``, then release the lock (even on exception). + let c = genSym(nskLet, "conn") + let k1 = genSym(nskLet, "key1") + let k2 = genSym(nskLet, "key2") + let t = genSym(nskLet, "timeout") + let connExpr = conn + let key1Expr = key1 + let key2Expr = key2 + let timeoutExpr = timeout + result = quote: + let `c` = `connExpr` + let `k1` = `key1Expr` + let `k2` = `key2Expr` + let `t` = `timeoutExpr` + await `c`.advisoryLock(`k1`, `k2`, timeout = `t`) + try: + `body` + finally: + discard await `c`.advisoryUnlock(`k1`, `k2`) + +macro withAdvisoryLockShared*(conn: PgConnection, key: int64, body: untyped): untyped = + ## Acquire a session-level shared advisory lock, execute ``body``, + ## then release the lock (even on exception). + let c = genSym(nskLet, "conn") + let k = genSym(nskLet, "key") + let connExpr = conn + let keyExpr = key + result = quote: + let `c` = `connExpr` + let `k` = `keyExpr` + await `c`.advisoryLockShared(`k`) + try: + `body` + finally: + discard await `c`.advisoryUnlockShared(`k`) + +macro withAdvisoryLockShared*( + conn: PgConnection, key: int64, timeout: Duration, body: untyped +): untyped = + ## Acquire a session-level shared advisory lock with a timeout, + ## execute ``body``, then release the lock (even on exception). + let c = genSym(nskLet, "conn") + let k = genSym(nskLet, "key") + let t = genSym(nskLet, "timeout") + let connExpr = conn + let keyExpr = key + let timeoutExpr = timeout + result = quote: + let `c` = `connExpr` + let `k` = `keyExpr` + let `t` = `timeoutExpr` + await `c`.advisoryLockShared(`k`, timeout = `t`) + try: + `body` + finally: + discard await `c`.advisoryUnlockShared(`k`) + +macro withAdvisoryLockShared*( + conn: PgConnection, key1, key2: int32, body: untyped +): untyped = + ## Acquire a session-level shared advisory lock (two int32 keys), + ## execute ``body``, then release the lock (even on exception). + let c = genSym(nskLet, "conn") + let k1 = genSym(nskLet, "key1") + let k2 = genSym(nskLet, "key2") + let connExpr = conn + let key1Expr = key1 + let key2Expr = key2 + result = quote: + let `c` = `connExpr` + let `k1` = `key1Expr` + let `k2` = `key2Expr` + await `c`.advisoryLockShared(`k1`, `k2`) + try: + `body` + finally: + discard await `c`.advisoryUnlockShared(`k1`, `k2`) + +macro withAdvisoryLockShared*( + conn: PgConnection, key1, key2: int32, timeout: Duration, body: untyped +): untyped = + ## Acquire a session-level shared advisory lock (two int32 keys) + ## with a timeout, execute ``body``, then release the lock (even on exception). + let c = genSym(nskLet, "conn") + let k1 = genSym(nskLet, "key1") + let k2 = genSym(nskLet, "key2") + let t = genSym(nskLet, "timeout") + let connExpr = conn + let key1Expr = key1 + let key2Expr = key2 + let timeoutExpr = timeout + result = quote: + let `c` = `connExpr` + let `k1` = `key1Expr` + let `k2` = `key2Expr` + let `t` = `timeoutExpr` + await `c`.advisoryLockShared(`k1`, `k2`, timeout = `t`) + try: + `body` + finally: + discard await `c`.advisoryUnlockShared(`k1`, `k2`) + +# Transaction-level convenience templates + +template withAdvisoryLockXact*(conn: PgConnection, key: int64, body: untyped) = + ## Acquire a transaction-level exclusive advisory lock inside a transaction, + ## execute ``body``. The lock is automatically released at transaction end. + ## Must be called within ``withTransaction``. + await conn.advisoryLockXact(key) + body + +template withAdvisoryLockXact*( + conn: PgConnection, key: int64, timeout: Duration, body: untyped +) = + ## Acquire a transaction-level exclusive advisory lock with a timeout + ## inside a transaction, execute ``body``. The lock is automatically + ## released at transaction end. Must be called within ``withTransaction``. + await conn.advisoryLockXact(key, timeout = timeout) + body + +template withAdvisoryLockXact*(conn: PgConnection, key1, key2: int32, body: untyped) = + ## Acquire a transaction-level exclusive advisory lock (two int32 keys) + ## inside a transaction, execute ``body``. The lock is automatically + ## released at transaction end. Must be called within ``withTransaction``. + await conn.advisoryLockXact(key1, key2) + body + +template withAdvisoryLockXact*( + conn: PgConnection, key1, key2: int32, timeout: Duration, body: untyped +) = + ## Acquire a transaction-level exclusive advisory lock (two int32 keys) + ## with a timeout inside a transaction, execute ``body``. The lock is + ## automatically released at transaction end. + ## Must be called within ``withTransaction``. + await conn.advisoryLockXact(key1, key2, timeout = timeout) + body + +template withAdvisoryLockXactShared*(conn: PgConnection, key: int64, body: untyped) = + ## Acquire a transaction-level shared advisory lock inside a transaction, + ## execute ``body``. The lock is automatically released at transaction end. + ## Must be called within ``withTransaction``. + await conn.advisoryLockXactShared(key) + body + +template withAdvisoryLockXactShared*( + conn: PgConnection, key: int64, timeout: Duration, body: untyped +) = + ## Acquire a transaction-level shared advisory lock with a timeout + ## inside a transaction, execute ``body``. The lock is automatically + ## released at transaction end. Must be called within ``withTransaction``. + await conn.advisoryLockXactShared(key, timeout = timeout) + body + +template withAdvisoryLockXactShared*( + conn: PgConnection, key1, key2: int32, body: untyped +) = + ## Acquire a transaction-level shared advisory lock (two int32 keys) + ## inside a transaction, execute ``body``. The lock is automatically + ## released at transaction end. Must be called within ``withTransaction``. + await conn.advisoryLockXactShared(key1, key2) + body + +template withAdvisoryLockXactShared*( + conn: PgConnection, key1, key2: int32, timeout: Duration, body: untyped +) = + ## Acquire a transaction-level shared advisory lock (two int32 keys) + ## with a timeout inside a transaction, execute ``body``. The lock is + ## automatically released at transaction end. + ## Must be called within ``withTransaction``. + await conn.advisoryLockXactShared(key1, key2, timeout = timeout) + body diff --git a/tests/all_tests.nim b/tests/all_tests.nim index 48d5bda..64ee273 100644 --- a/tests/all_tests.nim +++ b/tests/all_tests.nim @@ -1,3 +1,4 @@ import - test_auth, test_dsn, test_e2e, test_keepalive, test_largeobject, test_pool, - test_protocol, test_rowdata, test_sql, test_ssl, test_types, test_pool_cluster + test_advisory_lock, test_auth, test_dsn, test_e2e, test_keepalive, test_largeobject, + test_pool, test_protocol, test_rowdata, test_sql, test_ssl, test_types, + test_pool_cluster diff --git a/tests/test_advisory_lock.nim b/tests/test_advisory_lock.nim new file mode 100644 index 0000000..80a5349 --- /dev/null +++ b/tests/test_advisory_lock.nim @@ -0,0 +1,375 @@ +import std/unittest + +import ../async_postgres/[async_backend, pg_connection, pg_client, pg_advisory_lock] + +const + PgHost = "127.0.0.1" + PgPort = 15432 + PgUser = "test" + PgPassword = "test" + PgDatabase = "test" + +proc plainConfig(): ConnConfig = + ConnConfig( + host: PgHost, + port: PgPort, + user: PgUser, + password: PgPassword, + database: PgDatabase, + sslMode: sslDisable, + ) + +suite "Advisory Lock: session-level exclusive (int64)": + test "lock and unlock": + proc t() {.async.} = + let conn = await connect(plainConfig()) + defer: + await conn.close() + await conn.advisoryLock(12345'i64) + let released = await conn.advisoryUnlock(12345'i64) + doAssert released + + waitFor t() + + test "tryLock succeeds when not held": + proc t() {.async.} = + let conn = await connect(plainConfig()) + defer: + await conn.close() + let acquired = await conn.advisoryTryLock(12346'i64) + doAssert acquired + discard await conn.advisoryUnlock(12346'i64) + + waitFor t() + + test "tryLock fails when held by another session": + proc t() {.async.} = + let conn1 = await connect(plainConfig()) + let conn2 = await connect(plainConfig()) + defer: + await conn1.close() + await conn2.close() + await conn1.advisoryLock(12347'i64) + let acquired = await conn2.advisoryTryLock(12347'i64) + doAssert not acquired + discard await conn1.advisoryUnlock(12347'i64) + + waitFor t() + + test "unlock returns false when not held": + proc t() {.async.} = + let conn = await connect(plainConfig()) + defer: + await conn.close() + let released = await conn.advisoryUnlock(99999'i64) + doAssert not released + + waitFor t() + +suite "Advisory Lock: session-level shared (int64)": + test "multiple sessions can hold shared lock": + proc t() {.async.} = + let conn1 = await connect(plainConfig()) + let conn2 = await connect(plainConfig()) + defer: + await conn1.close() + await conn2.close() + await conn1.advisoryLockShared(20001'i64) + let acquired = await conn2.advisoryTryLockShared(20001'i64) + doAssert acquired + discard await conn1.advisoryUnlockShared(20001'i64) + discard await conn2.advisoryUnlockShared(20001'i64) + + waitFor t() + + test "exclusive blocked by shared": + proc t() {.async.} = + let conn1 = await connect(plainConfig()) + let conn2 = await connect(plainConfig()) + defer: + await conn1.close() + await conn2.close() + await conn1.advisoryLockShared(20002'i64) + let acquired = await conn2.advisoryTryLock(20002'i64) + doAssert not acquired + discard await conn1.advisoryUnlockShared(20002'i64) + + waitFor t() + +suite "Advisory Lock: unlockAll": + test "releases all session locks": + proc t() {.async.} = + let conn1 = await connect(plainConfig()) + let conn2 = await connect(plainConfig()) + defer: + await conn1.close() + await conn2.close() + await conn1.advisoryLock(30001'i64) + await conn1.advisoryLock(30002'i64) + await conn1.advisoryUnlockAll() + let a1 = await conn2.advisoryTryLock(30001'i64) + let a2 = await conn2.advisoryTryLock(30002'i64) + doAssert a1 + doAssert a2 + discard await conn2.advisoryUnlock(30001'i64) + discard await conn2.advisoryUnlock(30002'i64) + + waitFor t() + +suite "Advisory Lock: transaction-level (int64)": + test "xact lock released on commit": + proc t() {.async.} = + let conn1 = await connect(plainConfig()) + let conn2 = await connect(plainConfig()) + defer: + await conn1.close() + await conn2.close() + conn1.withTransaction: + await conn1.advisoryLockXact(40001'i64) + let held = await conn2.advisoryTryLock(40001'i64) + doAssert not held + # After transaction, lock is released + let acquired = await conn2.advisoryTryLock(40001'i64) + doAssert acquired + discard await conn2.advisoryUnlock(40001'i64) + + waitFor t() + + test "xact tryLock": + proc t() {.async.} = + let conn = await connect(plainConfig()) + defer: + await conn.close() + conn.withTransaction: + let acquired = await conn.advisoryTryLockXact(40002'i64) + doAssert acquired + + waitFor t() + + test "xact shared lock": + proc t() {.async.} = + let conn1 = await connect(plainConfig()) + let conn2 = await connect(plainConfig()) + defer: + await conn1.close() + await conn2.close() + conn1.withTransaction: + await conn1.advisoryLockXactShared(40003'i64) + let acquired = await conn2.advisoryTryLockShared(40003'i64) + doAssert acquired + discard await conn2.advisoryUnlockShared(40003'i64) + + waitFor t() + +suite "Advisory Lock: two-key (int32, int32)": + test "lock and unlock": + proc t() {.async.} = + let conn = await connect(plainConfig()) + defer: + await conn.close() + await conn.advisoryLock(1'i32, 2'i32) + let released = await conn.advisoryUnlock(1'i32, 2'i32) + doAssert released + + waitFor t() + + test "tryLock blocked by another session": + proc t() {.async.} = + let conn1 = await connect(plainConfig()) + let conn2 = await connect(plainConfig()) + defer: + await conn1.close() + await conn2.close() + await conn1.advisoryLock(3'i32, 4'i32) + let acquired = await conn2.advisoryTryLock(3'i32, 4'i32) + doAssert not acquired + discard await conn1.advisoryUnlock(3'i32, 4'i32) + + waitFor t() + + test "shared two-key": + proc t() {.async.} = + let conn1 = await connect(plainConfig()) + let conn2 = await connect(plainConfig()) + defer: + await conn1.close() + await conn2.close() + await conn1.advisoryLockShared(5'i32, 6'i32) + let acquired = await conn2.advisoryTryLockShared(5'i32, 6'i32) + doAssert acquired + discard await conn1.advisoryUnlockShared(5'i32, 6'i32) + discard await conn2.advisoryUnlockShared(5'i32, 6'i32) + + waitFor t() + + test "xact two-key": + proc t() {.async.} = + let conn = await connect(plainConfig()) + defer: + await conn.close() + conn.withTransaction: + await conn.advisoryLockXact(7'i32, 8'i32) + let acquired = await conn.advisoryTryLockXact(7'i32, 8'i32) + doAssert acquired + + waitFor t() + + test "xact shared two-key": + proc t() {.async.} = + let conn1 = await connect(plainConfig()) + let conn2 = await connect(plainConfig()) + defer: + await conn1.close() + await conn2.close() + conn1.withTransaction: + await conn1.advisoryLockXactShared(9'i32, 10'i32) + let acquired = await conn2.advisoryTryLockXactShared(9'i32, 10'i32) + doAssert acquired + + waitFor t() + +suite "Advisory Lock: withAdvisoryLock template": + test "withAdvisoryLock int64": + proc t() {.async.} = + let conn1 = await connect(plainConfig()) + let conn2 = await connect(plainConfig()) + defer: + await conn1.close() + await conn2.close() + conn1.withAdvisoryLock(50001'i64): + let held = await conn2.advisoryTryLock(50001'i64) + doAssert not held + # After block, lock is released + let released = await conn2.advisoryTryLock(50001'i64) + doAssert released + discard await conn2.advisoryUnlock(50001'i64) + + waitFor t() + + test "withAdvisoryLock two-key": + proc t() {.async.} = + let conn1 = await connect(plainConfig()) + let conn2 = await connect(plainConfig()) + defer: + await conn1.close() + await conn2.close() + conn1.withAdvisoryLock(11'i32, 12'i32): + let held = await conn2.advisoryTryLock(11'i32, 12'i32) + doAssert not held + let released = await conn2.advisoryTryLock(11'i32, 12'i32) + doAssert released + discard await conn2.advisoryUnlock(11'i32, 12'i32) + + waitFor t() + + test "withAdvisoryLockShared int64": + proc t() {.async.} = + let conn1 = await connect(plainConfig()) + let conn2 = await connect(plainConfig()) + defer: + await conn1.close() + await conn2.close() + conn1.withAdvisoryLockShared(50002'i64): + let acquired = await conn2.advisoryTryLockShared(50002'i64) + doAssert acquired + discard await conn2.advisoryUnlockShared(50002'i64) + + waitFor t() + + test "withAdvisoryLockShared two-key": + proc t() {.async.} = + let conn1 = await connect(plainConfig()) + let conn2 = await connect(plainConfig()) + defer: + await conn1.close() + await conn2.close() + conn1.withAdvisoryLockShared(13'i32, 14'i32): + let acquired = await conn2.advisoryTryLockShared(13'i32, 14'i32) + doAssert acquired + discard await conn2.advisoryUnlockShared(13'i32, 14'i32) + + waitFor t() + + test "withAdvisoryLock releases on exception": + proc t() {.async.} = + let conn1 = await connect(plainConfig()) + let conn2 = await connect(plainConfig()) + defer: + await conn1.close() + await conn2.close() + try: + conn1.withAdvisoryLock(50003'i64): + raise newException(CatchableError, "test error") + except CatchableError: + discard + # Lock should be released despite exception + let acquired = await conn2.advisoryTryLock(50003'i64) + doAssert acquired + discard await conn2.advisoryUnlock(50003'i64) + + waitFor t() + +suite "Advisory Lock: withAdvisoryLockXact template": + test "withAdvisoryLockXact int64": + proc t() {.async.} = + let conn1 = await connect(plainConfig()) + let conn2 = await connect(plainConfig()) + defer: + await conn1.close() + await conn2.close() + conn1.withTransaction: + conn1.withAdvisoryLockXact(60001'i64): + let held = await conn2.advisoryTryLock(60001'i64) + doAssert not held + # After transaction, lock is released + let released = await conn2.advisoryTryLock(60001'i64) + doAssert released + discard await conn2.advisoryUnlock(60001'i64) + + waitFor t() + + test "withAdvisoryLockXact two-key": + proc t() {.async.} = + let conn1 = await connect(plainConfig()) + let conn2 = await connect(plainConfig()) + defer: + await conn1.close() + await conn2.close() + conn1.withTransaction: + conn1.withAdvisoryLockXact(15'i32, 16'i32): + let held = await conn2.advisoryTryLock(15'i32, 16'i32) + doAssert not held + let released = await conn2.advisoryTryLock(15'i32, 16'i32) + doAssert released + discard await conn2.advisoryUnlock(15'i32, 16'i32) + + waitFor t() + + test "withAdvisoryLockXactShared int64": + proc t() {.async.} = + let conn1 = await connect(plainConfig()) + let conn2 = await connect(plainConfig()) + defer: + await conn1.close() + await conn2.close() + conn1.withTransaction: + conn1.withAdvisoryLockXactShared(60002'i64): + let acquired = await conn2.advisoryTryLockShared(60002'i64) + doAssert acquired + discard await conn2.advisoryUnlockShared(60002'i64) + + waitFor t() + + test "withAdvisoryLockXactShared two-key": + proc t() {.async.} = + let conn1 = await connect(plainConfig()) + let conn2 = await connect(plainConfig()) + defer: + await conn1.close() + await conn2.close() + conn1.withTransaction: + conn1.withAdvisoryLockXactShared(17'i32, 18'i32): + let acquired = await conn2.advisoryTryLockXactShared(17'i32, 18'i32) + doAssert acquired + + waitFor t()