diff --git a/async_postgres/pg_client.nim b/async_postgres/pg_client.nim index f8403e3..75687d8 100644 --- a/async_postgres/pg_client.nim +++ b/async_postgres/pg_client.nim @@ -66,10 +66,21 @@ type PipelineOp = object kind: PipelineOpKind sql: string + # Legacy path — populated by the `seq[PgParam]` overloads. These seqs own + # the per-parameter byte payloads directly, avoiding an extra copy into + # Pipeline-level storage for bulk-string workloads. params: seq[Option[seq[byte]]] paramOids: seq[int32] paramFormats: seq[int16] resultFormats: seq[int16] + # Inline path — populated by the `openArray[PgParamInline]` overloads. + # Points at slices of the Pipeline-level SoA buffers + # (`inlineRanges`/`inlineOids`/`inlineFormats`/`inlineData`). `hasInline` + # true means the send phase should use these slices instead of the legacy + # fields above. + hasInline: bool + inlineStart: int32 + inlineCount: int32 # Set during send phase cacheHit: bool cacheMiss: bool @@ -91,6 +102,13 @@ type ## Batch of queries/execs sent through the PostgreSQL pipeline protocol. conn: PgConnection ops: seq[PipelineOp] + # SoA storage shared by all ops added via the `PgParamInline` path. + # Index ranges in each op point into these sequences, eliminating per-op + # parameter allocations. + inlineData: seq[byte] + inlineRanges: seq[tuple[off: int32, len: int32]] + inlineOids: seq[int32] + inlineFormats: seq[int16] IsolatedPipelineResults* = object ## Results from `executeIsolated`: per-op error isolation via per-query SYNC. @@ -370,6 +388,172 @@ proc exec*( tag = await execImpl(conn, sql, params) return initCommandResult(tag) +template appendInlineParam( + data: var seq[byte], + ranges: var seq[tuple[off: int32, len: int32]], + oids: var seq[int32], + formats: var seq[int16], + p: PgParamInline, +) = + ## Shared encoder for a single `PgParamInline` into SoA buffers. Used by + ## both `flattenInline` (per-call temporaries) and `Pipeline.appendInline` + ## (pipeline-wide SoA). Keeping the NULL / empty / inline / overflow + ## branching in one place means wire-format semantics cannot drift between + ## the two code paths. + oids.add p.oid + formats.add p.format + if p.len == -1: + ranges.add((int32(0), int32(-1))) + elif p.len == 0: + ranges.add((int32(data.len), int32(0))) + else: + let dataOff = int32(data.len) + let oldLen = data.len + data.setLen(oldLen + int(p.len)) + if p.len <= PgInlineBufSize: + copyMem(addr data[oldLen], unsafeAddr p.inlineBuf[0], int(p.len)) + else: + copyMem(addr data[oldLen], unsafeAddr p.overflow[0], int(p.len)) + ranges.add((dataOff, p.len)) + +proc flattenInline( + params: openArray[PgParamInline] +): tuple[ + data: seq[byte], + ranges: seq[tuple[off: int32, len: int32]], + oids: seq[int32], + formats: seq[int16], +] = + if params.len == 0: + return + result.oids = newSeqOfCap[int32](params.len) + result.formats = newSeqOfCap[int16](params.len) + result.ranges = newSeqOfCap[tuple[off: int32, len: int32]](params.len) + var estBytes = 0 + for p in params: + if p.len > 0: + estBytes += int(p.len) + result.data = newSeqOfCap[byte](estBytes) + for p in params: + appendInlineParam(result.data, result.ranges, result.oids, result.formats, p) + +proc execInlineImpl( + conn: PgConnection, + sql: string, + data: seq[byte], + ranges: seq[tuple[off: int32, len: int32]], + paramOids: seq[int32], + paramFormats: seq[int16], + timeout: Duration = ZeroDuration, +): Future[string] {.async.} = + conn.checkReady() + conn.state = csBusy + + let cached = conn.lookupStmtCache(sql) + var cacheHit = cached != nil + var cacheMiss = false + var stmtName = "" + + conn.sendBuf.setLen(0) + if cacheHit: + stmtName = cached.name + conn.sendBuf.addBindRaw("", stmtName, paramFormats, data, ranges) + conn.sendBuf.addExecute("", 0) + conn.sendBuf.addSync() + await conn.sendBufMsg() + elif conn.stmtCacheCapacity > 0: + cacheMiss = true + stmtName = conn.nextStmtName() + if conn.stmtCache.len >= conn.stmtCacheCapacity: + let evicted = conn.evictStmtCache() + conn.sendBuf.addClose(dkStatement, evicted.name) + conn.sendBuf.addParse(stmtName, sql, paramOids) + conn.sendBuf.addDescribe(dkStatement, stmtName) + conn.sendBuf.addBindRaw("", stmtName, paramFormats, data, ranges) + conn.sendBuf.addExecute("", 0) + conn.sendBuf.addSync() + await conn.sendBufMsg() + else: + conn.sendBuf.addParse("", sql, paramOids) + conn.sendBuf.addBindRaw("", "", paramFormats, data, ranges) + conn.sendBuf.addExecute("", 0) + conn.sendBuf.addSync() + await conn.sendBufMsg() + + var commandTag = "" + var queryError: ref PgQueryError + var cachedFields: seq[FieldDescription] + + block recvLoop: + while true: + while (let opt = conn.nextMessage(); opt.isSome): + let msg = opt.get + case msg.kind + of bmkParseComplete, bmkBindComplete, bmkCloseComplete: + discard + of bmkParameterDescription: + discard + of bmkRowDescription: + if cacheMiss: + cachedFields = msg.fields + of bmkNoData: + discard + of bmkDataRow: + discard + of bmkCommandComplete: + commandTag = msg.commandTag + of bmkEmptyQueryResponse: + discard + of bmkErrorResponse: + queryError = newPgQueryError(msg.errorFields) + of bmkReadyForQuery: + conn.txStatus = msg.txStatus + conn.state = csReady + if queryError != nil: + if cacheHit and queryError.sqlState == "26000": + conn.removeStmtCache(sql) + raise queryError + if cacheMiss: + conn.addStmtCache(sql, CachedStmt(name: stmtName, fields: cachedFields)) + break recvLoop + else: + discard + await conn.fillRecvBuf(timeout) + + return commandTag + +proc exec*( + conn: PgConnection, + sql: string, + params: seq[PgParamInline], + timeout: Duration = ZeroDuration, +): Future[CommandResult] {.async.} = + ## Execute a statement with heap-alloc-free inline parameters. + ## Prefer this overload for scalar-heavy workloads (e.g. bulk INSERT of + ## numeric columns) where `seq[PgParam]` would heap-allocate per parameter. + let (data, ranges, oids, formats) = flattenInline(params) + var tag: string + withConnTracing( + conn, + onQueryStart, + onQueryEnd, + TraceQueryStartData(sql: sql, paramsInline: params, isExec: true), + TraceQueryEndData, + TraceQueryEndData(commandTag: tag), + ): + if timeout > ZeroDuration: + try: + tag = await execInlineImpl(conn, sql, data, ranges, oids, formats, timeout).wait( + timeout + ) + except AsyncTimeoutError: + conn.cancelNoWait() + conn.state = csClosed + raise newException(PgTimeoutError, "Exec timed out") + else: + tag = await execInlineImpl(conn, sql, data, ranges, oids, formats) + return initCommandResult(tag) + template queryRecvLoop( conn: PgConnection, sql: string, @@ -841,6 +1025,108 @@ proc query*( qr = await queryImpl(conn, sql, params, resultFormats) return qr +proc queryInlineImpl( + conn: PgConnection, + sql: string, + data: seq[byte], + ranges: seq[tuple[off: int32, len: int32]], + paramOids: seq[int32], + paramFormats: seq[int16], + resultFormats: seq[int16] = @[], + timeout: Duration = ZeroDuration, +): Future[QueryResult] {.async.} = + conn.checkReady() + conn.state = csBusy + + let cached = conn.lookupStmtCache(sql) + var cacheHit = cached != nil + var cacheMiss = false + var stmtName = "" + var cachedFields: seq[FieldDescription] + var cachedColFmts: seq[int16] + var cachedColOids: seq[int32] + var effectiveResultFormats: seq[int16] + + conn.sendBuf.setLen(0) + if cacheHit: + stmtName = cached.name + cachedFields = cached.fields + cachedColFmts = cached.colFmts + cachedColOids = cached.colOids + effectiveResultFormats = + if resultFormats.len == 0: cached.resultFormats else: resultFormats + conn.sendBuf.addBindRaw( + "", stmtName, paramFormats, data, ranges, effectiveResultFormats + ) + conn.sendBuf.addExecute("", 0) + conn.sendBuf.addSync() + await conn.sendBufMsg() + elif conn.stmtCacheCapacity > 0: + cacheMiss = true + stmtName = conn.nextStmtName() + effectiveResultFormats = resultFormats + if conn.stmtCache.len >= conn.stmtCacheCapacity: + let evicted = conn.evictStmtCache() + conn.sendBuf.addClose(dkStatement, evicted.name) + conn.sendBuf.addParse(stmtName, sql, paramOids) + conn.sendBuf.addDescribe(dkStatement, stmtName) + conn.sendBuf.addBindRaw( + "", stmtName, paramFormats, data, ranges, effectiveResultFormats + ) + conn.sendBuf.addExecute("", 0) + conn.sendBuf.addSync() + await conn.sendBufMsg() + else: + effectiveResultFormats = resultFormats + conn.sendBuf.addParse("", sql, paramOids) + conn.sendBuf.addBindRaw("", "", paramFormats, data, ranges, effectiveResultFormats) + conn.sendBuf.addDescribe(dkPortal, "") + conn.sendBuf.addExecute("", 0) + conn.sendBuf.addSync() + await conn.sendBufMsg() + + var qr = QueryResult() + queryRecvLoop( + conn, sql, effectiveResultFormats, cacheHit, cacheMiss, stmtName, cachedFields, + cachedColFmts, cachedColOids, qr, timeout, + ) + return qr + +proc query*( + conn: PgConnection, + sql: string, + params: seq[PgParamInline], + resultFormat: ResultFormat = rfAuto, + timeout: Duration = ZeroDuration, +): Future[QueryResult] {.async.} = + ## Execute a query with heap-alloc-free inline parameters. + ## Prefer this overload for scalar-heavy workloads where `seq[PgParam]` + ## would heap-allocate per parameter. + let (data, ranges, oids, formats) = flattenInline(params) + var qr: QueryResult + withConnTracing( + conn, + onQueryStart, + onQueryEnd, + TraceQueryStartData(sql: sql, paramsInline: params, isExec: false), + TraceQueryEndData, + TraceQueryEndData(commandTag: qr.commandTag, rowCount: qr.rowCount), + ): + let resultFormats = resultFormat.toFormatCodes() + if timeout > ZeroDuration: + try: + qr = await queryInlineImpl( + conn, sql, data, ranges, oids, formats, resultFormats, timeout + ) + .wait(timeout) + except AsyncTimeoutError: + conn.cancelNoWait() + conn.state = csClosed + raise newException(PgTimeoutError, "Query timed out") + else: + qr = await queryInlineImpl(conn, sql, data, ranges, oids, formats, resultFormats) + return qr + proc queryOne*( conn: PgConnection, sql: string, @@ -2153,6 +2439,16 @@ proc newPipeline*(conn: PgConnection): Pipeline = ## Create a new pipeline for batching multiple operations into a single round trip. Pipeline(conn: conn, ops: @[]) +proc appendInline( + p: Pipeline, params: openArray[PgParamInline] +): tuple[start, count: int32] = + ## Append inline params to the Pipeline-level SoA buffers. Returns + ## `(start, count)` identifying the appended slice. + result.start = int32(p.inlineRanges.len) + result.count = int32(params.len) + for pi in params: + appendInlineParam(p.inlineData, p.inlineRanges, p.inlineOids, p.inlineFormats, pi) + proc addExec*(p: Pipeline, sql: string, params: seq[PgParam] = @[]) = ## Add an exec operation to the pipeline with typed parameters. var op = PipelineOp(kind: pokExec, sql: sql) @@ -2183,6 +2479,30 @@ proc addQuery*( resultFormats: resultFormat.toFormatCodes(), ) +proc addExec*(p: Pipeline, sql: string, params: openArray[PgParamInline]) = + ## Add an exec operation using the heap-alloc-free `PgParamInline` path. + let (start, count) = p.appendInline(params) + p.ops.add PipelineOp( + kind: pokExec, sql: sql, hasInline: true, inlineStart: start, inlineCount: count + ) + +proc addQuery*( + p: Pipeline, + sql: string, + params: openArray[PgParamInline], + resultFormat: ResultFormat = rfAuto, +) = + ## Add a query operation using the heap-alloc-free `PgParamInline` path. + let (start, count) = p.appendInline(params) + p.ops.add PipelineOp( + kind: pokQuery, + sql: sql, + hasInline: true, + inlineStart: start, + inlineCount: count, + resultFormats: resultFormat.toFormatCodes(), + ) + proc executeImpl( p: Pipeline, timeout: Duration = ZeroDuration ): Future[seq[PipelineResult]] {.async.} = @@ -2198,14 +2518,42 @@ proc executeImpl( var defaultFormats: seq[int16] # reused across ops when paramFormats is empty for i in 0 ..< p.ops.len: - let formats = - if p.ops[i].paramFormats.len > 0: - p.ops[i].paramFormats + let hasInline = p.ops[i].hasInline + let startIdx = int(p.ops[i].inlineStart) + let endIdx = startIdx + int(p.ops[i].inlineCount) - 1 + if not hasInline and p.ops[i].paramFormats.len == 0: + let needed = p.ops[i].params.len + if defaultFormats.len != needed: + defaultFormats = newSeq[int16](needed) + + template currentFormats(): openArray[int16] = + if hasInline: + p.inlineFormats.toOpenArray(startIdx, endIdx) + elif p.ops[i].paramFormats.len > 0: + p.ops[i].paramFormats.toOpenArray(0, p.ops[i].paramFormats.high) + else: + defaultFormats.toOpenArray(0, defaultFormats.high) + + template emitBind(stmt: string, resultFmts: openArray[int16]) = + if hasInline: + conn.sendBuf.addBindRaw( + "", + stmt, + currentFormats(), + p.inlineData, + p.inlineRanges.toOpenArray(startIdx, endIdx), + resultFmts, + ) else: - let needed = p.ops[i].params.len - if defaultFormats.len != needed: - defaultFormats = newSeq[int16](needed) - defaultFormats + conn.sendBuf.addBind("", stmt, currentFormats(), p.ops[i].params, resultFmts) + + template emitParse(stmt: string) = + if hasInline: + conn.sendBuf.addParse( + stmt, p.ops[i].sql, p.inlineOids.toOpenArray(startIdx, endIdx) + ) + else: + conn.sendBuf.addParse(stmt, p.ops[i].sql, p.ops[i].paramOids) let cached = conn.lookupStmtCache(p.ops[i].sql) p.ops[i].cacheHit = cached != nil @@ -2226,9 +2574,7 @@ proc executeImpl( else: p.ops[i].resultFormats p.ops[i].resultFormats = effectiveResultFormats - conn.sendBuf.addBind( - "", cached.name, formats, p.ops[i].params, effectiveResultFormats - ) + emitBind(cached.name, effectiveResultFormats) conn.sendBuf.addExecute("", 0) elif conn.stmtCacheCapacity > 0: p.ops[i].cacheMiss = true @@ -2238,21 +2584,27 @@ proc executeImpl( let evicted = conn.evictStmtCache() conn.sendBuf.addClose(dkStatement, evicted.name) inc pendingCacheAdds - conn.sendBuf.addParse(p.ops[i].stmtName, p.ops[i].sql, p.ops[i].paramOids) + emitParse(p.ops[i].stmtName) conn.sendBuf.addDescribe(dkStatement, p.ops[i].stmtName) - conn.sendBuf.addBind( - "", p.ops[i].stmtName, formats, p.ops[i].params, p.ops[i].resultFormats - ) + emitBind(p.ops[i].stmtName, p.ops[i].resultFormats) conn.sendBuf.addExecute("", 0) else: - conn.sendBuf.addParse("", p.ops[i].sql, p.ops[i].paramOids) - conn.sendBuf.addBind("", "", formats, p.ops[i].params, p.ops[i].resultFormats) + emitParse("") + emitBind("", p.ops[i].resultFormats) if p.ops[i].kind == pokQuery: conn.sendBuf.addDescribe(dkPortal, "") conn.sendBuf.addExecute("", 0) conn.sendBuf.addSync() - await conn.sendBufMsg() + when hasChronos: + # chronos drains the send Future in the background while we descend into + # the receive loop. The outer try/except below owns sendFut's lifetime: + # it drains sendFut on the normal path (propagating any stored write + # error) and cancels it on any abnormal exit so the Future never leaks + # and `conn.sendBuf` is never mutated while a write still borrows it. + var sendFut = conn.sendBufMsg() + else: + await conn.sendBufMsg() # Receive Phase var results = newSeq[PipelineResult](p.ops.len) @@ -2276,103 +2628,125 @@ proc executeImpl( else: results[i] = PipelineResult(kind: prkExec) - block recvLoop: - while true: - var rowData: RowData = nil - var rowCount: ptr int32 = nil - if activeOpIdx < p.ops.len and p.ops[activeOpIdx].kind == pokQuery: - rowData = results[activeOpIdx].queryResult.data - rowCount = addr results[activeOpIdx].queryResult.rowCount + try: + block recvLoop: + while true: + var rowData: RowData = nil + var rowCount: ptr int32 = nil + if activeOpIdx < p.ops.len and p.ops[activeOpIdx].kind == pokQuery: + rowData = results[activeOpIdx].queryResult.data + rowCount = addr results[activeOpIdx].queryResult.rowCount - while (let opt = conn.nextMessage(rowData, rowCount); opt.isSome): - let msg = opt.get - case msg.kind - of bmkParseComplete, bmkBindComplete, bmkCloseComplete: - discard - of bmkParameterDescription: - discard - of bmkRowDescription: - if activeOpIdx < p.ops.len and p.ops[activeOpIdx].kind == pokQuery: - var cf: seq[int16] - var co: seq[int32] - if p.ops[activeOpIdx].cacheMiss: - if cachedFieldsPerOp.len == 0: - cachedFieldsPerOp = newSeq[seq[FieldDescription]](p.ops.len) - cachedFieldsPerOp[activeOpIdx] = msg.fields - results[activeOpIdx].queryResult.fields = msg.fields - if p.ops[activeOpIdx].resultFormats.len > 0: - cf = newSeq[int16](msg.fields.len) - co = newSeq[int32](msg.fields.len) - for j in 0 ..< msg.fields.len: - co[j] = msg.fields[j].typeOid - if p.ops[activeOpIdx].resultFormats.len == 1: - results[activeOpIdx].queryResult.fields[j].formatCode = - p.ops[activeOpIdx].resultFormats[0] - cf[j] = p.ops[activeOpIdx].resultFormats[0] - elif j < p.ops[activeOpIdx].resultFormats.len: - results[activeOpIdx].queryResult.fields[j].formatCode = - p.ops[activeOpIdx].resultFormats[j] - cf[j] = p.ops[activeOpIdx].resultFormats[j] - else: - results[activeOpIdx].queryResult.fields = msg.fields - results[activeOpIdx].queryResult.data = - newRowData(int16(msg.fields.len), cf, co) - # Update pointers for nextMessage - rowData = results[activeOpIdx].queryResult.data - rowCount = addr results[activeOpIdx].queryResult.rowCount - of bmkNoData: - discard - of bmkCommandComplete: - if activeOpIdx < p.ops.len: - if p.ops[activeOpIdx].kind == pokExec: - results[activeOpIdx].commandResult = initCommandResult(msg.commandTag) - else: - results[activeOpIdx].queryResult.commandTag = msg.commandTag - inc activeOpIdx - # Update rowData/rowCount for next op - if activeOpIdx < p.ops.len and p.ops[activeOpIdx].kind == pokQuery: - rowData = results[activeOpIdx].queryResult.data - rowCount = addr results[activeOpIdx].queryResult.rowCount - else: - rowData = nil - rowCount = nil - of bmkEmptyQueryResponse: - if activeOpIdx < p.ops.len: - inc activeOpIdx + while (let opt = conn.nextMessage(rowData, rowCount); opt.isSome): + let msg = opt.get + case msg.kind + of bmkParseComplete, bmkBindComplete, bmkCloseComplete: + discard + of bmkParameterDescription: + discard + of bmkRowDescription: if activeOpIdx < p.ops.len and p.ops[activeOpIdx].kind == pokQuery: + var cf: seq[int16] + var co: seq[int32] + if p.ops[activeOpIdx].cacheMiss: + if cachedFieldsPerOp.len == 0: + cachedFieldsPerOp = newSeq[seq[FieldDescription]](p.ops.len) + cachedFieldsPerOp[activeOpIdx] = msg.fields + results[activeOpIdx].queryResult.fields = msg.fields + if p.ops[activeOpIdx].resultFormats.len > 0: + cf = newSeq[int16](msg.fields.len) + co = newSeq[int32](msg.fields.len) + for j in 0 ..< msg.fields.len: + co[j] = msg.fields[j].typeOid + if p.ops[activeOpIdx].resultFormats.len == 1: + results[activeOpIdx].queryResult.fields[j].formatCode = + p.ops[activeOpIdx].resultFormats[0] + cf[j] = p.ops[activeOpIdx].resultFormats[0] + elif j < p.ops[activeOpIdx].resultFormats.len: + results[activeOpIdx].queryResult.fields[j].formatCode = + p.ops[activeOpIdx].resultFormats[j] + cf[j] = p.ops[activeOpIdx].resultFormats[j] + else: + results[activeOpIdx].queryResult.fields = msg.fields + results[activeOpIdx].queryResult.data = + newRowData(int16(msg.fields.len), cf, co) + # Update pointers for nextMessage rowData = results[activeOpIdx].queryResult.data rowCount = addr results[activeOpIdx].queryResult.rowCount - else: - rowData = nil - rowCount = nil - of bmkErrorResponse: - if queryError == nil: - queryError = newPgQueryError(msg.errorFields) - of bmkReadyForQuery: - conn.txStatus = msg.txStatus - conn.state = csReady - if queryError != nil: - # Invalidate cache for 26000 (prepared statement does not exist) - if queryError.sqlState == "26000": - for i in 0 ..< p.ops.len: - if p.ops[i].cacheHit: - conn.removeStmtCache(p.ops[i].sql) - raise queryError - # Cache misses: add to cache - for i in 0 ..< p.ops.len: - if p.ops[i].cacheMiss: - let fields = - if cachedFieldsPerOp.len > 0: - cachedFieldsPerOp[i] - else: - @[] - conn.addStmtCache( - p.ops[i].sql, CachedStmt(name: p.ops[i].stmtName, fields: fields) - ) - break recvLoop - else: + of bmkNoData: + discard + of bmkCommandComplete: + if activeOpIdx < p.ops.len: + if p.ops[activeOpIdx].kind == pokExec: + results[activeOpIdx].commandResult = initCommandResult(msg.commandTag) + else: + results[activeOpIdx].queryResult.commandTag = msg.commandTag + inc activeOpIdx + # Update rowData/rowCount for next op + if activeOpIdx < p.ops.len and p.ops[activeOpIdx].kind == pokQuery: + rowData = results[activeOpIdx].queryResult.data + rowCount = addr results[activeOpIdx].queryResult.rowCount + else: + rowData = nil + rowCount = nil + of bmkEmptyQueryResponse: + if activeOpIdx < p.ops.len: + inc activeOpIdx + if activeOpIdx < p.ops.len and p.ops[activeOpIdx].kind == pokQuery: + rowData = results[activeOpIdx].queryResult.data + rowCount = addr results[activeOpIdx].queryResult.rowCount + else: + rowData = nil + rowCount = nil + of bmkErrorResponse: + if queryError == nil: + queryError = newPgQueryError(msg.errorFields) + of bmkReadyForQuery: + conn.txStatus = msg.txStatus + conn.state = csReady + if queryError != nil: + # Invalidate cache for 26000 (prepared statement does not exist) + if queryError.sqlState == "26000": + for i in 0 ..< p.ops.len: + if p.ops[i].cacheHit: + conn.removeStmtCache(p.ops[i].sql) + raise queryError + # Cache misses: add to cache + for i in 0 ..< p.ops.len: + if p.ops[i].cacheMiss: + let fields = + if cachedFieldsPerOp.len > 0: + cachedFieldsPerOp[i] + else: + @[] + conn.addStmtCache( + p.ops[i].sql, CachedStmt(name: p.ops[i].stmtName, fields: fields) + ) + break recvLoop + else: + discard + await conn.fillRecvBuf(timeout) + + when hasChronos: + # Normal path: drain sendFut to propagate any stored write error and + # release the borrow on conn.sendBuf. + await sendFut + except CatchableError as e: + when hasChronos: + # Abnormal path (recv error, cancellation, failed stored write): + # ensure sendFut never escapes as an unhandled Future and that no + # in-flight write still holds unsafeAddr conn.sendBuf[0]. + if not sendFut.finished: + try: + await cancelAndWait(sendFut) + except CatchableError: discard - await conn.fillRecvBuf(timeout) + else: + try: + await sendFut + except CatchableError: + discard + raise e return results @@ -2417,17 +2791,45 @@ proc executeIsolatedImpl( var cachedStmts: seq[CachedStmt] var hasCachedStmts = false var pendingCacheAdds = 0 - var defaultFormats: seq[int16] + var defaultFormats: seq[int16] for i in 0 ..< p.ops.len: - let formats = - if p.ops[i].paramFormats.len > 0: - p.ops[i].paramFormats + let hasInline = p.ops[i].hasInline + let startIdx = int(p.ops[i].inlineStart) + let endIdx = startIdx + int(p.ops[i].inlineCount) - 1 + if not hasInline and p.ops[i].paramFormats.len == 0: + let needed = p.ops[i].params.len + if defaultFormats.len != needed: + defaultFormats = newSeq[int16](needed) + + template currentFormats(): openArray[int16] = + if hasInline: + p.inlineFormats.toOpenArray(startIdx, endIdx) + elif p.ops[i].paramFormats.len > 0: + p.ops[i].paramFormats.toOpenArray(0, p.ops[i].paramFormats.high) + else: + defaultFormats.toOpenArray(0, defaultFormats.high) + + template emitBind(stmt: string, resultFmts: openArray[int16]) = + if hasInline: + conn.sendBuf.addBindRaw( + "", + stmt, + currentFormats(), + p.inlineData, + p.inlineRanges.toOpenArray(startIdx, endIdx), + resultFmts, + ) + else: + conn.sendBuf.addBind("", stmt, currentFormats(), p.ops[i].params, resultFmts) + + template emitParse(stmt: string) = + if hasInline: + conn.sendBuf.addParse( + stmt, p.ops[i].sql, p.inlineOids.toOpenArray(startIdx, endIdx) + ) else: - let needed = p.ops[i].params.len - if defaultFormats.len != needed: - defaultFormats = newSeq[int16](needed) - defaultFormats + conn.sendBuf.addParse(stmt, p.ops[i].sql, p.ops[i].paramOids) let cached = conn.lookupStmtCache(p.ops[i].sql) p.ops[i].cacheHit = cached != nil @@ -2448,9 +2850,7 @@ proc executeIsolatedImpl( else: p.ops[i].resultFormats p.ops[i].resultFormats = effectiveResultFormats - conn.sendBuf.addBind( - "", cached.name, formats, p.ops[i].params, effectiveResultFormats - ) + emitBind(cached.name, effectiveResultFormats) conn.sendBuf.addExecute("", 0) elif conn.stmtCacheCapacity > 0: p.ops[i].cacheMiss = true @@ -2460,22 +2860,26 @@ proc executeIsolatedImpl( let evicted = conn.evictStmtCache() conn.sendBuf.addClose(dkStatement, evicted.name) inc pendingCacheAdds - conn.sendBuf.addParse(p.ops[i].stmtName, p.ops[i].sql, p.ops[i].paramOids) + emitParse(p.ops[i].stmtName) conn.sendBuf.addDescribe(dkStatement, p.ops[i].stmtName) - conn.sendBuf.addBind( - "", p.ops[i].stmtName, formats, p.ops[i].params, p.ops[i].resultFormats - ) + emitBind(p.ops[i].stmtName, p.ops[i].resultFormats) conn.sendBuf.addExecute("", 0) else: - conn.sendBuf.addParse("", p.ops[i].sql, p.ops[i].paramOids) - conn.sendBuf.addBind("", "", formats, p.ops[i].params, p.ops[i].resultFormats) + emitParse("") + emitBind("", p.ops[i].resultFormats) if p.ops[i].kind == pokQuery: conn.sendBuf.addDescribe(dkPortal, "") conn.sendBuf.addExecute("", 0) conn.sendBuf.addSync() # Per-op SYNC for error isolation - await conn.sendBufMsg() + when hasChronos: + # Same concurrent-send pattern as executeImpl: the write drains while the + # recv loop consumes per-op ReadyForQuery messages. Per-op SYNC still + # provides error isolation; only the IO scheduling differs. + var sendFut = conn.sendBufMsg() + else: + await conn.sendBufMsg() # Receive Phase (per-op ReadyForQuery) var results = newSeq[PipelineResult](p.ops.len) @@ -2497,81 +2901,101 @@ proc executeIsolatedImpl( else: results[i] = PipelineResult(kind: prkExec) - for opIdx in 0 ..< p.ops.len: - var opError: ref PgQueryError - var cachedFields: seq[FieldDescription] - - block opRecv: - while true: - var rowData: RowData = nil - var rowCount: ptr int32 = nil - if p.ops[opIdx].kind == pokQuery: - rowData = results[opIdx].queryResult.data - rowCount = addr results[opIdx].queryResult.rowCount - - while (let opt = conn.nextMessage(rowData, rowCount); opt.isSome): - let msg = opt.get - case msg.kind - of bmkParseComplete, bmkBindComplete, bmkCloseComplete: - discard - of bmkParameterDescription: - discard - of bmkRowDescription: - if p.ops[opIdx].kind == pokQuery: - if p.ops[opIdx].cacheMiss: - cachedFields = msg.fields - results[opIdx].queryResult.fields = msg.fields - var cf: seq[int16] - var co: seq[int32] - if p.ops[opIdx].resultFormats.len > 0: - cf = newSeq[int16](msg.fields.len) - co = newSeq[int32](msg.fields.len) - for j in 0 ..< msg.fields.len: - co[j] = msg.fields[j].typeOid - if p.ops[opIdx].resultFormats.len == 1: - results[opIdx].queryResult.fields[j].formatCode = - p.ops[opIdx].resultFormats[0] - cf[j] = p.ops[opIdx].resultFormats[0] - elif j < p.ops[opIdx].resultFormats.len: - results[opIdx].queryResult.fields[j].formatCode = - p.ops[opIdx].resultFormats[j] - cf[j] = p.ops[opIdx].resultFormats[j] - results[opIdx].queryResult.data = - newRowData(int16(msg.fields.len), cf, co) - rowData = results[opIdx].queryResult.data - rowCount = addr results[opIdx].queryResult.rowCount + try: + for opIdx in 0 ..< p.ops.len: + var opError: ref PgQueryError + var cachedFields: seq[FieldDescription] + + block opRecv: + while true: + var rowData: RowData = nil + var rowCount: ptr int32 = nil + if p.ops[opIdx].kind == pokQuery: + rowData = results[opIdx].queryResult.data + rowCount = addr results[opIdx].queryResult.rowCount + + while (let opt = conn.nextMessage(rowData, rowCount); opt.isSome): + let msg = opt.get + case msg.kind + of bmkParseComplete, bmkBindComplete, bmkCloseComplete: + discard + of bmkParameterDescription: + discard + of bmkRowDescription: + if p.ops[opIdx].kind == pokQuery: + if p.ops[opIdx].cacheMiss: + cachedFields = msg.fields + results[opIdx].queryResult.fields = msg.fields + var cf: seq[int16] + var co: seq[int32] + if p.ops[opIdx].resultFormats.len > 0: + cf = newSeq[int16](msg.fields.len) + co = newSeq[int32](msg.fields.len) + for j in 0 ..< msg.fields.len: + co[j] = msg.fields[j].typeOid + if p.ops[opIdx].resultFormats.len == 1: + results[opIdx].queryResult.fields[j].formatCode = + p.ops[opIdx].resultFormats[0] + cf[j] = p.ops[opIdx].resultFormats[0] + elif j < p.ops[opIdx].resultFormats.len: + results[opIdx].queryResult.fields[j].formatCode = + p.ops[opIdx].resultFormats[j] + cf[j] = p.ops[opIdx].resultFormats[j] + results[opIdx].queryResult.data = + newRowData(int16(msg.fields.len), cf, co) + rowData = results[opIdx].queryResult.data + rowCount = addr results[opIdx].queryResult.rowCount + else: + results[opIdx].queryResult.fields = msg.fields + results[opIdx].queryResult.data = newRowData(int16(msg.fields.len)) + rowData = results[opIdx].queryResult.data + rowCount = addr results[opIdx].queryResult.rowCount + of bmkNoData: + discard + of bmkCommandComplete: + if p.ops[opIdx].kind == pokExec: + results[opIdx].commandResult = initCommandResult(msg.commandTag) else: - results[opIdx].queryResult.fields = msg.fields - results[opIdx].queryResult.data = newRowData(int16(msg.fields.len)) - rowData = results[opIdx].queryResult.data - rowCount = addr results[opIdx].queryResult.rowCount - of bmkNoData: - discard - of bmkCommandComplete: - if p.ops[opIdx].kind == pokExec: - results[opIdx].commandResult = initCommandResult(msg.commandTag) + results[opIdx].queryResult.commandTag = msg.commandTag + of bmkEmptyQueryResponse: + discard + of bmkErrorResponse: + if opError == nil: + opError = newPgQueryError(msg.errorFields) + of bmkReadyForQuery: + conn.txStatus = msg.txStatus + if opError != nil: + if opError.sqlState == "26000" and p.ops[opIdx].cacheHit: + conn.removeStmtCache(p.ops[opIdx].sql) + errors[opIdx] = opError + elif p.ops[opIdx].cacheMiss: + conn.addStmtCache( + p.ops[opIdx].sql, + CachedStmt(name: p.ops[opIdx].stmtName, fields: cachedFields), + ) + break opRecv else: - results[opIdx].queryResult.commandTag = msg.commandTag - of bmkEmptyQueryResponse: - discard - of bmkErrorResponse: - if opError == nil: - opError = newPgQueryError(msg.errorFields) - of bmkReadyForQuery: - conn.txStatus = msg.txStatus - if opError != nil: - if opError.sqlState == "26000" and p.ops[opIdx].cacheHit: - conn.removeStmtCache(p.ops[opIdx].sql) - errors[opIdx] = opError - elif p.ops[opIdx].cacheMiss: - conn.addStmtCache( - p.ops[opIdx].sql, - CachedStmt(name: p.ops[opIdx].stmtName, fields: cachedFields), - ) - break opRecv - else: - discard - await conn.fillRecvBuf(timeout) + discard + await conn.fillRecvBuf(timeout) + + when hasChronos: + # Normal path: drain sendFut to propagate any stored write error. + await sendFut + except CatchableError as e: + when hasChronos: + # Abnormal path: cancel or drain sendFut so the Future never leaks + # and no in-flight write still borrows conn.sendBuf. + if not sendFut.finished: + try: + await cancelAndWait(sendFut) + except CatchableError: + discard + else: + try: + await sendFut + except CatchableError: + discard + raise e conn.state = csReady return IsolatedPipelineResults(results: results, errors: errors) diff --git a/async_postgres/pg_connection.nim b/async_postgres/pg_connection.nim index 1e5617a..36c74b9 100644 --- a/async_postgres/pg_connection.nim +++ b/async_postgres/pg_connection.nim @@ -215,6 +215,13 @@ type TraceQueryStartData* = object ## Data passed to the query/exec start hook. sql*: string params*: seq[PgParam] + ## Populated when the caller used a `seq[PgParam]` overload. Mutually + ## exclusive with `paramsInline`: exactly one of the two is non-empty + ## per call (or both are empty if the query has no bound parameters). + paramsInline*: seq[PgParamInline] + ## Populated when the caller used a `PgParamInline` overload. Mutually + ## exclusive with `params` (see above). Tracers that want a single view + ## should branch on whichever field is non-empty. isExec*: bool ## true for exec, false for query TraceQueryEndData* = object ## Data passed to the query/exec end hook. diff --git a/async_postgres/pg_protocol.nim b/async_postgres/pg_protocol.nim index 64aea74..9d4ed22 100644 --- a/async_postgres/pg_protocol.nim +++ b/async_postgres/pg_protocol.nim @@ -532,6 +532,58 @@ proc addBind*( buf.addInt16(f) buf.patchMsgLen(msgStart) +proc addBindRaw*( + buf: var seq[byte], + portalName: string, + stmtName: string, + paramFormats: openArray[int16], + paramData: openArray[byte], + paramRanges: openArray[tuple[off: int32, len: int32]], + resultFormats: openArray[int16] = [], +) = + ## Append a Bind message built from a raw byte buffer and offset/length + ## ranges. Each parameter is described by `(off, len)`: `len == -1` encodes + ## NULL; any other `len` reads `paramData[off ..< off + len]`. Lets callers + ## write payloads straight into a single owned buffer without constructing + ## `Option[seq[byte]]` per parameter. + ## + ## Each range must satisfy one of: `len == -1` (NULL), or + ## `len >= 0` with `0 <= off` and `off + len <= paramData.len`. + ## Invalid ranges raise `ValueError` — the check is always active so callers + ## cannot silently trigger an out-of-bounds `copyMem` in release builds. + let msgStart = buf.len + buf.add(byte('B')) + buf.addInt32(0) # length placeholder + buf.addCString(portalName) + buf.addCString(stmtName) + buf.addInt16(int16(paramFormats.len)) + for f in paramFormats: + buf.addInt16(f) + buf.addInt16(int16(paramRanges.len)) + for r in paramRanges: + if r.len < -1: + raise newException(ValueError, "addBindRaw: invalid range len " & $r.len) + if r.len == -1: + buf.addInt32(-1) + else: + buf.addInt32(r.len) + if r.len > 0: + if r.off < 0: + raise newException(ValueError, "addBindRaw: negative range off " & $r.off) + if r.off.int64 + r.len.int64 > paramData.len.int64: + raise newException( + ValueError, + "addBindRaw: range out of bounds (off=" & $r.off & ", len=" & $r.len & + ", data.len=" & $paramData.len & ")", + ) + let oldLen = buf.len + buf.setLen(oldLen + r.len) + copyMem(addr buf[oldLen], unsafeAddr paramData[r.off], r.len) + buf.addInt16(int16(resultFormats.len)) + for f in resultFormats: + buf.addInt16(f) + buf.patchMsgLen(msgStart) + proc addDescribe*(buf: var seq[byte], kind: DescribeKind, name: string) = ## Append a Describe message to the buffer (portal or statement). let msgStart = buf.len diff --git a/async_postgres/pg_types.nim b/async_postgres/pg_types.nim index 5926231..4848b42 100644 --- a/async_postgres/pg_types.nim +++ b/async_postgres/pg_types.nim @@ -134,6 +134,19 @@ type format*: int16 # 0=text, 1=binary value*: Option[seq[byte]] + PgParamInline* = object + ## Heap-alloc-free parameter for scalar types. Binary payloads up to + ## `PgInlineBufSize` bytes live in `inlineBuf`; longer values spill into + ## `overflow`. Use `toPgParamInline` to construct; pass to the `openArray + ## [PgParamInline]` overloads of `exec`, `query`, `addExec`, `addQuery`. + oid*: int32 + format*: int16 # 0=text, 1=binary + len*: int32 + ## -1 = NULL; 0..PgInlineBufSize uses `inlineBuf`; + ## > PgInlineBufSize uses `overflow`. + inlineBuf*: array[16, byte] + overflow*: seq[byte] + ResultFormat* = enum ## How result columns should be encoded by the server. rfAuto ## Per-column binary-safe detection via statement cache (default). @@ -290,6 +303,10 @@ const rangeLowerInc* = 0x08'u8 ## Range flag: lower bound is inclusive. rangeUpperInc* = 0x10'u8 ## Range flag: upper bound is inclusive. + PgInlineBufSize* = 16 + ## Maximum payload size that fits in `PgParamInline.inlineBuf` without a + ## heap allocation. Values longer than this are stored in `overflow`. + proc `$`*(v: PgUuid): string {.borrow.} proc `==`*(a, b: PgUuid): bool {.borrow.} proc hash*(v: PgUuid): Hash {.borrow.} @@ -993,6 +1010,112 @@ proc decodeFloat64BE*(data: openArray[byte], offset: int = 0): float64 = (uint64(data[offset + 6]) shl 8) or uint64(data[offset + 7]) copyMem(addr result, addr bits, 8) +proc toPgParamInline*(v: int16): PgParamInline = + result.oid = OidInt2 + result.format = 1 + result.len = 2 + let be = toBE16(v) + result.inlineBuf[0] = be[0] + result.inlineBuf[1] = be[1] + +proc toPgParamInline*(v: int32): PgParamInline = + result.oid = OidInt4 + result.format = 1 + result.len = 4 + let be = toBE32(v) + result.inlineBuf[0] = be[0] + result.inlineBuf[1] = be[1] + result.inlineBuf[2] = be[2] + result.inlineBuf[3] = be[3] + +proc toPgParamInline*(v: int64): PgParamInline = + result.oid = OidInt8 + result.format = 1 + result.len = 8 + let be = toBE64(v) + for i in 0 ..< 8: + result.inlineBuf[i] = be[i] + +proc toPgParamInline*(v: int): PgParamInline = + toPgParamInline(int64(v)) + +proc toPgParamInline*(v: float32): PgParamInline = + result.oid = OidFloat4 + result.format = 1 + result.len = 4 + let be = toBE32(cast[int32](v)) + result.inlineBuf[0] = be[0] + result.inlineBuf[1] = be[1] + result.inlineBuf[2] = be[2] + result.inlineBuf[3] = be[3] + +proc toPgParamInline*(v: float64): PgParamInline = + result.oid = OidFloat8 + result.format = 1 + result.len = 8 + let be = toBE64(cast[int64](v)) + for i in 0 ..< 8: + result.inlineBuf[i] = be[i] + +proc toPgParamInline*(v: bool): PgParamInline = + result.oid = OidBool + result.format = 1 + result.len = 1 + result.inlineBuf[0] = if v: 1'u8 else: 0'u8 + +proc toPgParamInline*(v: string): PgParamInline = + result.oid = OidText + result.format = 0 + result.len = int32(v.len) + if v.len == 0: + discard + elif v.len <= PgInlineBufSize: + copyMem(addr result.inlineBuf[0], unsafeAddr v[0], v.len) + else: + result.overflow = newSeq[byte](v.len) + copyMem(addr result.overflow[0], unsafeAddr v[0], v.len) + +proc toPgParamInline*(v: seq[byte]): PgParamInline = + result.oid = OidBytea + result.format = 0 + result.len = int32(v.len) + if v.len == 0: + discard + elif v.len <= PgInlineBufSize: + copyMem(addr result.inlineBuf[0], unsafeAddr v[0], v.len) + else: + result.overflow = v + +proc toPgParamInline*(v: PgUuid): PgParamInline = + # Text format with OidUuid (matches toPgParam). UUID canonical string is + # 36 bytes, so the payload always takes the overflow path. + let s = string(v) + result.oid = OidUuid + result.format = 0 + result.len = int32(s.len) + if s.len == 0: + discard + elif s.len <= PgInlineBufSize: + copyMem(addr result.inlineBuf[0], unsafeAddr s[0], s.len) + else: + result.overflow = newSeq[byte](s.len) + copyMem(addr result.overflow[0], unsafeAddr s[0], s.len) + +proc toPgParamInline*(v: PgMoney): PgParamInline = + result.oid = OidMoney + result.format = 1 + result.len = 8 + let be = toBE64(v.amount) + for i in 0 ..< 8: + result.inlineBuf[i] = be[i] + +proc toPgParamInline*[T](v: Option[T]): PgParamInline = + if v.isSome: + toPgParamInline(v.get) + else: + let tmpl = toPgParamInline(default(T)) + PgParamInline(oid: tmpl.oid, format: tmpl.format, len: -1) + proc toPgParam*(v: string): PgParam = ## Convert a Nim value to a PgParam for use as a query parameter. ## Uses text format for strings, binary for numeric types. @@ -2199,13 +2322,35 @@ converter toRow*(cells: seq[Option[seq[byte]]]): Row = rd.buf.add(data) initRow(rd, 0) -proc parseAffectedRows*(tag: string): int64 = - ## Extract row count from command tag (e.g. "UPDATE 3" -> 3, "INSERT 0 1" -> 1). - let parts = tag.split(' ') +proc parseAffectedRowsRaw*(tag: openArray[char]): int64 = + ## Extract row count from the raw bytes of a command tag (e.g. + ## "UPDATE 3" -> 3, "INSERT 0 1" -> 1). Unlike `parseAffectedRows(string)` + ## this performs zero heap allocation — useful for pipelines that process + ## many `CommandComplete` messages. + ## + ## Mirrors the legacy `split(' ')` semantics exactly: the last token (bytes + ## after the final space) must parse as an integer; a trailing space or any + ## non-numeric tail yields 0. + if tag.len == 0: + return 0 + var lo = tag.high + while lo >= 0 and tag[lo] != ' ': + dec lo + inc lo + if lo > tag.high: + return 0 + var parsed: BiggestInt = 0 try: - parseBiggestInt(parts[^1]) + let consumed = parseutils.parseBiggestInt(tag.toOpenArray(lo, tag.high), parsed) + if consumed == 0 or consumed != tag.high - lo + 1: + return 0 except ValueError, OverflowDefect: - 0 + return 0 + parsed + +proc parseAffectedRows*(tag: string): int64 = + ## Extract row count from command tag (e.g. "UPDATE 3" -> 3, "INSERT 0 1" -> 1). + parseAffectedRowsRaw(tag.toOpenArray(0, tag.high)) proc initCommandResult*(tag: string): CommandResult {.inline.} = CommandResult(commandTag: tag) diff --git a/tests/test_e2e.nim b/tests/test_e2e.nim index c6c3fb8..1cf79a7 100644 --- a/tests/test_e2e.nim +++ b/tests/test_e2e.nim @@ -5627,6 +5627,300 @@ suite "E2E: execInTransaction / queryInTransaction": waitFor t() + test "pipeline: PgParamInline overload roundtrip": + proc t() {.async.} = + let conn = await connect(plainConfig()) + discard await conn.exec("DROP TABLE IF EXISTS test_inline_pipe") + discard + await conn.exec("CREATE TABLE test_inline_pipe (id serial PRIMARY KEY, v int)") + + let p = newPipeline(conn) + for i in 0 ..< 5: + p.addExec( + "INSERT INTO test_inline_pipe (v) VALUES ($1)", [i.int32.toPgParamInline] + ) + p.addQuery("SELECT v FROM test_inline_pipe ORDER BY id") + let results = await p.execute() + doAssert results.len == 6 + for i in 0 ..< 5: + doAssert results[i].commandResult == "INSERT 0 1" + let qr = results[5].queryResult + doAssert qr.rows.len == 5 + for i in 0 ..< 5: + doAssert qr.rows[i].getStr(0) == $i + + discard await conn.exec("DROP TABLE test_inline_pipe") + await conn.close() + + waitFor t() + + test "pipeline: PgParamInline and PgParam overloads can be mixed in one pipeline": + proc t() {.async.} = + let conn = await connect(plainConfig()) + discard await conn.exec("DROP TABLE IF EXISTS test_mixed_pipe") + discard await conn.exec( + "CREATE TABLE test_mixed_pipe (id serial PRIMARY KEY, v int, s text)" + ) + + let p = newPipeline(conn) + # inline path + p.addExec( + "INSERT INTO test_mixed_pipe (v, s) VALUES ($1, $2)", + [1.int32.toPgParamInline, "from-inline".toPgParamInline], + ) + # legacy path + p.addExec( + "INSERT INTO test_mixed_pipe (v, s) VALUES ($1, $2)", + @[toPgParam(2'i32), toPgParam("from-legacy")], + ) + p.addQuery("SELECT v, s FROM test_mixed_pipe ORDER BY id") + let results = await p.execute() + doAssert results.len == 3 + let rows = results[2].queryResult.rows + doAssert rows.len == 2 + doAssert rows[0].getStr(0) == "1" + doAssert rows[0].getStr(1) == "from-inline" + doAssert rows[1].getStr(0) == "2" + doAssert rows[1].getStr(1) == "from-legacy" + + discard await conn.exec("DROP TABLE test_mixed_pipe") + await conn.close() + + waitFor t() + + test "pipeline: PgParamInline NULL via Option": + proc t() {.async.} = + let conn = await connect(plainConfig()) + discard await conn.exec("DROP TABLE IF EXISTS test_inline_null") + discard + await conn.exec("CREATE TABLE test_inline_null (id serial PRIMARY KEY, v int)") + + let p = newPipeline(conn) + p.addExec( + "INSERT INTO test_inline_null (v) VALUES ($1)", [none(int32).toPgParamInline] + ) + p.addQuery("SELECT v FROM test_inline_null ORDER BY id") + let results = await p.execute() + doAssert results[1].queryResult.rows[0].isNull(0) + + discard await conn.exec("DROP TABLE test_inline_null") + await conn.close() + + waitFor t() + + test "pipeline: PgParamInline overflow path (long string)": + proc t() {.async.} = + let conn = await connect(plainConfig()) + discard await conn.exec("DROP TABLE IF EXISTS test_inline_overflow") + discard await conn.exec( + "CREATE TABLE test_inline_overflow (id serial PRIMARY KEY, s text)" + ) + + let long = "abcdefghijklmnopqrstuvwxyz0123456789" # 36 chars → overflow + let p = newPipeline(conn) + p.addExec( + "INSERT INTO test_inline_overflow (s) VALUES ($1)", [long.toPgParamInline] + ) + p.addQuery("SELECT s FROM test_inline_overflow") + let results = await p.execute() + doAssert results[1].queryResult.rows[0].getStr(0) == long + + discard await conn.exec("DROP TABLE test_inline_overflow") + await conn.close() + + waitFor t() + + test "pipeline: PgParamInline large overflow (16KB single value)": + proc t() {.async.} = + let conn = await connect(plainConfig()) + discard await conn.exec("DROP TABLE IF EXISTS test_inline_large") + discard await conn.exec( + "CREATE TABLE test_inline_large (id serial PRIMARY KEY, s text)" + ) + + # 16 KB payload — well beyond PgInlineBufSize, exercises a single large + # copy into the SoA inlineData buffer. + var big = newStringOfCap(16 * 1024) + for i in 0 ..< 16 * 1024: + big.add char(ord('a') + (i mod 26)) + let p = newPipeline(conn) + p.addExec("INSERT INTO test_inline_large (s) VALUES ($1)", [big.toPgParamInline]) + p.addQuery("SELECT s FROM test_inline_large") + let results = await p.execute() + doAssert results[0].commandResult == "INSERT 0 1" + doAssert results[1].queryResult.rows[0].getStr(0) == big + + discard await conn.exec("DROP TABLE test_inline_large") + await conn.close() + + waitFor t() + + test "pipeline: PgParamInline many large overflows stress SoA reallocation": + proc t() {.async.} = + let conn = await connect(plainConfig()) + discard await conn.exec("DROP TABLE IF EXISTS test_inline_stress") + discard await conn.exec( + "CREATE TABLE test_inline_stress (id serial PRIMARY KEY, s text)" + ) + + # 50 × ~2 KB values across 50 ops in one pipeline. Each value is unique + # so a bug in SoA offset accounting (e.g. slice aliasing, reuse of stale + # offsets after inlineData grows) surfaces as a mismatched readback. + let numOps = 50 + let valueSize = 2 * 1024 + var expected = newSeq[string](numOps) + let p = newPipeline(conn) + for i in 0 ..< numOps: + var s = newStringOfCap(valueSize) + # Prefix with the index so every value is distinct and order-sensitive. + s.add "op" & $i & ":" + while s.len < valueSize: + s.add char(ord('a') + ((i + s.len) mod 26)) + expected[i] = s + p.addExec("INSERT INTO test_inline_stress (s) VALUES ($1)", [s.toPgParamInline]) + p.addQuery("SELECT s FROM test_inline_stress ORDER BY id") + let results = await p.execute() + doAssert results.len == numOps + 1 + for i in 0 ..< numOps: + doAssert results[i].commandResult == "INSERT 0 1" + let rows = results[numOps].queryResult.rows + doAssert rows.len == numOps + for i in 0 ..< numOps: + doAssert rows[i].getStr(0) == expected[i] + + discard await conn.exec("DROP TABLE test_inline_stress") + await conn.close() + + waitFor t() + + test "pipeline: PgParamInline empty params on param-less SQL": + # Edge case: caller reaches the inline overload with zero params (e.g. a + # SQL that takes no bound parameters). executeImpl builds an empty + # openArray via `toOpenArray(start, start-1)`; regression test that the + # Bind message still goes out cleanly and the query round-trips. + proc t() {.async.} = + let conn = await connect(plainConfig()) + let p = newPipeline(conn) + let empty: seq[PgParamInline] = @[] + p.addQuery("SELECT 42", empty) + p.addExec("SELECT 1", empty) + let results = await p.execute() + doAssert results.len == 2 + doAssert results[0].queryResult.rows[0].getStr(0) == "42" + doAssert results[1].commandResult == "SELECT 1" + await conn.close() + + waitFor t() + + test "exec: PgParamInline overload roundtrip": + proc t() {.async.} = + let conn = await connect(plainConfig()) + discard await conn.exec("DROP TABLE IF EXISTS test_exec_inline") + discard + await conn.exec("CREATE TABLE test_exec_inline (id serial PRIMARY KEY, v int)") + + # Multi-value INSERT (the benchmark workload): 100 params in one exec. + var sql = "INSERT INTO test_exec_inline (v) VALUES " + var params = newSeqOfCap[PgParamInline](100) + for j in 0 ..< 100: + if j > 0: + sql.add "," + sql.add "($" & $(j + 1) & ")" + params.add j.int32.toPgParamInline + let cr = await conn.exec(sql, params) + doAssert cr.affectedRows == 100 + + let qr = + await conn.query("SELECT count(*)::int4, max(v)::int4 FROM test_exec_inline") + doAssert qr.rows[0].getStr(0) == "100" + doAssert qr.rows[0].getStr(1) == "99" + + discard await conn.exec("DROP TABLE test_exec_inline") + await conn.close() + + waitFor t() + + test "pipeline: executeIsolated with PgParamInline roundtrip": + # Covers the inline path through executeIsolatedImpl (per-op SYNC) and + # its concurrent send/recv scheduling under chronos. Mixes inline and + # legacy params across ops so both code paths in the op loop execute. + proc t() {.async.} = + let conn = await connect(plainConfig()) + discard await conn.exec("DROP TABLE IF EXISTS test_iso_inline") + discard await conn.exec( + "CREATE TABLE test_iso_inline (id serial PRIMARY KEY, v int, s text)" + ) + + let p = newPipeline(conn) + p.addExec( + "INSERT INTO test_iso_inline (v, s) VALUES ($1, $2)", + [10.int32.toPgParamInline, "inline".toPgParamInline], + ) + p.addExec( + "INSERT INTO test_iso_inline (v, s) VALUES ($1, $2)", + @[toPgParam(20'i32), toPgParam("legacy")], + ) + p.addQuery( + "SELECT v, s FROM test_iso_inline WHERE v > $1 ORDER BY id", + [5.int32.toPgParamInline], + ) + let ir = await p.executeIsolated() + doAssert ir.results.len == 3 + for e in ir.errors: + doAssert e == nil + doAssert ir.results[0].commandResult == "INSERT 0 1" + doAssert ir.results[1].commandResult == "INSERT 0 1" + let rows = ir.results[2].queryResult.rows + doAssert rows.len == 2 + doAssert rows[0].getStr(0) == "10" + doAssert rows[0].getStr(1) == "inline" + doAssert rows[1].getStr(0) == "20" + doAssert rows[1].getStr(1) == "legacy" + + discard await conn.exec("DROP TABLE test_iso_inline") + await conn.close() + + waitFor t() + + test "pipeline: executeIsolated with PgParamInline isolates per-op errors": + # Confirms that per-op SYNC still provides error isolation when ops use + # the inline overload: a failing inline op must not abort subsequent + # inline ops, and the connection must remain usable afterwards. + proc t() {.async.} = + let conn = await connect(plainConfig()) + discard await conn.exec("DROP TABLE IF EXISTS test_iso_inline_err") + discard await conn.exec( + "CREATE TABLE test_iso_inline_err (id serial PRIMARY KEY, v int NOT NULL)" + ) + + let p = newPipeline(conn) + p.addExec( + "INSERT INTO test_iso_inline_err (v) VALUES ($1)", [1.int32.toPgParamInline] + ) + # NULL inline param violates NOT NULL — isolated error. + p.addExec( + "INSERT INTO test_iso_inline_err (v) VALUES ($1)", [none(int32).toPgParamInline] + ) + p.addExec( + "INSERT INTO test_iso_inline_err (v) VALUES ($1)", [3.int32.toPgParamInline] + ) + let ir = await p.executeIsolated() + doAssert ir.results.len == 3 + doAssert ir.errors[0] == nil + doAssert ir.errors[1] != nil + doAssert ir.errors[2] == nil # not aborted + + doAssert conn.state == csReady + let qr = await conn.query("SELECT v FROM test_iso_inline_err ORDER BY id") + doAssert qr.rowCount == 2 + doAssert qr.rows[0].getStr(0) == "1" + doAssert qr.rows[1].getStr(0) == "3" + + discard await conn.exec("DROP TABLE test_iso_inline_err") + await conn.close() + + waitFor t() + test "pipeline: empty pipeline": proc t() {.async.} = let conn = await connect(plainConfig()) diff --git a/tests/test_types.nim b/tests/test_types.nim index 900e689..60b103f 100644 --- a/tests/test_types.nim +++ b/tests/test_types.nim @@ -1,7 +1,11 @@ -import std/[json, unittest, options, strutils, tables, times, math, net, typetraits] +import + std/[ + json, unittest, options, strutils, tables, times, math, net, typetraits, importutils + ] import ../async_postgres/pg_protocol import ../async_postgres/pg_types {.all.} +import ../async_postgres/pg_client {.all.} type UsPostalCode = distinct string @@ -5850,3 +5854,490 @@ suite "Multirange array types": let row: Row = @[p.value] let arr = row.getNumMultirangeArray(0) check arr.len == 1 + +proc inlinePayload(p: PgParamInline): seq[byte] = + ## Reconstruct the binary payload a PgParamInline would emit on the wire, + ## picking between `inlineBuf` and `overflow` based on `len`. + if p.len == -1: + return @[] + if p.len == 0: + return @[] + if p.len <= PgInlineBufSize: + result = newSeq[byte](p.len) + copyMem(addr result[0], unsafeAddr p.inlineBuf[0], p.len) + else: + result = p.overflow + +suite "toPgParamInline": + test "int16 encodes as binary BE and fits inline": + let p = toPgParamInline(42'i16) + check p.oid == OidInt2 + check p.format == 1 + check p.len == 2 + check inlinePayload(p) == @(toBE16(42'i16)) + + test "int16 negative": + let p = toPgParamInline(-1'i16) + check inlinePayload(p) == @(toBE16(-1'i16)) + + test "int32 wire-format matches toPgParam": + let old = toPgParam(123456'i32) + let new = toPgParamInline(123456'i32) + check new.oid == old.oid + check new.format == old.format + check inlinePayload(new) == old.value.get + + test "int32 boundaries": + for v in [0'i32, 1, -1, int32.high, int32.low]: + let p = toPgParamInline(v) + check p.len == 4 + check inlinePayload(p) == @(toBE32(v)) + + test "int64 fits inline at 8 bytes": + let p = toPgParamInline(9_999_999_999'i64) + check p.oid == OidInt8 + check p.len == 8 + check inlinePayload(p) == @(toBE64(9_999_999_999'i64)) + + test "int widens to int64": + let p = toPgParamInline(42.int) + check p.oid == OidInt8 + check p.len == 8 + check inlinePayload(p) == @(toBE64(42'i64)) + + test "float32 wire-format": + let p = toPgParamInline(3.14'f32) + check p.oid == OidFloat4 + check p.len == 4 + # Match toPgParam exactly (same cast-to-int32 + BE encoding). + let old = toPgParam(3.14'f32) + check inlinePayload(p) == old.value.get + + test "float64 wire-format": + let p = toPgParamInline(3.14) + check p.oid == OidFloat8 + check p.len == 8 + let bits = fromBE64(inlinePayload(p)) + check abs(cast[float64](bits) - 3.14) < 1e-10 + + test "bool true / false": + let pt = toPgParamInline(true) + check pt.oid == OidBool + check pt.len == 1 + check pt.inlineBuf[0] == 1'u8 + let pf = toPgParamInline(false) + check pf.inlineBuf[0] == 0'u8 + + test "string short fits inline": + let p = toPgParamInline("hello") + check p.oid == OidText + check p.format == 0 + check p.len == 5 + check p.overflow.len == 0 + check inlinePayload(p) == @(toBytes("hello")) + + test "string empty": + let p = toPgParamInline("") + check p.len == 0 + check inlinePayload(p).len == 0 + + test "string at inline boundary (16 bytes)": + let s = "0123456789ABCDEF" # exactly 16 chars + let p = toPgParamInline(s) + check p.len == 16 + check p.overflow.len == 0 + check inlinePayload(p) == @(toBytes(s)) + + test "string above boundary uses overflow": + let s = "0123456789ABCDEFG" # 17 chars + let p = toPgParamInline(s) + check p.len == 17 + check p.overflow.len == 17 + check inlinePayload(p) == @(toBytes(s)) + + test "seq[byte] short fits inline": + let b = @[byte 1, 2, 3, 4] + let p = toPgParamInline(b) + check p.oid == OidBytea + check p.len == 4 + check p.overflow.len == 0 + check inlinePayload(p) == b + + test "seq[byte] above boundary uses overflow": + var b = newSeq[byte](32) + for i in 0 ..< 32: + b[i] = byte(i) + let p = toPgParamInline(b) + check p.len == 32 + check p.overflow.len == 32 + check inlinePayload(p) == b + + test "PgUuid matches toPgParam (OidUuid, text, overflow)": + # UUID string is 36 chars, exceeds PgInlineBufSize. + let u = PgUuid("550e8400-e29b-41d4-a716-446655440000") + let p = toPgParamInline(u) + let old = toPgParam(u) + check p.oid == OidUuid + check p.oid == old.oid + check p.format == old.format + check p.len == 36 + check p.overflow.len == 36 + check inlinePayload(p) == old.value.get + check inlinePayload(p) == @(toBytes(string(u))) + + test "PgMoney encodes int64 amount only": + let m = PgMoney(amount: 12345'i64, scale: 2'i8) + let p = toPgParamInline(m) + check p.oid == OidMoney + check p.len == 8 + check inlinePayload(p) == @(toBE64(12345'i64)) + + test "Option[int32] some delegates to int32": + let p = toPgParamInline(some(42'i32)) + check p.oid == OidInt4 + check p.len == 4 + check inlinePayload(p) == @(toBE32(42'i32)) + + test "Option[int32] none marks NULL": + let p = toPgParamInline(none(int32)) + check p.oid == OidInt4 + check p.format == 1 + check p.len == -1 + + test "Option[string] none marks NULL with text oid": + let p = toPgParamInline(none(string)) + check p.oid == OidText + check p.format == 0 + check p.len == -1 + +suite "addBindRaw wire-format parity": + test "single int32 param matches addBind": + let old = toPgParam(42'i32) + var legacyBuf: seq[byte] = @[] + legacyBuf.addBind("p", "s", [int16(1)], [old.value], []) + var rawBuf: seq[byte] = @[] + let data = old.value.get + let ranges = @[(off: int32(0), len: int32(data.len))] + rawBuf.addBindRaw("p", "s", [int16(1)], data, ranges, []) + check legacyBuf == rawBuf + + test "three params matches addBind": + let a = toPgParam(1'i32) + let b = toPgParam(2'i32) + let c = toPgParam(3'i32) + var legacyBuf: seq[byte] = @[] + legacyBuf.addBind("", "stmt", [int16(1), 1, 1], [a.value, b.value, c.value], []) + var data: seq[byte] = @[] + var ranges: seq[tuple[off: int32, len: int32]] = @[] + for v in [a.value.get, b.value.get, c.value.get]: + ranges.add (off: int32(data.len), len: int32(v.len)) + data.add v + var rawBuf: seq[byte] = @[] + rawBuf.addBindRaw("", "stmt", [int16(1), 1, 1], data, ranges, []) + check legacyBuf == rawBuf + + test "NULL param matches addBind": + var legacyBuf: seq[byte] = @[] + legacyBuf.addBind("", "s", [int16(1)], [none(seq[byte])], []) + var rawBuf: seq[byte] = @[] + rawBuf.addBindRaw("", "s", [int16(1)], @[], @[(off: int32(0), len: int32(-1))], []) + check legacyBuf == rawBuf + + test "mixed NULL and non-NULL matches addBind": + let a = toPgParam(7'i32) + var legacyBuf: seq[byte] = @[] + legacyBuf.addBind("", "s", [int16(1), 1], [a.value, none(seq[byte])], []) + var data: seq[byte] = a.value.get + var ranges = + @[(off: int32(0), len: int32(data.len)), (off: int32(0), len: int32(-1))] + var rawBuf: seq[byte] = @[] + rawBuf.addBindRaw("", "s", [int16(1), 1], data, ranges, []) + check legacyBuf == rawBuf + + test "result formats are preserved": + var legacyBuf: seq[byte] = @[] + legacyBuf.addBind("p", "", [int16(0)], [none(seq[byte])], [int16(1), 0, 1]) + var rawBuf: seq[byte] = @[] + rawBuf.addBindRaw( + "p", "", [int16(0)], @[], @[(off: int32(0), len: int32(-1))], [int16(1), 0, 1] + ) + check legacyBuf == rawBuf + +suite "addBindRaw range validation": + test "range len below -1 raises ValueError": + var buf: seq[byte] = @[] + expect ValueError: + buf.addBindRaw("", "", [int16(1)], @[], @[(off: int32(0), len: int32(-2))], []) + + test "negative off with non-zero len raises ValueError": + var buf: seq[byte] = @[] + let data = @[byte 1, 2, 3, 4] + expect ValueError: + buf.addBindRaw("", "", [int16(1)], data, @[(off: int32(-1), len: int32(4))], []) + + test "off + len past paramData.len raises ValueError": + var buf: seq[byte] = @[] + let data = @[byte 1, 2, 3, 4] + expect ValueError: + # off + len == 5, data.len == 4 — reads past end + buf.addBindRaw("", "", [int16(1)], data, @[(off: int32(1), len: int32(4))], []) + + test "range exactly at end of paramData is valid": + var buf: seq[byte] = @[] + let data = @[byte 1, 2, 3, 4] + # off + len == 4, data.len == 4 — boundary is inclusive on the data side + buf.addBindRaw("", "", [int16(1)], data, @[(off: int32(0), len: int32(4))], []) + check buf.len > 0 + + test "len == 0 with off at end of data is valid (empty string case)": + var buf: seq[byte] = @[] + let data = @[byte 1, 2, 3, 4] + # Mirrors flattenInline's encoding for empty strings: off = data.len, len = 0 + buf.addBindRaw("", "", [int16(0)], data, @[(off: int32(4), len: int32(0))], []) + check buf.len > 0 + + test "NULL range with arbitrary off is valid (off ignored when len == -1)": + var buf: seq[byte] = @[] + buf.addBindRaw("", "", [int16(1)], @[], @[(off: int32(999), len: int32(-1))], []) + check buf.len > 0 + +suite "parseAffectedRowsRaw": + test "UPDATE / DELETE returns trailing count": + check parseAffectedRows("UPDATE 3") == 3 + check parseAffectedRows("DELETE 5") == 5 + check parseAffectedRows("SELECT 100") == 100 + + test "INSERT 0 N returns N": + check parseAffectedRows("INSERT 0 1") == 1 + check parseAffectedRows("INSERT 0 42") == 42 + + test "trailing whitespace yields 0 (matches legacy split semantics)": + check parseAffectedRows("UPDATE 3 ") == 0 + check parseAffectedRows("UPDATE 7 ") == 0 + + test "empty / non-numeric returns 0": + check parseAffectedRows("") == 0 + check parseAffectedRows("CREATE TABLE") == 0 + check parseAffectedRows("COMMIT") == 0 + check parseAffectedRows(" ") == 0 + + test "single-token numeric tag": + # Tag that is just a number — not a real PostgreSQL tag but robust for safety. + check parseAffectedRows("123") == 123 + + test "large values": + check parseAffectedRows("INSERT 0 9999999999") == 9_999_999_999'i64 + + test "raw overload on char openArray": + let s = "UPDATE 42" + check parseAffectedRowsRaw(s.toOpenArray(0, s.high)) == 42 + +suite "flattenInline SoA layout": + test "empty input produces empty SoA": + let params: seq[PgParamInline] = @[] + let (data, ranges, oids, formats) = flattenInline(params) + check data.len == 0 + check ranges.len == 0 + check oids.len == 0 + check formats.len == 0 + + test "single short int32 param": + let params = @[toPgParamInline(42'i32)] + let (data, ranges, oids, formats) = flattenInline(params) + check data == @(toBE32(42'i32)) + check ranges.len == 1 + check ranges[0].off == 0 + check ranges[0].len == 4 + check oids == @[OidInt4] + check formats == @[1'i16] + + test "single NULL param (off=0, len=-1, no bytes written)": + let params = @[none(int32).toPgParamInline] + let (data, ranges, oids, formats) = flattenInline(params) + check data.len == 0 + check ranges.len == 1 + check ranges[0].len == -1 + check oids == @[OidInt4] + check formats == @[1'i16] + + test "single empty string (len=0, off points to current data.len)": + let params = @[toPgParamInline("")] + let (data, ranges, _, _) = flattenInline(params) + check data.len == 0 + check ranges.len == 1 + check ranges[0].len == 0 + check ranges[0].off == 0 + + test "boundary string (16 bytes fits in inlineBuf path)": + let params = @[toPgParamInline("0123456789ABCDEF")] + let (data, ranges, _, _) = flattenInline(params) + check data.len == 16 + check data == @(toBytes("0123456789ABCDEF")) + check ranges.len == 1 + check ranges[0].off == 0 + check ranges[0].len == 16 + + test "overflow string (17 bytes uses overflow path)": + let long = "0123456789ABCDEFG" # 17 chars + let params = @[toPgParamInline(long)] + let (data, ranges, _, _) = flattenInline(params) + check data.len == 17 + check data == @(toBytes(long)) + check ranges.len == 1 + check ranges[0].off == 0 + check ranges[0].len == 17 + + test "mixed [short, NULL, overflow, empty, short] — offsets are consecutive": + let long = "abcdefghijklmnopqrst" # 20 chars → overflow + let params = @[ + toPgParamInline(1'i32), # 4 bytes + none(int32).toPgParamInline, # NULL + toPgParamInline(long), # 20 bytes + toPgParamInline(""), # 0 bytes + toPgParamInline(7'i32), # 4 bytes + ] + let (data, ranges, oids, formats) = flattenInline(params) + check data.len == 4 + 20 + 4 # NULL and empty contribute nothing + check ranges.len == 5 + # int32(1) + check ranges[0].off == 0 + check ranges[0].len == 4 + check data[0 ..< 4] == @(toBE32(1'i32)) + # NULL + check ranges[1].len == -1 + # long overflow string + check ranges[2].off == 4 + check ranges[2].len == 20 + check data[4 ..< 24] == @(toBytes(long)) + # empty string — off points just past the overflow bytes, len 0 + check ranges[3].off == 24 + check ranges[3].len == 0 + # int32(7) + check ranges[4].off == 24 + check ranges[4].len == 4 + check data[24 ..< 28] == @(toBE32(7'i32)) + check oids == @[OidInt4, OidInt4, OidText, OidText, OidInt4] + # int32 uses binary(1), text uses text(0) + check formats == @[1'i16, 1'i16, 0'i16, 0'i16, 1'i16] + + test "100 int32 params — data size, offsets, oids all consistent": + var params = newSeqOfCap[PgParamInline](100) + for i in 0 ..< 100: + params.add toPgParamInline(int32(i * 3)) + let (data, ranges, oids, formats) = flattenInline(params) + check data.len == 100 * 4 + check ranges.len == 100 + for i in 0 ..< 100: + check ranges[i].off == int32(i * 4) + check ranges[i].len == 4 + check data[i * 4 ..< (i + 1) * 4] == @(toBE32(int32(i * 3))) + for o in oids: + check o == OidInt4 + for f in formats: + check f == 1'i16 + +suite "Pipeline appendInline SoA layout": + test "single op: inlineStart/Count correct, ranges point into p.inlineData": + privateAccess(Pipeline) + privateAccess(PipelineOp) + let p = newPipeline(nil) + p.addExec("INSERT", [toPgParamInline(42'i32), toPgParamInline("abc")]) + check p.ops.len == 1 + check p.ops[0].hasInline + check p.ops[0].inlineStart == 0 + check p.ops[0].inlineCount == 2 + check p.inlineRanges.len == 2 + check p.inlineOids == @[OidInt4, OidText] + check p.inlineFormats == @[1'i16, 0'i16] + # Byte layout: 4 bytes for int32 + 3 bytes for "abc" + check p.inlineData.len == 7 + check p.inlineRanges[0].off == 0 + check p.inlineRanges[0].len == 4 + check p.inlineRanges[1].off == 4 + check p.inlineRanges[1].len == 3 + + test "two ops: second op's inlineStart resumes after first": + privateAccess(Pipeline) + privateAccess(PipelineOp) + let p = newPipeline(nil) + p.addExec("INSERT 1", [toPgParamInline(1'i32)]) + p.addExec("INSERT 2", [toPgParamInline(2'i32), toPgParamInline(3'i32)]) + check p.ops.len == 2 + check p.ops[0].inlineStart == 0 + check p.ops[0].inlineCount == 1 + check p.ops[1].inlineStart == 1 + check p.ops[1].inlineCount == 2 + check p.inlineRanges.len == 3 + check p.inlineData.len == 12 + check p.inlineRanges[0].off == 0 # op0 param0 + check p.inlineRanges[1].off == 4 # op1 param0 + check p.inlineRanges[2].off == 8 # op1 param1 + + test "mixed NULL + overflow in one op, offsets correct": + privateAccess(Pipeline) + privateAccess(PipelineOp) + let p = newPipeline(nil) + let long = "abcdefghijklmnopqrst" # 20 chars → overflow + p.addExec( + "X", [toPgParamInline(9'i32), none(int32).toPgParamInline, toPgParamInline(long)] + ) + check p.ops[0].inlineCount == 3 + check p.inlineRanges.len == 3 + check p.inlineData.len == 4 + 20 # NULL contributes nothing + check p.inlineRanges[0].off == 0 + check p.inlineRanges[0].len == 4 + check p.inlineRanges[1].len == -1 # NULL + check p.inlineRanges[2].off == 4 + check p.inlineRanges[2].len == 20 + check p.inlineData[4 ..< 24] == @(toBytes(long)) + + test "addQuery + addExec populate same SoA buffer": + privateAccess(Pipeline) + privateAccess(PipelineOp) + let p = newPipeline(nil) + p.addQuery("SELECT", [toPgParamInline(1'i32)]) + p.addExec("INSERT", [toPgParamInline(2'i32)]) + check p.ops.len == 2 + check p.ops[0].kind == pokQuery + check p.ops[1].kind == pokExec + check p.ops[0].hasInline + check p.ops[1].hasInline + check p.ops[0].inlineStart == 0 + check p.ops[1].inlineStart == 1 + check p.inlineData.len == 8 + check p.inlineData[0 ..< 4] == @(toBE32(1'i32)) + check p.inlineData[4 ..< 8] == @(toBE32(2'i32)) + + test "addExec with empty inline params: hasInline=true, count=0": + # Edge case: the inline overload is chosen but no params are provided. + # executeImpl computes `endIdx = inlineStart + inlineCount - 1`, so for + # count==0 we get an empty `toOpenArray(start, start-1)` view — make sure + # that view is empty and the SoA buffers are untouched. + privateAccess(Pipeline) + privateAccess(PipelineOp) + let p = newPipeline(nil) + let empty: seq[PgParamInline] = @[] + p.addExec("SELECT 1", empty) + check p.ops.len == 1 + check p.ops[0].hasInline + check p.ops[0].inlineStart == 0 + check p.ops[0].inlineCount == 0 + check p.inlineRanges.len == 0 + check p.inlineOids.len == 0 + check p.inlineFormats.len == 0 + check p.inlineData.len == 0 + + test "addExec empty inline after a populated op: start resumes correctly": + # A second op with zero inline params must record `inlineStart` at the + # current tail of the SoA buffers, not 0. + privateAccess(Pipeline) + privateAccess(PipelineOp) + let p = newPipeline(nil) + p.addExec("A", [toPgParamInline(1'i32), toPgParamInline(2'i32)]) + let empty: seq[PgParamInline] = @[] + p.addExec("B", empty) + check p.ops.len == 2 + check p.ops[1].hasInline + check p.ops[1].inlineStart == 2 # resumes after the two params of op A + check p.ops[1].inlineCount == 0 + check p.inlineRanges.len == 2 # unchanged by op B