diff --git a/pynuodb/encodedsession.py b/pynuodb/encodedsession.py index d6bed99..e3b3ae1 100644 --- a/pynuodb/encodedsession.py +++ b/pynuodb/encodedsession.py @@ -465,13 +465,13 @@ def execute_batch_prepared_statement(self, prepared_statement, param_lists): """Batch the prepared statement with the given parameters.""" self._setup_statement(prepared_statement.handle, protocol.EXECUTEBATCHPREPAREDSTATEMENT) + expected = prepared_statement.parameter_count for parameters in param_lists: plen = len(parameters) - if prepared_statement.parameter_count != plen: + if expected != plen: raise ProgrammingError("Incorrect number of parameters specified," " expected %d, got %d" - % (prepared_statement.parameter_count, - plen)) + % (expected, plen)) self.putInt(plen) for param in parameters: self.putValue(param) @@ -869,6 +869,24 @@ def putValue(self, value): # pylint: disable=too-many-return-statements if value is None: return self.putNull() + # Fast paths: `type(v) is X` is a C-level pointer compare; isinstance + # walks the MRO and is markedly slower. These hit on the bulk of + # bound parameters (plain int / str / float). + tv = type(value) + if tv is int: + return self.putInt(value) + if tv is str: + return self.putString(value) + if tv is float: + return self.putDouble(value) + if tv is bool: + # Preserve historic wire behaviour: bools encode as integers + # because the original isinstance(value, int) chain matched True + # and False before reaching the (dead) bool branch below. + return self.putInt(value) + + # Subclass-aware fallback for the long tail (int/str subclasses, + # Decimal, datetime types, Binary, Vector, etc.). if isinstance(value, int): return self.putInt(value) @@ -891,9 +909,6 @@ def putValue(self, value): # pylint: disable=too-many-return-statements if isinstance(value, datatype.Binary): return self.putOpaque(value) - if isinstance(value, bool): - return self.putBoolean(value) - # we don't want to autodetect lists as being VECTOR, so we # only bind double if it is the explicit type if isinstance(value, datatype.Vector): @@ -919,6 +934,7 @@ def getInt(self): raise DataError('Not an integer: %d' % (code)) + # Does not preserve E notation def getScaledInt(self): # type: () -> decimal.Decimal @@ -1311,7 +1327,8 @@ def _exchangeMessages(self, getResponse=True): resp = self.recv(timeout=None) if resp is None: db_error_handler(protocol.OPERATION_TIMEOUT, "timed out") - self.__input = crypt.bytesToArray(resp) + # recv() now returns bytearray directly; no copy needed. + self.__input = resp error = self.getInt() if error != 0: diff --git a/pynuodb/session.py b/pynuodb/session.py index 9af87d6..a3a7868 100644 --- a/pynuodb/session.py +++ b/pynuodb/session.py @@ -229,6 +229,15 @@ def _open_socket(self, connect_timeout, host, port, af, read_timeout): self.__sock = socket.socket(af, socket.SOCK_STREAM) # disable Nagle's algorithm self.__sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) + # Bigger receive buffer cuts recv_into syscalls during full-table + # scans (each batch is ~140 KB; default SO_RCVBUF is ~64 KB which + # forces 2-3 reads per batch). Kernel clamps to its sysctl maximum + # so the request is opportunistic. + try: + self.__sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, + 1 << 20) # 1 MiB + except OSError: + pass # separate connect and read timeout; we do not necessarily want to # close out connection if reads block for a long time, because it could # take a while for the server to generate data to send @@ -481,12 +490,13 @@ def send(self, message): raise def recv(self, timeout=None): - # type: (Optional[float]) -> Optional[bytes] + # type: (Optional[float]) -> Optional[bytearray] """Pull the next message from the socket. If timeout is None, wait forever (until read_timeout, if set). If timeout is a float, then set this timeout for this recv(). On timeout, return None but do not close the connection. + Returns a bytearray to avoid an extra copy in the caller. """ try: # We only wait on timeout to read the header. Once we read @@ -506,17 +516,30 @@ def recv(self, timeout=None): raise RuntimeError("Session.recv read no data!") if self.__cipherIn: - msg = self.__cipherIn.transform(msg) + # cryptography.Cipher.update accepts any buffer-like object, so + # pass the bytearray directly instead of copying via bytes(msg). + # Result is wrapped back into bytearray so callers always receive + # a bytearray regardless of cipher state. + return bytearray(self.__cipherIn.transform(msg)) return msg def __readFully(self, msgLength, timeout=None): - # type: (int, Optional[float]) -> Optional[bytes] - """Pull the next complete message from the socket.""" + # type: (int, Optional[float]) -> Optional[bytearray] + """Pull the next complete message from the socket. + + Pre-allocates a bytearray of the exact required size and fills it with + recv_into(), avoiding the repeated bytearray concatenations and the + final bytes() copy that the previous implementation performed. + """ + if msgLength == 0: + return bytearray() sock = self._sock - msg = bytearray() + msg = bytearray(msgLength) + mv = memoryview(msg) old_tmout = sock.gettimeout() - while msgLength > 0: + offset = 0 + while offset < msgLength: if timeout is not None: # It's a little wrong that this timeout applies to each recv() # instead of to the entire operation; however we only use this @@ -524,7 +547,7 @@ def __readFully(self, msgLength, timeout=None): # pass anyway. sock.settimeout(timeout) try: - received = sock.recv(msgLength) + received = sock.recv_into(mv[offset:], msgLength - offset) except socket.timeout: return None except IOError as e: @@ -539,11 +562,10 @@ def __readFully(self, msgLength, timeout=None): raise SessionException( "Session closed waiting for data: wanted length=%d," " received length=%d" - % (msgLength, len(msg))) - msg += received - msgLength -= len(received) + % (msgLength, offset)) + offset += received - return bytes(msg) + return msg def stream_recv(self, blocksz=4096, timeout=None): # type: (int, Optional[float]) -> Generator[bytes, None, None]