Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
254 changes: 254 additions & 0 deletions tests/test_e2e.nim
Original file line number Diff line number Diff line change
Expand Up @@ -4322,6 +4322,260 @@ suite "E2E: COPY IN Stream":

waitFor t()

suite "E2E: COPY Failure Recovery":
test "copyIn bad data inside txn: rollback recovers and next query works":
proc t() {.async.} =
let conn = await connect(plainConfig())
discard await conn.exec("DROP TABLE IF EXISTS test_copy_fail_txn")
discard await conn.exec("CREATE TABLE test_copy_fail_txn (id int, name text)")

discard await conn.exec("BEGIN")
doAssert conn.txStatus == tsInTransaction

var raised = false
try:
# "abc" is not a valid int4 -> server raises invalid_text_representation
discard await conn.copyIn(
"COPY test_copy_fail_txn FROM STDIN", @["abc\tAlice\n".toBytes()]
)
except PgError:
raised = true
doAssert raised
doAssert conn.state == csReady
doAssert conn.txStatus == tsInFailedTransaction

# Any query in a failed txn errors until ROLLBACK
var stillFailed = false
try:
discard await conn.query("SELECT 1")
except PgError:
stillFailed = true
doAssert stillFailed
doAssert conn.txStatus == tsInFailedTransaction

discard await conn.exec("ROLLBACK")
doAssert conn.txStatus == tsIdle

let res = await conn.query("SELECT 1")
doAssert res.rows[0].getStr(0) == "1"

let cnt = await conn.query("SELECT count(*) FROM test_copy_fail_txn")
doAssert cnt.rows[0].getStr(0) == "0"

discard await conn.exec("DROP TABLE test_copy_fail_txn")
await conn.close()

waitFor t()

test "copyIn invalid SQL inside txn: rollback recovers":
proc t() {.async.} =
let conn = await connect(plainConfig())
discard await conn.exec("BEGIN")
doAssert conn.txStatus == tsInTransaction

var raised = false
try:
discard await conn.copyIn(
"COPY nonexistent_table_xyz FROM STDIN", @["1\ttest\n".toBytes()]
)
except PgError:
raised = true
doAssert raised
doAssert conn.state == csReady
doAssert conn.txStatus == tsInFailedTransaction

discard await conn.exec("ROLLBACK")
doAssert conn.txStatus == tsIdle

let res = await conn.query("SELECT 42")
doAssert res.rows[0].getStr(0) == "42"

await conn.close()

waitFor t()

test "copyInStream callback error inside txn: rollback recovers":
proc t() {.async.} =
let conn = await connect(plainConfig())
discard await conn.exec("DROP TABLE IF EXISTS test_copy_fail_stream_txn")
discard
await conn.exec("CREATE TABLE test_copy_fail_stream_txn (id int, name text)")

discard await conn.exec("BEGIN")
doAssert conn.txStatus == tsInTransaction
discard
await conn.exec("INSERT INTO test_copy_fail_stream_txn VALUES (1, 'pre-copy')")

var callCount = 0
let cb = makeCopyInCallback:
inc callCount
if callCount == 1:
"1\tfirst\n".toBytes()
else:
raise newException(CatchableError, "stream aborted")

var raised = false
try:
discard await conn.copyInStream("COPY test_copy_fail_stream_txn FROM STDIN", cb)
except CatchableError as e:
raised = true
doAssert "stream aborted" in e.msg
doAssert raised
doAssert conn.state == csReady
# CopyFail inside txn aborts the transaction
doAssert conn.txStatus == tsInFailedTransaction

discard await conn.exec("ROLLBACK")
doAssert conn.txStatus == tsIdle

# Table was created + pre-copy row inserted, both rolled back
let cnt = await conn.query("SELECT count(*) FROM test_copy_fail_stream_txn")
doAssert cnt.rows[0].getStr(0) == "0"

discard await conn.exec("DROP TABLE test_copy_fail_stream_txn")
await conn.close()

waitFor t()

test "copyOut invalid SQL inside txn: rollback recovers":
proc t() {.async.} =
let conn = await connect(plainConfig())
discard await conn.exec("BEGIN")
doAssert conn.txStatus == tsInTransaction

