From dbbe6b609b9a8f76495c0c381a8d19ed56aadb33 Mon Sep 17 00:00:00 2001 From: fox0430 Date: Mon, 20 Apr 2026 19:45:48 +0900 Subject: [PATCH] Add tests for portal/txn recovery on COPY failure --- tests/test_e2e.nim | 254 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 254 insertions(+) diff --git a/tests/test_e2e.nim b/tests/test_e2e.nim index 7b534bf..41920a6 100644 --- a/tests/test_e2e.nim +++ b/tests/test_e2e.nim @@ -4322,6 +4322,260 @@ suite "E2E: COPY IN Stream": waitFor t() +suite "E2E: COPY Failure Recovery": + test "copyIn bad data inside txn: rollback recovers and next query works": + proc t() {.async.} = + let conn = await connect(plainConfig()) + discard await conn.exec("DROP TABLE IF EXISTS test_copy_fail_txn") + discard await conn.exec("CREATE TABLE test_copy_fail_txn (id int, name text)") + + discard await conn.exec("BEGIN") + doAssert conn.txStatus == tsInTransaction + + var raised = false + try: + # "abc" is not a valid int4 -> server raises invalid_text_representation + discard await conn.copyIn( + "COPY test_copy_fail_txn FROM STDIN", @["abc\tAlice\n".toBytes()] + ) + except PgError: + raised = true + doAssert raised + doAssert conn.state == csReady + doAssert conn.txStatus == tsInFailedTransaction + + # Any query in a failed txn errors until ROLLBACK + var stillFailed = false + try: + discard await conn.query("SELECT 1") + except PgError: + stillFailed = true + doAssert stillFailed + doAssert conn.txStatus == tsInFailedTransaction + + discard await conn.exec("ROLLBACK") + doAssert conn.txStatus == tsIdle + + let res = await conn.query("SELECT 1") + doAssert res.rows[0].getStr(0) == "1" + + let cnt = await conn.query("SELECT count(*) FROM test_copy_fail_txn") + doAssert cnt.rows[0].getStr(0) == "0" + + discard await conn.exec("DROP TABLE test_copy_fail_txn") + await conn.close() + + waitFor t() + + test "copyIn invalid SQL inside txn: rollback recovers": + proc t() {.async.} = + let conn = await connect(plainConfig()) + discard await conn.exec("BEGIN") + doAssert conn.txStatus == tsInTransaction + + var raised = false + try: + discard await conn.copyIn( + "COPY nonexistent_table_xyz FROM STDIN", @["1\ttest\n".toBytes()] + ) + except PgError: + raised = true + doAssert raised + doAssert conn.state == csReady + doAssert conn.txStatus == tsInFailedTransaction + + discard await conn.exec("ROLLBACK") + doAssert conn.txStatus == tsIdle + + let res = await conn.query("SELECT 42") + doAssert res.rows[0].getStr(0) == "42" + + await conn.close() + + waitFor t() + + test "copyInStream callback error inside txn: rollback recovers": + proc t() {.async.} = + let conn = await connect(plainConfig()) + discard await conn.exec("DROP TABLE IF EXISTS test_copy_fail_stream_txn") + discard + await conn.exec("CREATE TABLE test_copy_fail_stream_txn (id int, name text)") + + discard await conn.exec("BEGIN") + doAssert conn.txStatus == tsInTransaction + discard + await conn.exec("INSERT INTO test_copy_fail_stream_txn VALUES (1, 'pre-copy')") + + var callCount = 0 + let cb = makeCopyInCallback: + inc callCount + if callCount == 1: + "1\tfirst\n".toBytes() + else: + raise newException(CatchableError, "stream aborted") + + var raised = false + try: + discard await conn.copyInStream("COPY test_copy_fail_stream_txn FROM STDIN", cb) + except CatchableError as e: + raised = true + doAssert "stream aborted" in e.msg + doAssert raised + doAssert conn.state == csReady + # CopyFail inside txn aborts the transaction + doAssert conn.txStatus == tsInFailedTransaction + + discard await conn.exec("ROLLBACK") + doAssert conn.txStatus == tsIdle + + # Table was created + pre-copy row inserted, both rolled back + let cnt = await conn.query("SELECT count(*) FROM test_copy_fail_stream_txn") + doAssert cnt.rows[0].getStr(0) == "0" + + discard await conn.exec("DROP TABLE test_copy_fail_stream_txn") + await conn.close() + + waitFor t() + + test "copyOut invalid SQL inside txn: rollback recovers": + proc t() {.async.} = + let conn = await connect(plainConfig()) + discard await conn.exec("BEGIN") + doAssert conn.txStatus == tsInTransaction + + var raised = false + try: + discard await conn.copyOut("COPY nonexistent_table_xyz TO STDOUT") + except PgError: + raised = true + doAssert raised + doAssert conn.state == csReady + doAssert conn.txStatus == tsInFailedTransaction + + discard await conn.exec("ROLLBACK") + doAssert conn.txStatus == tsIdle + + let res = await conn.query("SELECT 7") + doAssert res.rows[0].getStr(0) == "7" + + await conn.close() + + waitFor t() + + test "copyOutStream callback error inside txn: rollback recovers": + proc t() {.async.} = + let conn = await connect(plainConfig()) + discard await conn.exec("DROP TABLE IF EXISTS test_copy_out_fail_txn") + discard await conn.exec("CREATE TABLE test_copy_out_fail_txn (id int)") + discard await conn.exec( + "INSERT INTO test_copy_out_fail_txn SELECT g FROM generate_series(1, 200) AS g" + ) + + discard await conn.exec("BEGIN") + doAssert conn.txStatus == tsInTransaction + + var chunkCount = 0 + let failingCb = makeCopyOutCallback: + inc chunkCount + raise newException(CatchableError, "out callback failed") + + var raised = false + try: + discard + await conn.copyOutStream("COPY test_copy_out_fail_txn TO STDOUT", failingCb) + except CatchableError as e: + raised = true + doAssert "out callback failed" in e.msg + doAssert raised + doAssert chunkCount >= 1 + doAssert conn.state == csReady + # COPY OUT has no client->server abort; server completes normally so tx + # remains in-transaction even though the client callback failed. + doAssert conn.txStatus == tsInTransaction + + discard await conn.exec("ROLLBACK") + doAssert conn.txStatus == tsIdle + + let res = await conn.query("SELECT 1") + doAssert res.rows[0].getStr(0) == "1" + + discard await conn.exec("DROP TABLE test_copy_out_fail_txn") + await conn.close() + + waitFor t() + + test "cursor works after copyIn failure (portal state clean)": + proc t() {.async.} = + let conn = await connect(plainConfig()) + discard await conn.exec("DROP TABLE IF EXISTS test_copy_portal") + discard await conn.exec("CREATE TABLE test_copy_portal (id int)") + discard await conn.exec( + "INSERT INTO test_copy_portal SELECT g FROM generate_series(1, 30) AS g" + ) + + var raised = false + try: + discard await conn.copyIn( + "COPY test_copy_portal FROM STDIN", @["not_an_int\n".toBytes()] + ) + except PgError: + raised = true + doAssert raised + doAssert conn.state == csReady + doAssert conn.txStatus == tsIdle + + # Subsequent portal-based cursor op must succeed + let cursor = await conn.openCursor( + "SELECT id FROM test_copy_portal ORDER BY id", chunkSize = 10 + ) + var total = 0 + while true: + let chunk = await cursor.fetchNext() + if chunk.len == 0: + break + total += chunk.len + doAssert total == 30 + doAssert conn.state == csReady + + discard await conn.exec("DROP TABLE test_copy_portal") + await conn.close() + + waitFor t() + + test "cursor works after copyOut failure (portal state clean)": + proc t() {.async.} = + let conn = await connect(plainConfig()) + discard await conn.exec("DROP TABLE IF EXISTS test_copy_out_portal") + discard await conn.exec("CREATE TABLE test_copy_out_portal (id int)") + discard await conn.exec( + "INSERT INTO test_copy_out_portal SELECT g FROM generate_series(1, 15) AS g" + ) + + var raised = false + try: + discard await conn.copyOut("COPY nonexistent_portal_tbl TO STDOUT") + except PgError: + raised = true + doAssert raised + doAssert conn.state == csReady + doAssert conn.txStatus == tsIdle + + let cursor = await conn.openCursor( + "SELECT id FROM test_copy_out_portal ORDER BY id", chunkSize = 5 + ) + var total = 0 + while true: + let chunk = await cursor.fetchNext() + if chunk.len == 0: + break + total += chunk.len + doAssert total == 15 + + discard await conn.exec("DROP TABLE test_copy_out_portal") + await conn.close() + + waitFor t() + suite "E2E: COPY IN openArray[byte]": test "copyIn with openArray[byte] inserts rows": proc t() {.async.} =