Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 12 additions & 6 deletions async_postgres/pg_connection.nim
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ func hstoreOid*(conn: PgConnection): int32 {.inline.} =
conn.hstoreOid

func hstoreArrayOid*(conn: PgConnection): int32 {.inline.} =
## Dynamic OID for hstore[] array type; 0 if not available.
## Dynamic OID for ``hstore[]`` array type; 0 if not available.
conn.hstoreArrayOid

proc toPgBinaryParam*(conn: PgConnection, v: PgHstore): PgParam {.inline.} =
Expand All @@ -408,9 +408,9 @@ proc toPgBinaryParam*(conn: PgConnection, v: PgHstore): PgParam {.inline.} =
toPgBinaryParam(v, conn.hstoreOid)

proc toPgBinaryParam*(conn: PgConnection, v: seq[PgHstore]): PgParam {.inline.} =
## Convenience overload: encode hstore[] in binary using ``conn.hstoreOid``
## and ``conn.hstoreArrayOid``. Raises ``PgTypeError`` if either OID has not
## been discovered.
## Convenience overload: encode ``hstore[]`` in binary using
## ``conn.hstoreOid`` and ``conn.hstoreArrayOid``. Raises ``PgTypeError`` if
## either OID has not been discovered.
if conn.hstoreOid == 0 or conn.hstoreArrayOid == 0:
raise
newException(PgTypeError, "hstore/hstore[] OIDs not available on this connection")
Expand Down Expand Up @@ -1336,8 +1336,14 @@ proc connectToHost(
else:
await sock.connect(hostAddr, Port(hostPort))
when defined(posix):
configureTcpNoDelay(posix.SocketHandle(sock.getFd()))
configureKeepalive(posix.SocketHandle(sock.getFd()), config)
when defined(nimdoc):
# nim doc resolves nativesockets.SocketHandle to winlean on some
# setups, so cast explicitly to satisfy the doc-time type check.
configureTcpNoDelay(posix.SocketHandle(sock.getFd()))
configureKeepalive(posix.SocketHandle(sock.getFd()), config)
else:
configureTcpNoDelay(sock.getFd())
configureKeepalive(sock.getFd(), config)
except CatchableError:
sock.close()
raise
Expand Down
15 changes: 8 additions & 7 deletions async_postgres/pg_types/encoding.nim
Original file line number Diff line number Diff line change
Expand Up @@ -1035,11 +1035,12 @@ proc toPgBinaryParam*(v: PgHstore, oid: int32): PgParam =
PgParam(oid: oid, format: 1, value: some(encodeHstoreBinary(v)))

proc toPgParam*(v: seq[PgHstore]): PgParam =
## Send hstore[] in text format using ``OidTextArray``. Requires an explicit
## ``::hstore[]`` cast in the SQL statement (e.g. ``SELECT $1::hstore[]``),
## since the parameter is typed as text[]. No connection-specific OID is
## needed; prefer ``toPgBinaryParam`` when a ``PgConnection`` with the
## discovered hstore OIDs is available (faster, no cast required).
## Send ``hstore[]`` in text format using ``OidTextArray``. Requires an
## explicit ``::hstore[]`` cast in the SQL statement (e.g.
## ``SELECT $1::hstore[]``), since the parameter is typed as ``text[]``. No
## connection-specific OID is needed; prefer ``toPgBinaryParam`` when a
## ``PgConnection`` with the discovered hstore OIDs is available (faster, no
## cast required).
if v.len == 0:
return PgParam(oid: OidTextArray, format: 0, value: some(toBytes("{}")))
var s = "{"
Expand All @@ -1056,8 +1057,8 @@ proc toPgParam*(v: seq[PgHstore]): PgParam =
PgParam(oid: OidTextArray, format: 0, value: some(toBytes(s)))

proc toPgBinaryParam*(v: seq[PgHstore], elemOid: int32, arrayOid: int32): PgParam =
## Encode hstore[] in binary format. Requires both the dynamic hstore OID
## and hstore[] OID (available as ``conn.hstoreOid`` and
## Encode ``hstore[]`` in binary format. Requires both the dynamic hstore OID
## and ``hstore[]`` OID (available as ``conn.hstoreOid`` and
## ``conn.hstoreArrayOid`` after connection). See also the ``PgConnection``
## overload in ``pg_connection`` which reads these OIDs automatically.
if v.len == 0:
Expand Down
2 changes: 1 addition & 1 deletion async_postgres/pg_types/user_types.nim
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ proc getEnumOpt*[T: enum](row: Row, col: int): Option[T] =
some(getEnum[T](row, col))

proc getEnumArray*[T: enum](row: Row, col: int): seq[T] =
## Read a PostgreSQL enum[] column as ``seq[T]``.
## Read a PostgreSQL ``enum[]`` column as ``seq[T]``.
## Raises ``PgTypeError`` on NULL column or NULL element.
if row.isBinaryCol(col):
let (off, clen) = cellInfo(row, col)
Expand Down
2 changes: 2 additions & 0 deletions tests/all_tests.nim
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
{.push warning[UnusedImport]: off.}
import
test_advisory_lock, test_auth, test_dsn, test_e2e, test_keepalive, test_largeobject,
test_pool, test_protocol, test_rowdata, test_sql, test_ssl, test_tracing, test_types,
test_pool_cluster
{.pop.}
85 changes: 48 additions & 37 deletions tests/test_e2e.nim
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
import
std/[
unittest, options, strutils, tables, os, math, deques, sets, importutils, net, json
]
std/[unittest, options, strutils, tables, os, math, deques, importutils, net, json]
from std/times import
DateTime, dateTime, mMar, mJun, mJan, mDec, utc, year, month, monthday, hour, minute,
second, toTime, toUnix, nanosecond

import ../async_postgres/[async_backend, pg_protocol, pg_types, pg_replication]

when hasChronos:
import std/sets

import ../async_postgres/pg_client {.all.}
import ../async_postgres/pg_pool {.all.}
import ../async_postgres/pg_connection {.all.}
Expand Down Expand Up @@ -820,12 +821,14 @@ suite "E2E: Transaction":
await conn.exec("CREATE TABLE test_tx_rb (id serial PRIMARY KEY, val text)")

var raised = false
var shouldRaise = true
try:
conn.withTransaction:
discard await conn.exec(
"INSERT INTO test_tx_rb (val) VALUES ($1)", @[toPgParam("rollback_me")]
)
raise newException(ValueError, "intentional error")
if shouldRaise:
raise newException(ValueError, "intentional error")
except ValueError:
raised = true

Expand Down Expand Up @@ -868,12 +871,14 @@ suite "E2E: Transaction":
await pool.exec("CREATE TABLE test_ptx_rb (id serial PRIMARY KEY, val text)")

var raised = false
var shouldRaise = true
try:
pool.withTransaction(conn):
discard await conn.exec(
"INSERT INTO test_ptx_rb (val) VALUES ($1)", @[toPgParam("pool_rollback")]
)
raise newException(ValueError, "intentional error")
if shouldRaise:
raise newException(ValueError, "intentional error")
except ValueError:
raised = true

Expand All @@ -892,6 +897,7 @@ suite "E2E: Transaction":
let killer = await connect(plainConfig())

var raised = false
var shouldRaise = true
try:
conn.withTransaction:
let pidRes = await conn.query("SELECT pg_backend_pid()")
Expand All @@ -902,7 +908,8 @@ suite "E2E: Transaction":
)
# Give the server a moment to terminate the backend
await sleepAsync(milliseconds(100))
raise newException(ValueError, "original error")
if shouldRaise:
raise newException(ValueError, "original error")
except ValueError as e:
raised = true
doAssert e.msg == "original error"
Expand All @@ -919,6 +926,7 @@ suite "E2E: Transaction":
let killer = await connect(plainConfig())

