diff --git a/riak/transports/pbc/connection.py b/riak/transports/pbc/connection.py index 12045323..79c6b7a5 100644 --- a/riak/transports/pbc/connection.py +++ b/riak/transports/pbc/connection.py @@ -204,6 +204,12 @@ def _connect(self): self._timeout) else: self._socket = socket.create_connection(self._address) + if self._socket_keepalive: + self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) + ka_opts = self._socket_keepalive_options or {} + for k, v in ka_opts.iteritems(): + self._socket.setsockopt(socket.SOL_TCP, k, v) + if self._client._credentials: self._init_security() @@ -231,3 +237,5 @@ def _parse_msg(self, code, packet): # These are set in the RiakPbcTransport initializer _address = None _timeout = None + _socket_keepalive = None + _socket_keepalive_options = None diff --git a/riak/transports/pbc/transport.py b/riak/transports/pbc/transport.py index 53df2181..6f7063c0 100644 --- a/riak/transports/pbc/transport.py +++ b/riak/transports/pbc/transport.py @@ -28,6 +28,8 @@ def __init__(self, node=None, client=None, timeout=None, + socket_keepalive=False, + socket_keepalive_options=None, *unused_options): """ Construct a new RiakPbcTransport object. @@ -39,6 +41,8 @@ def __init__(self, self._address = (node.host, node.pb_port) self._timeout = timeout self._socket = None + self._socket_keepalive = socket_keepalive + self._socket_keepalive_options = socket_keepalive_options # FeatureDetection API def _server_version(self):