diff --git a/pynuodb/.gitignore b/pynuodb/.gitignore new file mode 100644 index 0000000..528e9fb --- /dev/null +++ b/pynuodb/.gitignore @@ -0,0 +1 @@ +_fetch.c diff --git a/pynuodb/_fetch.pyx b/pynuodb/_fetch.pyx new file mode 100644 index 0000000..b4d3809 --- /dev/null +++ b/pynuodb/_fetch.pyx @@ -0,0 +1,936 @@ +# cython: language_level=3 +# cython: boundscheck=False +# cython: wraparound=False +# cython: cdivision=True +"""Cython-accelerated hot paths for the NuoDB Python driver. + +Two things live here: + + ResultSet + A cdef class that replaces the pure-Python result_set.ResultSet. The + typed cdef attributes and cpdef methods eliminate Python attribute-lookup + and function-call overhead on the millions of per-row fetchone() and + is_complete() calls during a full-table scan. + + decode_next_batch() + The inner decode loop from EncodedSession.fetch_result_set_next(), + rewritten in C. It handles every common NuoDB wire type inline: + + * Inline integers (INTMINUS10 .. INT31, codes 10-51) + * Multi-byte ints (INTLEN1 .. INTLEN8, codes 52-59) + * Short UTF-8 (UTF8LEN0 .. UTF8LEN39, codes 109-148) + * Counted UTF-8 (UTF8COUNT1 .. UTF8COUNT4, codes 69-72) + * Short OPAQUE (OPAQUELEN0 .. OPAQUELEN39, codes 149-188) + * Counted OPAQUE (OPAQUECOUNT1 .. OPAQUECOUNT4, codes 73-76) + * IEEE-754 DOUBLE (DOUBLELEN0 .. DOUBLELEN8, codes 77-85) + * MILLISEC/NANOSEC/TIME ints (codes 86-108) + * SCALED Decimal (SCALEDLEN0 .. SCALEDLEN8, codes 60-68) + * SCALEDDATE (codes 201-208) + * SCALEDTIME (codes 209-216, uses tz_info) + * SCALEDTIMESTAMP (codes 217-224, uses tz_info) + * SCALEDTIMESTAMPNOTZ (code 241) + * BLOB / CLOB (codes 189-198) + * UUID (code 200) + * NULL / TRUE / FALSE + + Truly exotic codes (VECTOR, SCALEDCOUNT2/3, LOBSTREAM, ARRAY) fall back + to a Python callable so correctness is never sacrificed. +""" + +from cpython.unicode cimport PyUnicode_DecodeUTF8, PyUnicode_AsUTF8String +from cpython.bytes cimport PyBytes_FromStringAndSize, PyBytes_AsString, PyBytes_GET_SIZE +from cpython.bytearray cimport ( + PyByteArray_FromStringAndSize, + PyByteArray_AS_STRING, + PyByteArray_GET_SIZE, + PyByteArray_Resize, +) +from cpython.long cimport PyLong_AsLongLongAndOverflow +from cpython.tuple cimport PyTuple_New +from cpython.ref cimport PyObject +from libc.string cimport memcpy + +import struct as _struct +import decimal as _decimal +import uuid as _uuid +from . import datatype as _datatype +from .exception import ProgrammingError + +# Cached Python constructors / helpers used by the complex-type branches. +# Looked up once at module import; the decode loop references these via +# the C name-lookup optimization the Cython compiler emits for module +# globals (one indirection, no per-cell `LOAD_GLOBAL`). +_Decimal = _decimal.Decimal +_Binary = _datatype.Binary +_DateFromTicks = _datatype.DateFromTicks +_TimeFromTicks = _datatype.TimeFromTicks +_TimestampFromTicks = _datatype.TimestampFromTicks +_UUID = _uuid.UUID + +# C-level helpers that bypass Cython's automatic INCREF/DECREF on `object`-typed +# arguments. PyTuple_SET_ITEM steals a reference, and PyLong_FromLong / +# PyUnicode_DecodeUTF8 return a new reference, so the combination is leak-free +# but Cython's automatic refcount handling on `object` parameters double-INCREFs. +# By passing PyObject* through this thin C wrapper, we eliminate 2 refcount ops +# per cell. +cdef extern from *: + """ + #include + #include + #include + static CYTHON_INLINE void _pynuodb_tuple_steal( + PyObject *t, Py_ssize_t i, PyObject *o) { + PyTuple_SET_ITEM(t, i, o); + } + static CYTHON_INLINE PyObject *_pynuodb_long_from_long(long v) { + return PyLong_FromLong(v); + } + static CYTHON_INLINE PyObject *_pynuodb_long_from_longlong(long long v) { + return PyLong_FromLongLong(v); + } + static CYTHON_INLINE PyObject *_pynuodb_decode_utf8( + const char *s, Py_ssize_t n) { + return PyUnicode_DecodeUTF8(s, n, NULL); + } + static CYTHON_INLINE PyObject *_pynuodb_incref(PyObject *o) { + Py_INCREF(o); + return o; + } + /* Read a 1-to-8-byte big-endian IEEE-754 double, padding missing low + bytes with zero (matching getDouble()'s "append zeros until 8 bytes" + behaviour). Portable across endianness: builds the IEEE bit pattern + as a uint64_t in native order, then memcpy's into a double. */ + static CYTHON_INLINE double _pynuodb_be_double( + const unsigned char *p, int n) { + unsigned char buf[8] = {0}; + int i; + for (i = 0; i < n; i++) buf[i] = p[i]; + uint64_t v = + ((uint64_t)buf[0] << 56) | ((uint64_t)buf[1] << 48) | + ((uint64_t)buf[2] << 40) | ((uint64_t)buf[3] << 32) | + ((uint64_t)buf[4] << 24) | ((uint64_t)buf[5] << 16) | + ((uint64_t)buf[6] << 8) | (uint64_t)buf[7]; + double d; + memcpy(&d, &v, 8); + return d; + } + /* Little-endian double, used for the VECTOR(DOUBLE) payload format. */ + static CYTHON_INLINE double _pynuodb_le_double(const unsigned char *p) { + uint64_t v = + ((uint64_t)p[7] << 56) | ((uint64_t)p[6] << 48) | + ((uint64_t)p[5] << 40) | ((uint64_t)p[4] << 32) | + ((uint64_t)p[3] << 24) | ((uint64_t)p[2] << 16) | + ((uint64_t)p[1] << 8) | (uint64_t)p[0]; + double d; + memcpy(&d, &v, 8); + return d; + } + /* Specialised big-endian signed integer reader. N=1,2,4,8 use a single + memcpy + bswap (1-2 native instructions); other lengths fall back to a + small unrolled accumulator with explicit sign-extension on the high bit + of the first byte. Compare to the byte-by-byte loop in EncodedInputStream + C++ -- both compile to the same shape for the hot N=4/8 cases. */ + static CYTHON_INLINE long long _pynuodb_be_i64( + const unsigned char *p, int n) { + unsigned long long v; + switch (n) { + case 0: return 0; + case 1: return (long long)(signed char)p[0]; + case 2: { uint16_t x; memcpy(&x, p, 2); + return (long long)(int16_t)__builtin_bswap16(x); } + case 4: { uint32_t x; memcpy(&x, p, 4); + return (long long)(int32_t)__builtin_bswap32(x); } + case 8: { uint64_t x; memcpy(&x, p, 8); + return (long long)(int64_t)__builtin_bswap64(x); } + case 3: + v = ((unsigned long long)p[0] << 16) | + ((unsigned long long)p[1] << 8) | p[2]; + if (p[0] & 0x80) v |= 0xFFFFFFFFFF000000ULL; + return (long long)v; + case 5: + v = ((unsigned long long)p[0] << 32) | + ((unsigned long long)p[1] << 24) | + ((unsigned long long)p[2] << 16) | + ((unsigned long long)p[3] << 8) | p[4]; + if (p[0] & 0x80) v |= 0xFFFFFF0000000000ULL; + return (long long)v; + case 6: + v = ((unsigned long long)p[0] << 40) | + ((unsigned long long)p[1] << 32) | + ((unsigned long long)p[2] << 24) | + ((unsigned long long)p[3] << 16) | + ((unsigned long long)p[4] << 8) | p[5]; + if (p[0] & 0x80) v |= 0xFFFF000000000000ULL; + return (long long)v; + case 7: + v = ((unsigned long long)p[0] << 48) | + ((unsigned long long)p[1] << 40) | + ((unsigned long long)p[2] << 32) | + ((unsigned long long)p[3] << 24) | + ((unsigned long long)p[4] << 16) | + ((unsigned long long)p[5] << 8) | p[6]; + if (p[0] & 0x80) v |= 0xFF00000000000000ULL; + return (long long)v; + default: + v = 0; + for (int i = 0; i < n; i++) v = (v << 8) | p[i]; + if (n > 0 && (p[0] & 0x80)) + v -= ((unsigned long long)1) << (n << 3); + return (long long)v; + } + } + /* Specialised big-endian unsigned reader for length prefixes (n<=8). */ + static CYTHON_INLINE unsigned long long _pynuodb_be_u64( + const unsigned char *p, int n) { + switch (n) { + case 0: return 0; + case 1: return p[0]; + case 2: { uint16_t x; memcpy(&x, p, 2); + return __builtin_bswap16(x); } + case 4: { uint32_t x; memcpy(&x, p, 4); + return __builtin_bswap32(x); } + case 8: { uint64_t x; memcpy(&x, p, 8); + return __builtin_bswap64(x); } + case 3: return ((unsigned long long)p[0] << 16) | + ((unsigned long long)p[1] << 8) | p[2]; + case 5: return ((unsigned long long)p[0] << 32) | + ((unsigned long long)p[1] << 24) | + ((unsigned long long)p[2] << 16) | + ((unsigned long long)p[3] << 8) | p[4]; + case 6: return ((unsigned long long)p[0] << 40) | + ((unsigned long long)p[1] << 32) | + ((unsigned long long)p[2] << 24) | + ((unsigned long long)p[3] << 16) | + ((unsigned long long)p[4] << 8) | p[5]; + case 7: return ((unsigned long long)p[0] << 48) | + ((unsigned long long)p[1] << 40) | + ((unsigned long long)p[2] << 32) | + ((unsigned long long)p[3] << 24) | + ((unsigned long long)p[4] << 16) | + ((unsigned long long)p[5] << 8) | p[6]; + default: { + unsigned long long v = 0; + for (int i = 0; i < n; i++) v = (v << 8) | p[i]; + return v; + } + } + } + /* Build a Python int from n big-endian signed bytes WITHOUT first + allocating a temporary `bytes` object. _PyLong_FromByteArray is the + same C function that int.from_bytes() ultimately dispatches to; calling + it directly here saves one PyBytes allocation + one Python-level method + call per scaled-Decimal cell. */ + static CYTHON_INLINE PyObject *_pynuodb_pylong_be_signed( + const unsigned char *p, Py_ssize_t n) { + return _PyLong_FromByteArray(p, (size_t)n, 0, 1); + } + /* Encode a long-long signed integer in the NuoDB wire format directly + into `buf` (caller-owned, must have >= 9 bytes of headroom). Writes + 1 byte (INT0 + v) for v in [-10, 31] and otherwise INTLEN0+nbytes + followed by the minimal two's-complement big-endian bytes. Returns + total bytes written. */ + static CYTHON_INLINE int _pynuodb_emit_int(char *buf, long long v) { + if (v >= -10 && v <= 31) { + buf[0] = (char)(20 + v); /* INT0 + v */ + return 1; + } + int nbytes; + if (v < 0) { + if (v >= -128LL) nbytes = 1; + else if (v >= -32768LL) nbytes = 2; + else if (v >= -8388608LL) nbytes = 3; + else if (v >= -2147483648LL) nbytes = 4; + else if (v >= -549755813888LL) nbytes = 5; + else if (v >= -140737488355328LL) nbytes = 6; + else if (v >= -36028797018963968LL) nbytes = 7; + else nbytes = 8; + } else { + if (v <= 127LL) nbytes = 1; + else if (v <= 32767LL) nbytes = 2; + else if (v <= 8388607LL) nbytes = 3; + else if (v <= 2147483647LL) nbytes = 4; + else if (v <= 549755813887LL) nbytes = 5; + else if (v <= 140737488355327LL) nbytes = 6; + else if (v <= 36028797018963967LL) nbytes = 7; + else nbytes = 8; + } + buf[0] = (char)(51 + nbytes); /* INTLEN0 + nbytes */ + unsigned long long uv = (unsigned long long)v; + int i; + for (i = 0; i < nbytes; i++) { + buf[nbytes - i] = (char)(uv & 0xff); + uv >>= 8; + } + return 1 + nbytes; + } + """ + void _pynuodb_tuple_steal(PyObject *t, Py_ssize_t i, PyObject *o) nogil + PyObject *_pynuodb_long_from_long(long v) nogil + PyObject *_pynuodb_long_from_longlong(long long v) nogil + PyObject *_pynuodb_decode_utf8(const char *s, Py_ssize_t n) nogil + PyObject *_pynuodb_incref(PyObject *o) nogil + double _pynuodb_be_double(const unsigned char *p, int n) nogil + double _pynuodb_le_double(const unsigned char *p) nogil + long long _pynuodb_be_i64(const unsigned char *p, int n) nogil + unsigned long long _pynuodb_be_u64(const unsigned char *p, int n) nogil + object _pynuodb_pylong_be_signed(const unsigned char *p, Py_ssize_t n) + int _pynuodb_emit_int(char *buf, long long v) nogil + + +# Wire-protocol constants as DEF (compile-time, no Python attribute lookup). +# Keep in sync with protocol.py. + +DEF NULL_V = 1 +DEF TRUE_V = 2 +DEF FALSE_V = 3 +DEF INTMINUS10 = 10 +DEF INT0 = 20 # inline-integer value 0 (end-of-batch marker) +DEF INT31 = 51 +DEF INTLEN0 = 51 # == INT31; codes 52-59 carry 1-8 byte integers +DEF INTLEN8 = 59 +DEF SCALEDLEN0 = 60 # base for 1-8 byte scaled decimals (61-68) +DEF SCALEDLEN8 = 68 +DEF UTF8COUNT0 = 68 # base for length-prefixed strings (69-72) +DEF UTF8COUNT1 = 69 +DEF UTF8COUNT4 = 72 +DEF OPAQUECOUNT0 = 72 # base for length-prefixed binary (73-76) +DEF OPAQUECOUNT1 = 73 +DEF OPAQUECOUNT4 = 76 +DEF DOUBLELEN0 = 77 # 77 == double 0.0; 78-85 carry 1-8 byte doubles +DEF DOUBLELEN8 = 85 +DEF MILLISECLEN0 = 86 # base for 1-8 byte millisecond timestamps +DEF MILLISECLEN8 = 94 +DEF NANOSECLEN0 = 95 # base for 1-8 byte nanosecond timestamps +DEF NANOSECLEN8 = 103 +DEF TIMELEN0 = 104 # base for 1-4 byte ms-since-midnight +DEF TIMELEN4 = 108 +DEF UTF8LEN0 = 109 # base for 0-39 byte inline-length strings +DEF UTF8LEN39 = 148 +DEF OPAQUELEN0 = 149 # base for 0-39 byte inline-length binary +DEF OPAQUELEN39 = 188 +DEF BLOBLEN0 = 189 # base for 0-4 byte length-prefixed BLOB +DEF BLOBLEN4 = 193 +DEF CLOBLEN0 = 194 # base for 0-4 byte length-prefixed CLOB +DEF CLOBLEN4 = 198 +DEF VECTOR_C = 199 +DEF UUID_C = 200 +DEF SCALEDDATELEN0 = 200 # 201-208 carry 1-8 byte scaled dates +DEF SCALEDDATELEN8 = 208 +DEF SCALEDTIMELEN0 = 208 # 209-216 carry 1-8 byte scaled times +DEF SCALEDTIMELEN8 = 216 +DEF SCALEDTIMESTAMPLEN0 = 216 # 217-224 carry 1-8 byte scaled timestamps +DEF SCALEDTIMESTAMPLEN8 = 224 +DEF SCALEDTIMESTAMPNOTZLEN0 = 233 # 234-240 carry 1-7 byte scaled timestamps no-tz +DEF SCALEDTIMESTAMPNOTZ = 241 + + +# ----- ResultSet cdef class -------------------------------------------------- + +cdef class ResultSet: + """Drop-in replacement for result_set.ResultSet with C-typed attributes. + + fetchone() and is_complete() become direct C calls when invoked from other + Cython code (cpdef dispatch). Python callers see the same interface. + """ + + cdef public int handle + cdef public int col_count + cdef public list results + cdef public int results_idx + cdef public bint complete + + def __init__(self, int handle, int col_count, list initial_results, + bint complete): + self.handle = handle + self.col_count = col_count + self.results = initial_results + self.results_idx = 0 + self.complete = complete + + def clear_results(self): + del self.results[:] + self.results_idx = 0 + + def add_row(self, row): + self.results.append(row) + + cpdef bint is_complete(self): + return self.complete or self.results_idx != len(self.results) + + cpdef object fetchone(self): + cdef int idx = self.results_idx + if idx == len(self.results): + return None + self.results_idx = idx + 1 + return self.results[idx] + + +# ----- C helpers for big-endian integer reads -------------------------------- + +cdef inline long long _read_be_signed(const unsigned char* p, int n) nogil: + """n-byte big-endian signed integer (n in 0..8). + + Forwards to the C helper which uses memcpy + __builtin_bswap for the + fast N=2/4/8 cases (one native instruction after inlining). + """ + return _pynuodb_be_i64(p, n) + + +cdef inline Py_ssize_t _read_be_uint(const unsigned char* p, int n) nogil: + """n-byte big-endian unsigned integer (n in 0..8).""" + return _pynuodb_be_u64(p, n) + + +# ----- Helpers for complex Python-constructed types -------------------------- +# +# These are pure-Python `def` functions so they accept arbitrary-precision +# Python ints (which we get from `int.from_bytes` for multi-byte data). +# Called from the decode loop with already-decoded primitives so each cell +# only crosses the C/Python boundary once, instead of going through the +# generic `exotic_fn` -> `getValue()` dispatch. + +def _make_decimal(value, int scale): + """Build decimal.Decimal((sign, digit_tuple, -scale)) from an arbitrary + precision Python int and an unsigned scale byte. + + Mirrors EncodedSession.getScaledInt() exactly so callers see identical + Decimal representations (sign/digits/exponent). + """ + cdef int sign = 1 if value < 0 else 0 + digits = tuple(int(c) for c in str(abs(value))) + return _Decimal((sign, digits, -scale)) + + +def _unpack_time_scale(int scale, time_val): + """Return (seconds, micros) from (scale, raw_ticks). + + Mirrors EncodedSession.__unpack(). Uses Python arithmetic so it works + for arbitrary-precision raw values without overflow. + """ + # NOTE: with cython: cdivision=True at the top of this file, the expression + # `10 ** scale` (10 is a C int literal, scale is a C int) compiles to a + # C `pow()` call returning double. Force Python int semantics by casting + # one operand to `object`. + cdef object shiftr = (10) ** scale + cdef object ticks = time_val // shiftr + cdef object fraction = time_val % shiftr + cdef object micros + if scale > 6: + micros = fraction // ((10) ** (scale - 6)) + else: + micros = fraction * ((10) ** (6 - scale)) + if micros < 0: + micros = micros % 1000000 + ticks = ticks + 1 + return ticks, micros + + +def _make_scaled_date(date_val, int scale): + """SCALEDDATE: DateFromTicks(date // 10**scale).""" + return _DateFromTicks(date_val // ((10) ** scale)) + + +def _make_scaled_time(int scale, time_val, tz): + """SCALEDTIME: TimeFromTicks(seconds, micros, tz).""" + seconds, micros = _unpack_time_scale(scale, time_val) + return _TimeFromTicks(seconds, micros, tz) + + +def _make_scaled_ts(int scale, stamp_val, tz): + """SCALEDTIMESTAMP: TimestampFromTicks(seconds, micros, tz).""" + seconds, micros = _unpack_time_scale(scale, stamp_val) + return _TimestampFromTicks(seconds, micros, tz) + + +def _make_scaled_ts_notz(int scale, stamp_val): + """SCALEDTIMESTAMPNOTZ: TimestampFromTicks(seconds, micros, None).""" + seconds, micros = _unpack_time_scale(scale, stamp_val) + return _TimestampFromTicks(seconds, micros, None) + + +# ----- Wire-decode inner loop ------------------------------------------------ + +def decode_next_batch(bytearray data, Py_ssize_t pos, int col_count, + list results, object exotic_fn, object tz_info=None): + """Decode one server batch from the wire buffer. + + Parameters + ---------- + data : bytearray holding the raw server message (self.__input). + pos : read cursor (self.__inpos) at entry. + col_count : columns per row. + results : list to which decoded row-tuples are appended in place. + exotic_fn : callable(pos) -> (value, new_pos) for non-fast-path types. + Called for unusual row-marker encodings and any wire codes + not handled inline (VECTOR, SCALEDCOUNT2/3, LOBSTREAM, ARRAY, + DEBUGBARRIER). Should be EncodedSession._cython_exotic_decode. + tz_info : tzinfo for SCALEDTIME / SCALEDTIMESTAMP construction. May be + None if the caller doesn't expect tz-bearing time columns; + if a tz-bearing code is encountered with tz_info=None the + wire-decode still completes but TimeFromTicks/TimestampFromTicks + will use the local zone (matching the Python default). + + Returns + ------- + (new_pos: int, complete: bool) + """ + cdef: + Py_ssize_t n = len(data) + unsigned char[:] mv = data # typed memoryview - direct C access + const unsigned char* base # raw pointer for multi-byte reads + int code, nbytes, col, scale + Py_ssize_t length + long long ival + bint complete = False + object marker_obj, val, value_obj + object row_tup + PyObject* row_ptr + PyObject* empty_str_ptr = u'' + PyObject* none_ptr = None + PyObject* true_ptr = True + PyObject* false_ptr = False + + if n == 0: + return pos, False + + base = &mv[0] + + while pos < n: + # --- row-presence marker ------------------------------------------- + code = base[pos] + if INTMINUS10 <= code <= INT31: + pos += 1 + if code == INT0: # marker 0 -> end of batch + complete = True + break + # non-zero marker -> row follows; marker value is otherwise ignored + else: + # Rare: non-inline marker (e.g. value > 31 encoded as INTLEN...). + # Can't be zero (zero is always inline), so just advance past it. + marker_obj, pos = exotic_fn(pos) + # marker_obj != 0 guaranteed; continue to row decode + + # --- decode one row ------------------------------------------------- + # Build a tuple directly via the C API - avoids allocating a temporary + # list and the list->tuple copy that tuple() performs. All slot + # assignments use _pynuodb_tuple_steal (raw PyObject* -> no auto-INCREF), + # so each cell is exactly one new-reference creation + one slot store, + # with no extra refcount bumps. + row_tup = PyTuple_New(col_count) + row_ptr = row_tup + for col in range(col_count): + code = base[pos] + + # ·· NULL / TRUE / FALSE: codes 1-3 (must come before int range + # test so nullable columns don't pay for the long elif chain) ·· + if code <= FALSE_V: + if code == NULL_V: + _pynuodb_tuple_steal(row_ptr, col, + _pynuodb_incref(none_ptr)) + elif code == TRUE_V: + _pynuodb_tuple_steal(row_ptr, col, + _pynuodb_incref(true_ptr)) + else: # FALSE_V + _pynuodb_tuple_steal(row_ptr, col, + _pynuodb_incref(false_ptr)) + pos += 1 + + # ·· integers: codes 10-59 ······································ + elif INTMINUS10 <= code <= INTLEN8: + if code <= INT31: # inline -10 .. 31 + _pynuodb_tuple_steal(row_ptr, col, + _pynuodb_long_from_long(code - INT0)) + pos += 1 + else: # INTLEN1..INTLEN8: 52..59 + nbytes = code - INTLEN0 # 1..8 bytes follow + pos += 1 + ival = _read_be_signed(base + pos, nbytes) + _pynuodb_tuple_steal(row_ptr, col, + _pynuodb_long_from_longlong(ival)) + pos += nbytes + + # ·· short UTF-8: codes 109-148 ································· + elif UTF8LEN0 <= code <= UTF8LEN39: + length = code - UTF8LEN0 + pos += 1 + if length: + _pynuodb_tuple_steal(row_ptr, col, + _pynuodb_decode_utf8((base + pos), length)) + else: + _pynuodb_tuple_steal(row_ptr, col, + _pynuodb_incref(empty_str_ptr)) + pos += length + + # ·· length-prefixed UTF-8: codes 69-72 ························· + elif UTF8COUNT1 <= code <= UTF8COUNT4: + nbytes = code - UTF8COUNT0 # 1..4 length bytes + pos += 1 + length = _read_be_uint(base + pos, nbytes) + pos += nbytes + if length: + _pynuodb_tuple_steal(row_ptr, col, + _pynuodb_decode_utf8((base + pos), length)) + else: + _pynuodb_tuple_steal(row_ptr, col, + _pynuodb_incref(empty_str_ptr)) + pos += length + + # ·· short OPAQUE/binary: codes 149-188 ·························· + elif OPAQUELEN0 <= code <= OPAQUELEN39: + length = code - OPAQUELEN0 + pos += 1 + # Slice yields a bytearray; Binary(bytearray) returns + # bytes-subclass. We have to go through Binary's __new__ to + # preserve the public API contract from getOpaque(). + val = _Binary(data[pos:pos + length]) + _pynuodb_tuple_steal(row_ptr, col, + _pynuodb_incref(val)) + pos += length + + # ·· length-prefixed OPAQUE: codes 73-76 ························· + elif OPAQUECOUNT1 <= code <= OPAQUECOUNT4: + nbytes = code - OPAQUECOUNT0 + pos += 1 + length = _read_be_uint(base + pos, nbytes) + pos += nbytes + val = _Binary(data[pos:pos + length]) + _pynuodb_tuple_steal(row_ptr, col, + _pynuodb_incref(val)) + pos += length + + # ·· DOUBLE: codes 77-85 ········································ + elif DOUBLELEN0 <= code <= DOUBLELEN8: + nbytes = code - DOUBLELEN0 # 0..8 bytes + pos += 1 + val = float(_pynuodb_be_double(base + pos, nbytes)) + _pynuodb_tuple_steal(row_ptr, col, + _pynuodb_incref(val)) + pos += nbytes + + # ·· MILLISEC/NANOSEC timestamps as raw ints: codes 86-103 ········ + elif MILLISECLEN0 <= code <= NANOSECLEN8: + # getTime() in the Python decoder returns the raw integer for + # both MILLISECLEN and NANOSECLEN families - the caller is + # expected to interpret the units. Match that behaviour. + if code <= MILLISECLEN8: + nbytes = code - MILLISECLEN0 + else: + nbytes = code - NANOSECLEN0 + pos += 1 + if nbytes == 0: + _pynuodb_tuple_steal(row_ptr, col, + _pynuodb_long_from_long(0)) + else: + ival = _read_be_signed(base + pos, nbytes) + _pynuodb_tuple_steal(row_ptr, col, + _pynuodb_long_from_longlong(ival)) + pos += nbytes + + # ·· TIME (ms since midnight): codes 104-108 ····················· + elif TIMELEN0 <= code <= TIMELEN4: + nbytes = code - TIMELEN0 + pos += 1 + if nbytes == 0: + _pynuodb_tuple_steal(row_ptr, col, + _pynuodb_long_from_long(0)) + else: + # getTime() uses fromByteString here (unsigned). + length = _read_be_uint(base + pos, nbytes) + _pynuodb_tuple_steal(row_ptr, col, + _pynuodb_long_from_longlong(length)) + pos += nbytes + + # ·· SCALED Decimal: codes 60-68 ································· + # Note: SCALEDLEN0 == UTF8COUNT0 == 68; SCALEDLEN values 61-68 + # carry 1-8 byte signed data preceded by a 1-byte scale. The + # range here is *inside* the (60..68) bucket and is distinct + # from the UTF8COUNT range (69-72) checked earlier. + elif SCALEDLEN0 < code <= SCALEDLEN8: + nbytes = code - SCALEDLEN0 # 1..8 data bytes + pos += 1 + scale = base[pos] # unsigned scale byte + pos += 1 + # Use Python int for arbitrary precision (matches getScaledInt) + value_obj = _pynuodb_pylong_be_signed(base + pos, nbytes) + pos += nbytes + val = _make_decimal(value_obj, scale) + _pynuodb_tuple_steal(row_ptr, col, + _pynuodb_incref(val)) + + # ·· BLOB: codes 189-193 ········································· + elif BLOBLEN0 <= code <= BLOBLEN4: + nbytes = code - BLOBLEN0 # 0..4 length bytes + pos += 1 + if nbytes == 0: + length = 0 + else: + length = _read_be_uint(base + pos, nbytes) + pos += nbytes + val = _Binary(data[pos:pos + length]) + _pynuodb_tuple_steal(row_ptr, col, + _pynuodb_incref(val)) + pos += length + + # ·· CLOB: codes 194-198 ········································· + elif CLOBLEN0 <= code <= CLOBLEN4: + nbytes = code - CLOBLEN0 + pos += 1 + if nbytes == 0: + length = 0 + else: + length = _read_be_uint(base + pos, nbytes) + pos += nbytes + # getClob() returns the raw bytearray slice. + val = data[pos:pos + length] + _pynuodb_tuple_steal(row_ptr, col, + _pynuodb_incref(val)) + pos += length + + # ·· UUID (code 200) -- must come before SCALEDDATE because they ·· + # share offsets, but UUID is a single fixed code. + elif code == UUID_C: + pos += 1 + # Need bytes (not bytearray) for uuid.UUID(bytes=...) + val = _UUID(bytes=bytes(data[pos:pos + 16])) + _pynuodb_tuple_steal(row_ptr, col, + _pynuodb_incref(val)) + pos += 16 + + # ·· SCALEDDATE: codes 201-208 ··································· + elif SCALEDDATELEN0 < code <= SCALEDDATELEN8: + nbytes = code - SCALEDDATELEN0 + pos += 1 + scale = base[pos] + pos += 1 + value_obj = _pynuodb_pylong_be_signed(base + pos, nbytes) + pos += nbytes + val = _make_scaled_date(value_obj, scale) + _pynuodb_tuple_steal(row_ptr, col, + _pynuodb_incref(val)) + + # ·· SCALEDTIME: codes 209-216 ··································· + elif SCALEDTIMELEN0 < code <= SCALEDTIMELEN8: + nbytes = code - SCALEDTIMELEN0 + pos += 1 + scale = base[pos] + pos += 1 + value_obj = _pynuodb_pylong_be_signed(base + pos, nbytes) + pos += nbytes + val = _make_scaled_time(scale, value_obj, tz_info) + _pynuodb_tuple_steal(row_ptr, col, + _pynuodb_incref(val)) + + # ·· SCALEDTIMESTAMP: codes 217-224 ······························ + elif SCALEDTIMESTAMPLEN0 < code <= SCALEDTIMESTAMPLEN8: + nbytes = code - SCALEDTIMESTAMPLEN0 + pos += 1 + scale = base[pos] + pos += 1 + value_obj = _pynuodb_pylong_be_signed(base + pos, nbytes) + pos += nbytes + val = _make_scaled_ts(scale, value_obj, tz_info) + _pynuodb_tuple_steal(row_ptr, col, + _pynuodb_incref(val)) + + # ·· SCALEDTIMESTAMPNOTZ: code 241 ······························· + elif code == SCALEDTIMESTAMPNOTZ: + # Wire format: scale byte + signed bytes (length = code - LEN0). + # In practice code == 241 means 8 bytes (LEN0=233). + nbytes = code - SCALEDTIMESTAMPNOTZLEN0 + pos += 1 + scale = base[pos] + pos += 1 + value_obj = _pynuodb_pylong_be_signed(base + pos, nbytes) + pos += nbytes + val = _make_scaled_ts_notz(scale, value_obj) + _pynuodb_tuple_steal(row_ptr, col, + _pynuodb_incref(val)) + + else: + # Anything we don't handle inline (VECTOR, SCALEDCOUNT2/3, + # LOBSTREAM, ARRAY, DEBUGBARRIER, or new wire codes from a + # future protocol bump) goes through the Python fallback. + val, pos = exotic_fn(pos) + _pynuodb_tuple_steal(row_ptr, col, + _pynuodb_incref(val)) + + results.append(row_tup) + + return pos, complete + + +# ----- Wire-encode hot path (executemany) ------------------------------------ +# +# encode_batch() mirrors the Python loop in +# EncodedSession.execute_batch_prepared_statement(), but inlines the int / +# str / bool / None paths in C. Anything else (Decimal, Date, Time, Vector, +# Binary, etc.) falls back to session.putValue(), which writes to the same +# bytearray. +# +# Layout per row: +# one byte for typical small arity +# +# ... +# +# +# We grow the destination bytearray in chunks (PyByteArray_Resize is amortised +# constant), write through a raw C pointer for the inline cases, and call +# session.putValue() only for non-inline parameters - keeping pos in sync by +# resizing back to `pos` first and then rereading the size after the call. + +DEF NULL_WIRE = 1 +DEF UTF8LEN0_W = 109 +DEF UTF8COUNT0_W = 68 +DEF FALSE_WIRE = 3 +DEF TRUE_WIRE = 2 + + +def encode_batch(bytearray out, list param_lists, int parameter_count, + object session): + """Encode parameter rows for EXECUTEBATCHPREPAREDSTATEMENT into `out`. + + `out` : the session's outgoing message bytearray (self.__output). + `param_lists` : iterable of equally-sized parameter tuples/lists. + `parameter_count` : expected len(row); checked per row. + `session` : EncodedSession instance whose putValue() is called for any + parameter not handled inline (Decimal, datetime types, + Binary, Vector, etc.). session.__output MUST be the same + object as `out`, since the fallback appends to it directly. + + Does NOT emit the batch trailer (the -1 + len(param_lists) tag bytes); the + caller writes those after this returns so existing protocol semantics are + unchanged. + + Returns the number of rows encoded. + """ + cdef Py_ssize_t nrows = len(param_lists) + if nrows == 0: + return 0 + + cdef Py_ssize_t orig_size = PyByteArray_GET_SIZE(out) + + # Pre-compute the row marker bytes once; parameter_count is constant for + # the batch. In the common (small-arity) case this is a single byte. + cdef char row_marker[16] + cdef int row_marker_len = _pynuodb_emit_int(row_marker, + parameter_count) + + # Optimistic up-front growth. Worst-case per inline int/bool is 9 bytes + # (INTLEN0+8 + 8 data); short strings tend to be < 16 bytes including the + # tag. Use 12 bytes per param as a reasonable initial estimate; if a + # particular row exceeds it we re-grow in the loop. + cdef Py_ssize_t per_row_estimate = row_marker_len + parameter_count * 12 + cdef Py_ssize_t want = orig_size + nrows * per_row_estimate + if PyByteArray_Resize(out, want) != 0: + raise MemoryError() + cdef char *buf = PyByteArray_AS_STRING(out) + cdef Py_ssize_t pos = orig_size + cdef Py_ssize_t cap = PyByteArray_GET_SIZE(out) + + cdef long long ival + cdef int overflow + cdef Py_ssize_t plen, slen, lenbytes_n, i, headroom + cdef object v, utf8b + cdef const char *sptr + cdef object row + cdef type tv + + for row in param_lists: + plen = len(row) + if plen != parameter_count: + # Resize the bytearray back to the actual end before raising so + # the caller's view is consistent. + if PyByteArray_Resize(out, pos) != 0: + pass + raise ProgrammingError( + "Incorrect number of parameters specified, expected %d, got %d" + % (parameter_count, plen)) + + # Ensure capacity for this row's worst case BEFORE writing anything. + # Strings are length-bounded; the upper bound we can guarantee inline + # without inspecting values is row_marker + parameter_count * 9 (the + # int worst case). Strings are handled with explicit re-grows below. + headroom = row_marker_len + parameter_count * 9 + if pos + headroom > cap: + cap = pos + headroom + max(4096, (nrows - 1) * per_row_estimate) + if PyByteArray_Resize(out, cap) != 0: + raise MemoryError() + buf = PyByteArray_AS_STRING(out) + cap = PyByteArray_GET_SIZE(out) + + # Write row marker. + for i in range(row_marker_len): + buf[pos + i] = row_marker[i] + pos += row_marker_len + + for v in row: + # ---- None ---------------------------------------------------- + if v is None: + buf[pos] = NULL_WIRE + pos += 1 + continue + + tv = type(v) + + # ---- int / bool (inline if it fits in long long) ------------ + if tv is int or tv is bool: + ival = PyLong_AsLongLongAndOverflow(v, &overflow) + if overflow == 0: + pos += _pynuodb_emit_int(buf + pos, ival) + continue + # else: bignum, fall through to fallback + + # ---- str (UTF-8 encode + tag) ------------------------------- + elif tv is str: + utf8b = PyUnicode_AsUTF8String(v) + slen = PyBytes_GET_SIZE(utf8b) + sptr = PyBytes_AsString(utf8b) + # Worst case: 1 tag byte + 4 length bytes + slen data bytes. + # Re-grow if not enough headroom. + if pos + 1 + 4 + slen > cap: + cap = pos + 1 + 4 + slen + max(4096, + (nrows - 1) * per_row_estimate) + if PyByteArray_Resize(out, cap) != 0: + raise MemoryError() + buf = PyByteArray_AS_STRING(out) + cap = PyByteArray_GET_SIZE(out) + if slen < 40: + buf[pos] = (UTF8LEN0_W + slen) + pos += 1 + else: + # Length-prefixed: emit UTF8COUNT0+nlb, then minimal + # big-endian unsigned length, then data. + if slen <= 0xff: lenbytes_n = 1 + elif slen <= 0xffff: lenbytes_n = 2 + elif slen <= 0xffffff: lenbytes_n = 3 + else: lenbytes_n = 4 + buf[pos] = (UTF8COUNT0_W + lenbytes_n) + pos += 1 + for i in range(lenbytes_n): + buf[pos + lenbytes_n - 1 - i] = ((slen >> (i * 8)) & 0xff) + pos += lenbytes_n + memcpy(buf + pos, sptr, slen) + pos += slen + continue + + # ---- fallback: hand off to Python putValue ------------------ + # Commit the truncated size so session.putValue() sees the right + # current length, append via the Python path, then re-read. + if PyByteArray_Resize(out, pos) != 0: + raise MemoryError() + session.putValue(v) + pos = PyByteArray_GET_SIZE(out) + # Re-grow for the remaining rows / params in this row. + headroom = row_marker_len + parameter_count * 9 + if pos + headroom > cap: + cap = pos + headroom + max(4096, + (nrows - 1) * per_row_estimate) + if PyByteArray_Resize(out, cap) != 0: + raise MemoryError() + buf = PyByteArray_AS_STRING(out) + cap = PyByteArray_GET_SIZE(out) + + # Truncate to the actual final position. + if PyByteArray_Resize(out, pos) != 0: + raise MemoryError() + return nrows diff --git a/pynuodb/crypt.py b/pynuodb/crypt.py index 028f3a4..11cbb6e 100644 --- a/pynuodb/crypt.py +++ b/pynuodb/crypt.py @@ -142,50 +142,54 @@ def fromHex(hexStr): def toSignedByteString(value): # type: (int) -> bytearray - """Convert an integer into bytes.""" - result = bytearray() - if value == 0 or value == -1: - result.append(value & 0xFF) + """Convert an integer into the minimal big-endian two's-complement bytes. + + Uses int.to_bytes (C-level) instead of a Python byte-shift loop. For + negative values that are not exact -2**k the standard byte-length formula + (bit_length+8)//8 would over-allocate by one; the (-v-1).bit_length() + expression below computes the right minimal width without that case split. + """ + if value == 0: + return bytearray(b'\x00') + if value == -1: + return bytearray(b'\xff') + if value > 0: + nbytes = (value.bit_length() + 8) // 8 else: - while value != 0 and value != -1: - result.append(value & 0xFF) - value >>= 8 - # Zero pad if positive - if value == 0 and (result[-1] & 0x80) == 0x80: - result.append(0x00) - elif value == -1 and (result[-1] & 0x80) == 0x00: - result.append(0xFF) - result.reverse() - return result + nbytes = ((-value - 1).bit_length() + 8) // 8 + return bytearray(value.to_bytes(nbytes, 'big', signed=True)) def fromSignedByteString(data): # type: (bytearray) -> int """Convert bytes into a signed integer.""" - if data: - is_neg = (data[0] & 0x80) >> 7 - else: - is_neg = 0 - result = 0 - shiftCount = 0 - for b in reversed(data): - result = result | (((b & 0xFF) ^ (is_neg * 0xFF)) << shiftCount) - shiftCount += 8 - - return ((-1)**is_neg) * (result + is_neg) + return int.from_bytes(data, 'big', signed=True) def toByteString(bigInt): # type: (int) -> bytearray - """Convert an integer into bytes.""" + """Convert a non-negative integer into the minimal big-endian bytes. + + The legacy implementation also accepted negative inputs and produced a + quirky truncated two's-complement representation, but no current caller + invokes it that way (lengths, scales, message IDs, SRP primes are all + non-negative). We preserve the original behaviour for 0 and -1 (one byte + of 0x00 / 0xff) and fall back to the old algorithm for any other negative + value so external semantics stay byte-identical. + """ + if bigInt == 0: + return bytearray(b'\x00') + if bigInt == -1: + return bytearray(b'\xff') + if bigInt > 0: + nbytes = (bigInt.bit_length() + 7) // 8 + return bytearray(bigInt.to_bytes(nbytes, 'big')) + # Negative-other-than-(-1) fallback (unused in practice). result = bytearray() - if bigInt == -1 or bigInt == 0: + while bigInt != 0 and bigInt != -1: result.append(bigInt & 0xFF) - else: - while bigInt != 0 and bigInt != -1: - result.append(bigInt & 0xFF) - bigInt >>= 8 - result.reverse() + bigInt >>= 8 + result.reverse() return result diff --git a/pynuodb/cursor.py b/pynuodb/cursor.py index 4e515f5..f944518 100644 --- a/pynuodb/cursor.py +++ b/pynuodb/cursor.py @@ -184,7 +184,11 @@ def executemany(self, operation, seq_of_parameters): def fetchone(self): # type: () -> Optional[result_set.Row] """Return the next row of results from the previous SQL operation.""" - self._check_closed() + # Inline _check_closed to avoid per-row function-call overhead. + if self.closed: + raise Error("cursor is closed") + if self.session.closed: + raise Error("connection is closed") if self._result_set is None: raise Error("Previous execute did not produce any results or no call was issued yet") self.rownumber += 1 @@ -218,14 +222,22 @@ def fetchall(self): # type: () -> List[result_set.Row] """Return all rows generated by the previous SQL operation.""" self._check_closed() + if self._result_set is None: + raise Error("Previous execute did not produce any results or no call was issued yet") - fetched_rows = [] + fetched_rows = [] # type: List[result_set.Row] while True: - row = self.fetchone() - if row is None: + # Drain the current in-memory batch in one shot instead of calling + # fetchone() per row. + idx = self._result_set.results_idx + batch = self._result_set.results + if idx < len(batch): + fetched_rows.extend(batch[idx:]) + self._result_set.results_idx = len(batch) + if self._result_set.complete: break - else: - fetched_rows.append(row) + self.session.fetch_result_set_next(self._result_set) + self.rownumber += len(fetched_rows) return fetched_rows def nextset(self): # pylint: disable=no-self-use diff --git a/pynuodb/encodedsession.py b/pynuodb/encodedsession.py index d6bed99..4366891 100644 --- a/pynuodb/encodedsession.py +++ b/pynuodb/encodedsession.py @@ -37,6 +37,12 @@ from . import result_set from .datatype import LOCALZONE_NAME +try: + from . import _fetch as _fetch_accel + _HAVE_FETCH_ACCEL = True +except ImportError: + _HAVE_FETCH_ACCEL = False + # ZoneInfo is preferred but not introduced until 3.9 if sys.version_info >= (3, 9): # preferred python >= 3.9 @@ -465,16 +471,23 @@ def execute_batch_prepared_statement(self, prepared_statement, param_lists): """Batch the prepared statement with the given parameters.""" self._setup_statement(prepared_statement.handle, protocol.EXECUTEBATCHPREPAREDSTATEMENT) - for parameters in param_lists: - plen = len(parameters) - if prepared_statement.parameter_count != plen: - raise ProgrammingError("Incorrect number of parameters specified," - " expected %d, got %d" - % (prepared_statement.parameter_count, - plen)) - self.putInt(plen) - for param in parameters: - self.putValue(param) + expected = prepared_statement.parameter_count + if _HAVE_FETCH_ACCEL: + # encode_batch needs a list (it indexes / lens it). + if not isinstance(param_lists, list): + param_lists = list(param_lists) + _fetch_accel.encode_batch( + self.__output, param_lists, expected, self) + else: + for parameters in param_lists: + plen = len(parameters) + if expected != plen: + raise ProgrammingError("Incorrect number of parameters specified," + " expected %d, got %d" + % (expected, plen)) + self.putInt(plen) + for param in parameters: + self.putValue(param) self.putInt(-1) self.putInt(len(param_lists)) self._exchangeMessages() @@ -512,25 +525,46 @@ def fetch_result_set(self, stmt): for _ in range(colcount): self.getString() - complete = False init_results = [] # type: List[result_set.Row] - # If we hit the end of the stream without next==0, there are more - # results to fetch. - while self._hasBytes(1): - next_row = self.getInt() - if next_row == 0: - complete = True - break + if _HAVE_FETCH_ACCEL: + # The initial row batch has the same wire format as subsequent + # NEXT batches; reuse the Cython decoder rather than calling + # getValue() per cell through the Python path. + pos, complete = _fetch_accel.decode_next_batch( + self.__input, self.__inpos, colcount, + init_results, self._cython_exotic_decode, + self.timezone_info) + self.__inpos = pos + else: + complete = False + # If we hit the end of the stream without next==0, there are more + # results to fetch. + while self._hasBytes(1): + next_row = self.getInt() + if next_row == 0: + complete = True + break - row = [None] * colcount - for i in range(colcount): - row[i] = self.getValue() + row = [None] * colcount + for i in range(colcount): + row[i] = self.getValue() - init_results.append(tuple(row)) + init_results.append(tuple(row)) return result_set.ResultSet(handle, colcount, init_results, complete) + def _cython_exotic_decode(self, pos): + # type: (int) -> tuple + """Bridge called by _fetch_accel.decode_next_batch for exotic wire types. + + Sets __inpos to pos, calls getValue() (which handles any NuoDB type), + then returns (value, new_pos) so the Cython loop can resume. + """ + self.__inpos = pos + val = self.getValue() + return val, self.__inpos + def fetch_result_set_next(self, resultset): # type: (result_set.ResultSet) -> None """Get more rows from this result set.""" @@ -539,6 +573,15 @@ def fetch_result_set_next(self, resultset): resultset.clear_results() + if _HAVE_FETCH_ACCEL: + pos, complete = _fetch_accel.decode_next_batch( + self.__input, self.__inpos, resultset.col_count, + resultset.results, self._cython_exotic_decode, + self.timezone_info) + self.__inpos = pos + resultset.complete = complete + return + while self._hasBytes(1): if self.getInt() == 0: resultset.complete = True @@ -869,6 +912,24 @@ def putValue(self, value): # pylint: disable=too-many-return-statements if value is None: return self.putNull() + # Fast paths: `type(v) is X` is a C-level pointer compare; isinstance + # walks the MRO and is markedly slower. These hit on the bulk of + # bound parameters (plain int / str / float). + tv = type(value) + if tv is int: + return self.putInt(value) + if tv is str: + return self.putString(value) + if tv is float: + return self.putDouble(value) + if tv is bool: + # Preserve historic wire behaviour: bools encode as integers + # because the original isinstance(value, int) chain matched True + # and False before reaching the (dead) bool branch below. + return self.putInt(value) + + # Subclass-aware fallback for the long tail (int/str subclasses, + # Decimal, datetime types, Binary, Vector, etc.). if isinstance(value, int): return self.putInt(value) @@ -891,9 +952,6 @@ def putValue(self, value): # pylint: disable=too-many-return-statements if isinstance(value, datatype.Binary): return self.putOpaque(value) - if isinstance(value, bool): - return self.putBoolean(value) - # we don't want to autodetect lists as being VECTOR, so we # only bind double if it is the explicit type if isinstance(value, datatype.Vector): @@ -919,6 +977,7 @@ def getInt(self): raise DataError('Not an integer: %d' % (code)) + # Does not preserve E notation def getScaledInt(self): # type: () -> decimal.Decimal @@ -1311,7 +1370,8 @@ def _exchangeMessages(self, getResponse=True): resp = self.recv(timeout=None) if resp is None: db_error_handler(protocol.OPERATION_TIMEOUT, "timed out") - self.__input = crypt.bytesToArray(resp) + # recv() now returns bytearray directly; no copy needed. + self.__input = resp error = self.getInt() if error != 0: diff --git a/pynuodb/result_set.py b/pynuodb/result_set.py index 2cb148a..d0eb23e 100644 --- a/pynuodb/result_set.py +++ b/pynuodb/result_set.py @@ -60,3 +60,12 @@ def fetchone(self): res = self.results[self.results_idx] self.results_idx += 1 return res + + +# Replace the Python implementation above with the Cython cdef class when the +# extension has been built. The interface is identical; fetchone() and +# is_complete() become near-C-speed cpdef calls. +try: + from ._fetch import ResultSet # noqa: F811 pylint: disable=unused-import +except ImportError: + pass diff --git a/pynuodb/session.py b/pynuodb/session.py index 9af87d6..a3a7868 100644 --- a/pynuodb/session.py +++ b/pynuodb/session.py @@ -229,6 +229,15 @@ def _open_socket(self, connect_timeout, host, port, af, read_timeout): self.__sock = socket.socket(af, socket.SOCK_STREAM) # disable Nagle's algorithm self.__sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) + # Bigger receive buffer cuts recv_into syscalls during full-table + # scans (each batch is ~140 KB; default SO_RCVBUF is ~64 KB which + # forces 2-3 reads per batch). Kernel clamps to its sysctl maximum + # so the request is opportunistic. + try: + self.__sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, + 1 << 20) # 1 MiB + except OSError: + pass # separate connect and read timeout; we do not necessarily want to # close out connection if reads block for a long time, because it could # take a while for the server to generate data to send @@ -481,12 +490,13 @@ def send(self, message): raise def recv(self, timeout=None): - # type: (Optional[float]) -> Optional[bytes] + # type: (Optional[float]) -> Optional[bytearray] """Pull the next message from the socket. If timeout is None, wait forever (until read_timeout, if set). If timeout is a float, then set this timeout for this recv(). On timeout, return None but do not close the connection. + Returns a bytearray to avoid an extra copy in the caller. """ try: # We only wait on timeout to read the header. Once we read @@ -506,17 +516,30 @@ def recv(self, timeout=None): raise RuntimeError("Session.recv read no data!") if self.__cipherIn: - msg = self.__cipherIn.transform(msg) + # cryptography.Cipher.update accepts any buffer-like object, so + # pass the bytearray directly instead of copying via bytes(msg). + # Result is wrapped back into bytearray so callers always receive + # a bytearray regardless of cipher state. + return bytearray(self.__cipherIn.transform(msg)) return msg def __readFully(self, msgLength, timeout=None): - # type: (int, Optional[float]) -> Optional[bytes] - """Pull the next complete message from the socket.""" + # type: (int, Optional[float]) -> Optional[bytearray] + """Pull the next complete message from the socket. + + Pre-allocates a bytearray of the exact required size and fills it with + recv_into(), avoiding the repeated bytearray concatenations and the + final bytes() copy that the previous implementation performed. + """ + if msgLength == 0: + return bytearray() sock = self._sock - msg = bytearray() + msg = bytearray(msgLength) + mv = memoryview(msg) old_tmout = sock.gettimeout() - while msgLength > 0: + offset = 0 + while offset < msgLength: if timeout is not None: # It's a little wrong that this timeout applies to each recv() # instead of to the entire operation; however we only use this @@ -524,7 +547,7 @@ def __readFully(self, msgLength, timeout=None): # pass anyway. sock.settimeout(timeout) try: - received = sock.recv(msgLength) + received = sock.recv_into(mv[offset:], msgLength - offset) except socket.timeout: return None except IOError as e: @@ -539,11 +562,10 @@ def __readFully(self, msgLength, timeout=None): raise SessionException( "Session closed waiting for data: wanted length=%d," " received length=%d" - % (msgLength, len(msg))) - msg += received - msgLength -= len(received) + % (msgLength, offset)) + offset += received - return bytes(msg) + return msg def stream_recv(self, blocksz=4096, timeout=None): # type: (int, Optional[float]) -> Generator[bytes, None, None] diff --git a/setup.py b/setup.py index 8b2d0e9..cf3eda2 100644 --- a/setup.py +++ b/setup.py @@ -24,6 +24,16 @@ from setuptools import setup +try: + from Cython.Build import cythonize + from setuptools import Extension + _ext_modules = cythonize( + Extension("pynuodb._fetch", ["pynuodb/_fetch.pyx"]), + compiler_directives={"language_level": "3"}, + ) +except ImportError: + _ext_modules = [] + with open(os.path.join(os.path.dirname(__file__), 'pynuodb', '__init__.py')) as v: m = re.search(r"^ *__version__ *= *'(.*?)'", v.read(), re.M) if m is None: @@ -40,6 +50,7 @@ description='NuoDB Python driver', keywords='nuodb scalable cloud database', packages=['pynuodb'], + ext_modules=_ext_modules, url='https://github.com/nuodb/nuodb-python', license='BSD License', long_description=open(readme).read(), diff --git a/test-performance/timesInsert.py b/test-performance/timesInsert.py index 9ee32bd..7428221 100644 --- a/test-performance/timesInsert.py +++ b/test-performance/timesInsert.py @@ -14,8 +14,8 @@ def gettime(): def insert(count): - for i in range(count): - cursor.execute("INSERT INTO perf_test (a,b ) VALUES (%d,'A')" % i) + cursor.executemany("INSERT INTO perf_test (a,b) VALUES (?, ?)", + [(i, 'A') for i in range(count)]) connection.commit()