var raised = false
var shouldRaise = true
try:
pool.withTransaction(conn):
let pidRes = await conn.query("SELECT pg_backend_pid()")
Expand All @@ -927,7 +935,8 @@ suite "E2E: Transaction":
"SELECT pg_terminate_backend($1)", @[toPgParam(parseInt(pid).int32)]
)
await sleepAsync(milliseconds(100))
raise newException(ValueError, "original error")
if shouldRaise:
raise newException(ValueError, "original error")
except ValueError as e:
raised = true
doAssert e.msg == "original error"
Expand Down Expand Up @@ -1188,6 +1197,7 @@ suite "E2E: Transaction":
discard
await conn.exec("CREATE TABLE test_sp_rb (id serial PRIMARY KEY, val text)")

var shouldRaise = true
conn.withTransaction:
discard await conn.exec(
"INSERT INTO test_sp_rb (val) VALUES ($1)", @[toPgParam("before")]
Expand All @@ -1197,7 +1207,8 @@ suite "E2E: Transaction":
discard await conn.exec(
"INSERT INTO test_sp_rb (val) VALUES ($1)", @[toPgParam("inner")]
)
raise newException(ValueError, "savepoint error")
if shouldRaise:
raise newException(ValueError, "savepoint error")
except ValueError:
discard

Expand All @@ -1217,6 +1228,7 @@ suite "E2E: Transaction":
discard
await conn.exec("CREATE TABLE test_sp_nest (id serial PRIMARY KEY, val text)")

var shouldRaise = true
conn.withTransaction:
conn.withSavepoint:
discard await conn.exec(
Expand All @@ -1227,7 +1239,8 @@ suite "E2E: Transaction":
discard await conn.exec(
"INSERT INTO test_sp_nest (val) VALUES ($1)", @[toPgParam("inner")]
)
raise newException(ValueError, "inner error")
if shouldRaise:
raise newException(ValueError, "inner error")
except ValueError:
discard

Expand Down Expand Up @@ -1532,14 +1545,14 @@ suite "E2E: COPY Protocol":
discard
await conn.exec("INSERT INTO test_copy_out VALUES (1, 'Alice'), (2, 'Bob')")

let result = await conn.copyOut("COPY test_copy_out TO STDOUT")
doAssert result.format == cfText
doAssert "COPY 2" in result.commandTag
doAssert result.data.len == 2
let r = await conn.copyOut("COPY test_copy_out TO STDOUT")
doAssert r.format == cfText
doAssert "COPY 2" in r.commandTag
doAssert r.data.len == 2

# Each row is a tab-delimited line with trailing newline
doAssert result.data[0].toString() == "1\tAlice\n"
doAssert result.data[1].toString() == "2\tBob\n"
doAssert r.data[0].toString() == "1\tAlice\n"
doAssert r.data[1].toString() == "2\tBob\n"

discard await conn.exec("DROP TABLE test_copy_out")
await conn.close()
Expand Down Expand Up @@ -1599,9 +1612,9 @@ suite "E2E: COPY Protocol":
discard await conn.exec("DROP TABLE IF EXISTS test_copy_out_empty")
discard await conn.exec("CREATE TABLE test_copy_out_empty (id int, name text)")

let result = await conn.copyOut("COPY test_copy_out_empty TO STDOUT")
doAssert result.data.len == 0
doAssert "COPY 0" in result.commandTag
let r = await conn.copyOut("COPY test_copy_out_empty TO STDOUT")
doAssert r.data.len == 0
doAssert "COPY 0" in r.commandTag
doAssert conn.state == csReady

discard await conn.exec("DROP TABLE test_copy_out_empty")
Expand Down Expand Up @@ -1638,9 +1651,9 @@ suite "E2E: COPY Protocol":
"INSERT INTO test_copy_out_large SELECT g FROM generate_series(1, 10000) AS g"
)

let result = await conn.copyOut("COPY test_copy_out_large TO STDOUT")
doAssert result.data.len == 10000
doAssert "COPY 10000" in result.commandTag
let r = await conn.copyOut("COPY test_copy_out_large TO STDOUT")
doAssert r.data.len == 10000
doAssert "COPY 10000" in r.commandTag

discard await conn.exec("DROP TABLE test_copy_out_large")
await conn.close()
Expand Down Expand Up @@ -1682,11 +1695,11 @@ suite "E2E: COPY Protocol":
"INSERT INTO test_copy_out_null VALUES (1, NULL), (NULL, 'Bob')"
)

