Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 4 additions & 8 deletions async_postgres/pg_client.nim
Original file line number Diff line number Diff line change
Expand Up @@ -2564,8 +2564,7 @@ proc executeImpl(
# chronos drains the send Future in the background while we descend into
# the receive loop. The outer try/except below owns sendFut's lifetime:
# it drains sendFut on the normal path (propagating any stored write
# error) and cancels it on any abnormal exit so the Future never leaks
# and `conn.sendBuf` is never mutated while a write still borrows it.
# error) and cancels it on any abnormal exit so the Future never leaks.
var sendFut = conn.sendBufMsg()
else:
await conn.sendBufMsg()
Expand Down Expand Up @@ -2692,14 +2691,12 @@ proc executeImpl(
await conn.fillRecvBuf(timeout)

when hasChronos:
# Normal path: drain sendFut to propagate any stored write error and
# release the borrow on conn.sendBuf.
# Normal path: drain sendFut to propagate any stored write error.
await sendFut
except CatchableError as e:
when hasChronos:
# Abnormal path (recv error, cancellation, failed stored write):
# ensure sendFut never escapes as an unhandled Future and that no
# in-flight write still holds addr conn.sendBuf[0].
# ensure sendFut never escapes as an unhandled Future.
if not sendFut.finished:
try:
await cancelAndWait(sendFut)
Expand Down Expand Up @@ -2945,8 +2942,7 @@ proc executeIsolatedImpl(
await sendFut
except CatchableError as e:
when hasChronos:
# Abnormal path: cancel or drain sendFut so the Future never leaks
# and no in-flight write still borrows conn.sendBuf.
# Abnormal path: cancel or drain sendFut so the Future never leaks.
if not sendFut.finished:
try:
await cancelAndWait(sendFut)
Expand Down
9 changes: 5 additions & 4 deletions async_postgres/pg_connection.nim
Original file line number Diff line number Diff line change
Expand Up @@ -905,14 +905,15 @@ proc sendMsg*(conn: PgConnection, data: seq[byte]): Future[void] {.async.} =
await conn.socket.sendRawBytes(data)

proc sendBufMsg*(conn: PgConnection): Future[void] {.async.} =
## Send conn.sendBuf to the server without copying the seq.
## Safe because conn.state == csBusy prevents concurrent access to sendBuf.
## Send conn.sendBuf to the server.
## The transport receives its own copy of the buffer, so conn.sendBuf is safe
## to mutate while the returned Future is still pending.
when hasChronos:
if conn.sendBuf.len > 0:
await conn.writer.write(addr conn.sendBuf[0], conn.sendBuf.len)
await conn.writer.write(conn.sendBuf)
elif hasAsyncDispatch:
if conn.sendBuf.len > 0:
await conn.socket.sendRawData(addr conn.sendBuf[0], conn.sendBuf.len)
await conn.socket.sendRawBytes(conn.sendBuf)

proc closeTransport(conn: PgConnection) {.async.} =
## Close transport resources without sending Terminate.
Expand Down