var raised = false
try:
discard await conn.copyOut("COPY nonexistent_table_xyz TO STDOUT")
except PgError:
raised = true
doAssert raised
doAssert conn.state == csReady
doAssert conn.txStatus == tsInFailedTransaction

discard await conn.exec("ROLLBACK")
doAssert conn.txStatus == tsIdle

let res = await conn.query("SELECT 7")
doAssert res.rows[0].getStr(0) == "7"

await conn.close()

waitFor t()

test "copyOutStream callback error inside txn: rollback recovers":
proc t() {.async.} =
let conn = await connect(plainConfig())
discard await conn.exec("DROP TABLE IF EXISTS test_copy_out_fail_txn")
discard await conn.exec("CREATE TABLE test_copy_out_fail_txn (id int)")
discard await conn.exec(
"INSERT INTO test_copy_out_fail_txn SELECT g FROM generate_series(1, 200) AS g"
)

discard await conn.exec("BEGIN")
doAssert conn.txStatus == tsInTransaction

var chunkCount = 0
let failingCb = makeCopyOutCallback:
inc chunkCount
raise newException(CatchableError, "out callback failed")

var raised = false
try:
discard
await conn.copyOutStream("COPY test_copy_out_fail_txn TO STDOUT", failingCb)
except CatchableError as e:
raised = true
doAssert "out callback failed" in e.msg
doAssert raised
doAssert chunkCount >= 1
doAssert conn.state == csReady
# COPY OUT has no client->server abort; server completes normally so tx
# remains in-transaction even though the client callback failed.
doAssert conn.txStatus == tsInTransaction

discard await conn.exec("ROLLBACK")
doAssert conn.txStatus == tsIdle

let res = await conn.query("SELECT 1")
doAssert res.rows[0].getStr(0) == "1"

discard await conn.exec("DROP TABLE test_copy_out_fail_txn")
await conn.close()

waitFor t()

test "cursor works after copyIn failure (portal state clean)":
proc t() {.async.} =
let conn = await connect(plainConfig())
discard await conn.exec("DROP TABLE IF EXISTS test_copy_portal")
discard await conn.exec("CREATE TABLE test_copy_portal (id int)")
discard await conn.exec(
"INSERT INTO test_copy_portal SELECT g FROM generate_series(1, 30) AS g"
)

var raised = false
try:
discard await conn.copyIn(
"COPY test_copy_portal FROM STDIN", @["not_an_int\n".toBytes()]
)
except PgError:
raised = true
doAssert raised
doAssert conn.state == csReady
doAssert conn.txStatus == tsIdle

# Subsequent portal-based cursor op must succeed
let cursor = await conn.openCursor(
"SELECT id FROM test_copy_portal ORDER BY id", chunkSize = 10
)
var total = 0
while true:
let chunk = await cursor.fetchNext()
if chunk.len == 0:
break
total += chunk.len
doAssert total == 30
doAssert conn.state == csReady

discard await conn.exec("DROP TABLE test_copy_portal")
await conn.close()

waitFor t()

test "cursor works after copyOut failure (portal state clean)":
proc t() {.async.} =
let conn = await connect(plainConfig())
discard await conn.exec("DROP TABLE IF EXISTS test_copy_out_portal")
discard await conn.exec("CREATE TABLE test_copy_out_portal (id int)")
discard await conn.exec(
"INSERT INTO test_copy_out_portal SELECT g FROM generate_series(1, 15) AS g"
)

var raised = false
try:
discard await conn.copyOut("COPY nonexistent_portal_tbl TO STDOUT")
except PgError:
raised = true
doAssert raised
doAssert conn.state == csReady
doAssert conn.txStatus == tsIdle

let cursor = await conn.openCursor(
"SELECT id FROM test_copy_out_portal ORDER BY id", chunkSize = 5
)
var total = 0
while true:
let chunk = await cursor.fetchNext()
if chunk.len == 0:
break
total += chunk.len
doAssert total == 15

discard await conn.exec("DROP TABLE test_copy_out_portal")
await conn.close()

waitFor t()

suite "E2E: COPY IN openArray[byte]":
test "copyIn with openArray[byte] inserts rows":
proc t() {.async.} =
Expand Down
Loading