let result = await conn.copyOut("COPY test_copy_out_null TO STDOUT")
doAssert result.data.len == 2
let r = await conn.copyOut("COPY test_copy_out_null TO STDOUT")
doAssert r.data.len == 2
# NULL is represented as \N in text format
doAssert result.data[0].toString() == "1\t\\N\n"
doAssert result.data[1].toString() == "\\N\tBob\n"
doAssert r.data[0].toString() == "1\t\\N\n"
doAssert r.data[1].toString() == "\\N\tBob\n"

discard await conn.exec("DROP TABLE test_copy_out_null")
await conn.close()
Expand Down Expand Up @@ -1754,11 +1767,10 @@ suite "E2E: COPY Protocol":
"INSERT INTO test_copy_csv_out VALUES (1, 'Alice'), (2, 'Bob, Jr.')"
)

let result =
await conn.copyOut("COPY test_copy_csv_out TO STDOUT WITH (FORMAT csv)")
doAssert result.data.len == 2
doAssert result.data[0].toString() == "1,Alice\n"
doAssert result.data[1].toString() == "2,\"Bob, Jr.\"\n"
let r = await conn.copyOut("COPY test_copy_csv_out TO STDOUT WITH (FORMAT csv)")
doAssert r.data.len == 2
doAssert r.data[0].toString() == "1,Alice\n"
doAssert r.data[1].toString() == "2,\"Bob, Jr.\"\n"

discard await conn.exec("DROP TABLE test_copy_csv_out")
await conn.close()
Expand Down Expand Up @@ -3527,12 +3539,12 @@ suite "E2E: Binary Format":
let params = @[toPgBinaryParam(dt)]
let qr = await conn.query("SELECT $1::timestamp", params, resultFormat = rfBinary)
doAssert qr.rows.len == 1
let result = qr.rows[0].getTimestamp(0)
doAssert result.year == 2024
doAssert result.month == mJan
doAssert result.monthday == 15
doAssert result.hour == 10
doAssert result.minute == 30
let r = qr.rows[0].getTimestamp(0)
doAssert r.year == 2024
doAssert r.month == mJan
doAssert r.monthday == 15
doAssert r.hour == 10
doAssert r.minute == 30
await conn.close()

waitFor t()
Expand Down Expand Up @@ -4232,7 +4244,6 @@ suite "E2E: COPY IN Stream":
"1\n".toBytes()
else:
raise newException(CatchableError, "callback failed")
newSeq[byte]()

var raised = false
try:
Expand Down
Loading