Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 24 additions & 7 deletions pynuodb/encodedsession.py
Original file line number Diff line number Diff line change
Expand Up @@ -465,13 +465,13 @@ def execute_batch_prepared_statement(self, prepared_statement, param_lists):
"""Batch the prepared statement with the given parameters."""
self._setup_statement(prepared_statement.handle, protocol.EXECUTEBATCHPREPAREDSTATEMENT)

expected = prepared_statement.parameter_count
for parameters in param_lists:
plen = len(parameters)
if prepared_statement.parameter_count != plen:
if expected != plen:
raise ProgrammingError("Incorrect number of parameters specified,"
" expected %d, got %d"
% (prepared_statement.parameter_count,
plen))
% (expected, plen))
self.putInt(plen)
for param in parameters:
self.putValue(param)
Expand Down Expand Up @@ -869,6 +869,24 @@ def putValue(self, value): # pylint: disable=too-many-return-statements
if value is None:
return self.putNull()

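# Fast paths: `type(value) is X` is a single identity comparison, so the
# common exact types skip the chain of isinstance() calls below (each one
# is a function call, plus an MRO walk whenever the type does not match
# exactly). These hit on the bulk of bound parameters (plain int / str /
# float).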
tv = type(value)
if tv is int:
return self.putInt(value)
if tv is str:
return self.putString(value)
if tv is float:
return self.putDouble(value)
if tv is bool:
# Preserve historic wire behaviour: bools encode as integers because
# bool is a subclass of int, so the original isinstance(value, int)
# check matched True and False before the later isinstance(value, bool)
# branch could ever run. That unreachable branch has been removed.
return self.putInt(value)

# Subclass-aware fallback for the long tail (int/str subclasses,
# Decimal, datetime types, Binary, Vector, etc.).
if isinstance(value, int):
return self.putInt(value)

Expand All @@ -891,9 +909,6 @@ def putValue(self, value): # pylint: disable=too-many-return-statements
if isinstance(value, datatype.Binary):
return self.putOpaque(value)

if isinstance(value, bool):
return self.putBoolean(value)

# we don't want to autodetect lists as being VECTOR, so we
# only bind double if it is the explicit type
if isinstance(value, datatype.Vector):
Expand All @@ -919,6 +934,7 @@ def getInt(self):

raise DataError('Not an integer: %d' % (code))


# Does not preserve E notation
def getScaledInt(self):
# type: () -> decimal.Decimal
Expand Down Expand Up @@ -1311,7 +1327,8 @@ def _exchangeMessages(self, getResponse=True):
resp = self.recv(timeout=None)
if resp is None:
db_error_handler(protocol.OPERATION_TIMEOUT, "timed out")
self.__input = crypt.bytesToArray(resp)
# recv() now returns bytearray directly; no copy needed.
self.__input = resp

error = self.getInt()
if error != 0:
Expand Down
44 changes: 33 additions & 11 deletions pynuodb/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,15 @@ def _open_socket(self, connect_timeout, host, port, af, read_timeout):
self.__sock = socket.socket(af, socket.SOCK_STREAM)
# disable Nagle's algorithm
self.__sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
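# A bigger receive buffer can cut recv_into syscalls during full-table
# scans (each batch is ~140 KB; where the default SO_RCVBUF is ~64 KB
# that forces 2-3 reads per batch). The kernel clamps the value to its
# configured maximum, so the request is opportunistic.
# NOTE(review): platform defaults vary, and on Linux an explicit
# SO_RCVBUF turns off receive-buffer autotuning for this socket --
# confirm with a benchmark that this is a net win.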
try:
self.__sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF,
1 << 20) # 1 MiB
except OSError:
pass
# separate connect and read timeout; we do not necessarily want to
# close out connection if reads block for a long time, because it could
# take a while for the server to generate data to send
Expand Down Expand Up @@ -481,12 +490,13 @@ def send(self, message):
raise

def recv(self, timeout=None):
# type: (Optional[float]) -> Optional[bytes]
# type: (Optional[float]) -> Optional[bytearray]
"""Pull the next message from the socket.

If timeout is None, wait forever (until read_timeout, if set).
If timeout is a float, then set this timeout for this recv().
On timeout, return None but do not close the connection.
Returns a bytearray to avoid an extra copy in the caller.
"""
try:
# We only wait on timeout to read the header. Once we read
Expand All @@ -506,25 +516,38 @@ def recv(self, timeout=None):
raise RuntimeError("Session.recv read no data!")

if self.__cipherIn:
msg = self.__cipherIn.transform(msg)
# cryptography.Cipher.update accepts any buffer-like object, so
# pass the bytearray directly instead of copying via bytes(msg).
# Result is wrapped back into bytearray so callers always receive
# a bytearray regardless of cipher state.
return bytearray(self.__cipherIn.transform(msg))

return msg

def __readFully(self, msgLength, timeout=None):
# type: (int, Optional[float]) -> Optional[bytes]
"""Pull the next complete message from the socket."""
# type: (int, Optional[float]) -> Optional[bytearray]
"""Pull the next complete message from the socket.

Pre-allocates a bytearray of the exact required size and fills it with
recv_into(), avoiding the repeated bytearray concatenations and the
final bytes() copy that the previous implementation performed.
"""
if msgLength == 0:
return bytearray()
sock = self._sock
msg = bytearray()
msg = bytearray(msgLength)
mv = memoryview(msg)
old_tmout = sock.gettimeout()
while msgLength > 0:
offset = 0
while offset < msgLength:
if timeout is not None:
# It's a little wrong that this timeout applies to each recv()
# instead of to the entire operation; however we only use this
# when reading the header which will always be read in one
# pass anyway.
sock.settimeout(timeout)
try:
received = sock.recv(msgLength)
received = sock.recv_into(mv[offset:], msgLength - offset)
except socket.timeout:
return None
except IOError as e:
Expand All @@ -539,11 +562,10 @@ def __readFully(self, msgLength, timeout=None):
raise SessionException(
"Session closed waiting for data: wanted length=%d,"
" received length=%d"
% (msgLength, len(msg)))
msg += received
msgLength -= len(received)
% (msgLength, offset))
offset += received

return bytes(msg)
return msg

def stream_recv(self, blocksz=4096, timeout=None):
# type: (int, Optional[float]) -> Generator[bytes, None, None]
Expand Down