From f4e111e31567de4ea3d7351c45da34b1b97669f4 Mon Sep 17 00:00:00 2001 From: Patrick Ellul Date: Fri, 11 Dec 2015 14:31:54 +1100 Subject: [PATCH 1/8] adding keepalive options to pbc transport connection socket --- riak/transports/pbc/connection.py | 8 ++++++++ riak/transports/pbc/transport.py | 4 ++++ 2 files changed, 12 insertions(+) diff --git a/riak/transports/pbc/connection.py b/riak/transports/pbc/connection.py index 0bc58232..4d15387b 100644 --- a/riak/transports/pbc/connection.py +++ b/riak/transports/pbc/connection.py @@ -220,6 +220,12 @@ def _connect(self): self._timeout) else: self._socket = socket.create_connection(self._address) + if self._socket_keepalive: + self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) + ka_opts = self._socket_keepalive_options or {} + for k, v in ka_opts.iteritems(): + self._socket.setsockopt(socket.SOL_TCP, k, v) + if self._client._credentials: self._init_security() @@ -247,3 +253,5 @@ def _parse_msg(self, code, packet): # These are set in the RiakPbcTransport initializer _address = None _timeout = None + _socket_keepalive = None + _socket_keepalive_options = None diff --git a/riak/transports/pbc/transport.py b/riak/transports/pbc/transport.py index e385c698..14ef2f75 100644 --- a/riak/transports/pbc/transport.py +++ b/riak/transports/pbc/transport.py @@ -93,6 +93,8 @@ def __init__(self, node=None, client=None, timeout=None, + socket_keepalive=False, + socket_keepalive_options=None, *unused_options): """ Construct a new RiakPbcTransport object. @@ -104,6 +106,8 @@ def __init__(self, self._address = (node.host, node.pb_port) self._timeout = timeout self._socket = None + self._socket_keepalive = socket_keepalive + self._socket_keepalive_options = socket_keepalive_options # FeatureDetection API def _server_version(self): From 6e2969a9770b0d347bae7cdcbefdc1893df3d43f Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Wed, 27 Apr 2016 08:51:37 -0700 Subject: [PATCH 2/8] remove file --- riak/transports/pbc/transport.py | 774 ------------------------------- 1 file changed, 774 deletions(-) delete mode 100644 riak/transports/pbc/transport.py diff --git a/riak/transports/pbc/transport.py b/riak/transports/pbc/transport.py deleted file mode 100644 index 6f7063c0..00000000 --- a/riak/transports/pbc/transport.py +++ /dev/null @@ -1,774 +0,0 @@ -import riak.pb.messages -import riak.pb.riak_pb2 -import riak.pb.riak_kv_pb2 -import riak.pb.riak_ts_pb2 - -from riak import RiakError -from riak.transports.transport import RiakTransport -from riak.riak_object import VClock -from riak.ts_object import TsObject -from riak.util import decode_index_value, str_to_bytes, bytes_to_str -from riak.transports.pbc.connection import RiakPbcConnection -from riak.transports.pbc.stream import (RiakPbcKeyStream, - RiakPbcMapredStream, - RiakPbcBucketStream, - RiakPbcIndexStream, - RiakPbcTsKeyStream) -from riak.transports.pbc.codec import RiakPbcCodec -from six import PY2, PY3 - - -class RiakPbcTransport(RiakTransport, RiakPbcConnection, RiakPbcCodec): - """ - The RiakPbcTransport object holds a connection to the protocol - buffers interface on the riak server. - """ - - def __init__(self, - node=None, - client=None, - timeout=None, - socket_keepalive=False, - socket_keepalive_options=None, - *unused_options): - """ - Construct a new RiakPbcTransport object. - """ - super(RiakPbcTransport, self).__init__() - - self._client = client - self._node = node - self._address = (node.host, node.pb_port) - self._timeout = timeout - self._socket = None - self._socket_keepalive = socket_keepalive - self._socket_keepalive_options = socket_keepalive_options - - # FeatureDetection API - def _server_version(self): - return bytes_to_str(self.get_server_info()['server_version']) - - def ping(self): - """ - Ping the remote server - """ - - msg_code, msg = self._request(riak.pb.messages.MSG_CODE_PING_REQ) - if msg_code == riak.pb.messages.MSG_CODE_PING_RESP: - return True - else: - return False - - def get_server_info(self): - """ - Get information about the server - """ - msg_code, resp = self._request( - riak.pb.messages.MSG_CODE_GET_SERVER_INFO_REQ, - expect=riak.pb.messages.MSG_CODE_GET_SERVER_INFO_RESP) - return {'node': bytes_to_str(resp.node), - 'server_version': bytes_to_str(resp.server_version)} - - def _get_client_id(self): - msg_code, resp = self._request( - riak.pb.messages.MSG_CODE_GET_CLIENT_ID_REQ, - expect=riak.pb.messages.MSG_CODE_GET_CLIENT_ID_RESP) - return bytes_to_str(resp.client_id) - - def _set_client_id(self, client_id): - req = riak.pb.riak_kv_pb2.RpbSetClientIdReq() - req.client_id = str_to_bytes(client_id) - - msg_code, resp = self._request( - riak.pb.messages.MSG_CODE_SET_CLIENT_ID_REQ, req, - riak.pb.messages.MSG_CODE_SET_CLIENT_ID_RESP) - - self._client_id = client_id - - client_id = property(_get_client_id, _set_client_id, - doc="""the client ID for this connection""") - - def get(self, robj, r=None, pr=None, timeout=None, basic_quorum=None, - notfound_ok=None): - """ - Serialize get request and deserialize response - """ - bucket = robj.bucket - - req = riak.pb.riak_kv_pb2.RpbGetReq() - if r: - req.r = self._encode_quorum(r) - if self.quorum_controls(): - if pr: - req.pr = self._encode_quorum(pr) - if basic_quorum is not None: - req.basic_quorum = basic_quorum - if notfound_ok is not None: - req.notfound_ok = notfound_ok - if self.client_timeouts() and timeout: - req.timeout = timeout - if self.tombstone_vclocks(): - req.deletedvclock = True - - req.bucket = str_to_bytes(bucket.name) - self._add_bucket_type(req, bucket.bucket_type) - - req.key = str_to_bytes(robj.key) - - msg_code, resp = self._request( - riak.pb.messages.MSG_CODE_GET_REQ, req, - riak.pb.messages.MSG_CODE_GET_RESP) - - if resp is not None: - if resp.HasField('vclock'): - robj.vclock = VClock(resp.vclock, 'binary') - # We should do this even if there are no contents, i.e. - # the object is tombstoned - self._decode_contents(resp.content, robj) - else: - # "not found" returns an empty message, - # so let's make sure to clear the siblings - robj.siblings = [] - - return robj - - def put(self, robj, w=None, dw=None, pw=None, return_body=True, - if_none_match=False, timeout=None): - bucket = robj.bucket - - req = riak.pb.riak_kv_pb2.RpbPutReq() - if w: - req.w = self._encode_quorum(w) - if dw: - req.dw = self._encode_quorum(dw) - if self.quorum_controls() and pw: - req.pw = self._encode_quorum(pw) - - if return_body: - req.return_body = 1 - if if_none_match: - req.if_none_match = 1 - if self.client_timeouts() and timeout: - req.timeout = timeout - - req.bucket = str_to_bytes(bucket.name) - self._add_bucket_type(req, bucket.bucket_type) - - if robj.key: - req.key = str_to_bytes(robj.key) - if robj.vclock: - req.vclock = robj.vclock.encode('binary') - - self._encode_content(robj, req.content) - - msg_code, resp = self._request( - riak.pb.messages.MSG_CODE_PUT_REQ, req, - riak.pb.messages.MSG_CODE_PUT_RESP) - - if resp is not None: - if resp.HasField('key'): - robj.key = bytes_to_str(resp.key) - if resp.HasField("vclock"): - robj.vclock = VClock(resp.vclock, 'binary') - if resp.content: - self._decode_contents(resp.content, robj) - elif not robj.key: - raise RiakError("missing response object") - - return robj - - def ts_describe(self, table): - query = 'DESCRIBE {table}'.format(table=table.name) - return self.ts_query(table, query) - - def ts_get(self, table, key): - req = riak.pb.riak_ts_pb2.TsGetReq() - self._encode_timeseries_keyreq(table, key, req) - - msg_code, ts_get_resp = self._request( - riak.pb.messages.MSG_CODE_TS_GET_REQ, req, - riak.pb.messages.MSG_CODE_TS_GET_RESP) - - tsobj = TsObject(self._client, table, [], None) - self._decode_timeseries(ts_get_resp, tsobj) - return tsobj - - def ts_put(self, tsobj): - req = riak.pb.riak_ts_pb2.TsPutReq() - self._encode_timeseries_put(tsobj, req) - - msg_code, resp = self._request( - riak.pb.messages.MSG_CODE_TS_PUT_REQ, req, - riak.pb.messages.MSG_CODE_TS_PUT_RESP) - - if resp is not None: - return True - else: - raise RiakError("missing response object") - - def ts_delete(self, table, key): - req = riak.pb.riak_ts_pb2.TsDelReq() - self._encode_timeseries_keyreq(table, key, req) - - msg_code, ts_del_resp = self._request( - riak.pb.messages.MSG_CODE_TS_DEL_REQ, req, - riak.pb.messages.MSG_CODE_TS_DEL_RESP) - - if ts_del_resp is not None: - return True - else: - raise RiakError("missing response object") - - def ts_query(self, table, query, interpolations=None): - req = riak.pb.riak_ts_pb2.TsQueryReq() - - q = query - if '{table}' in q: - q = q.format(table=table.name) - - req.query.base = str_to_bytes(q) - - msg_code, ts_query_resp = self._request( - riak.pb.messages.MSG_CODE_TS_QUERY_REQ, req, - riak.pb.messages.MSG_CODE_TS_QUERY_RESP) - - tsobj = TsObject(self._client, table, [], []) - self._decode_timeseries(ts_query_resp, tsobj) - return tsobj - - def ts_stream_keys(self, table, timeout=None): - """ - Streams keys from a timeseries table, returning an iterator that - yields lists of keys. - """ - req = riak.pb.riak_ts_pb2.TsListKeysReq() - t = None - if self.client_timeouts() and timeout: - t = timeout - self._encode_timeseries_listkeysreq(table, req, t) - - self._send_msg(riak.pb.messages.MSG_CODE_TS_LIST_KEYS_REQ, req) - - return RiakPbcTsKeyStream(self) - - def delete(self, robj, rw=None, r=None, w=None, dw=None, pr=None, pw=None, - timeout=None): - req = riak.pb.riak_kv_pb2.RpbDelReq() - if rw: - req.rw = self._encode_quorum(rw) - if r: - req.r = self._encode_quorum(r) - if w: - req.w = self._encode_quorum(w) - if dw: - req.dw = self._encode_quorum(dw) - - if self.quorum_controls(): - if pr: - req.pr = self._encode_quorum(pr) - if pw: - req.pw = self._encode_quorum(pw) - - if self.client_timeouts() and timeout: - req.timeout = timeout - - use_vclocks = (self.tombstone_vclocks() and - hasattr(robj, 'vclock') and robj.vclock) - if use_vclocks: - req.vclock = robj.vclock.encode('binary') - - bucket = robj.bucket - req.bucket = str_to_bytes(bucket.name) - self._add_bucket_type(req, bucket.bucket_type) - req.key = str_to_bytes(robj.key) - - msg_code, resp = self._request( - riak.pb.messages.MSG_CODE_DEL_REQ, req, - riak.pb.messages.MSG_CODE_DEL_RESP) - return self - - def get_keys(self, bucket, timeout=None): - """ - Lists all keys within a bucket. - """ - keys = [] - for keylist in self.stream_keys(bucket, timeout=timeout): - for key in keylist: - keys.append(bytes_to_str(key)) - - return keys - - def stream_keys(self, bucket, timeout=None): - """ - Streams keys from a bucket, returning an iterator that yields - lists of keys. - """ - req = riak.pb.riak_kv_pb2.RpbListKeysReq() - req.bucket = str_to_bytes(bucket.name) - self._add_bucket_type(req, bucket.bucket_type) - if self.client_timeouts() and timeout: - req.timeout = timeout - - self._send_msg(riak.pb.messages.MSG_CODE_LIST_KEYS_REQ, req) - - return RiakPbcKeyStream(self) - - def get_buckets(self, bucket_type=None, timeout=None): - """ - Serialize bucket listing request and deserialize response - """ - req = riak.pb.riak_kv_pb2.RpbListBucketsReq() - self._add_bucket_type(req, bucket_type) - - if self.client_timeouts() and timeout: - req.timeout = timeout - - msg_code, resp = self._request( - riak.pb.messages.MSG_CODE_LIST_BUCKETS_REQ, req, - riak.pb.messages.MSG_CODE_LIST_BUCKETS_RESP) - return resp.buckets - - def stream_buckets(self, bucket_type=None, timeout=None): - """ - Stream list of buckets through an iterator - """ - - if not self.bucket_stream(): - raise NotImplementedError('Streaming list-buckets is not ' - 'supported') - - req = riak.pb.riak_kv_pb2.RpbListBucketsReq() - req.stream = True - self._add_bucket_type(req, bucket_type) - # Bucket streaming landed in the same release as timeouts, so - # we don't need to check the capability. - if timeout: - req.timeout = timeout - - self._send_msg(riak.pb.messages.MSG_CODE_LIST_BUCKETS_REQ, req) - - return RiakPbcBucketStream(self) - - def get_bucket_props(self, bucket): - """ - Serialize bucket property request and deserialize response - """ - req = riak.pb.riak_pb2.RpbGetBucketReq() - req.bucket = str_to_bytes(bucket.name) - self._add_bucket_type(req, bucket.bucket_type) - - msg_code, resp = self._request( - riak.pb.messages.MSG_CODE_GET_BUCKET_REQ, req, - riak.pb.messages.MSG_CODE_GET_BUCKET_RESP) - - return self._decode_bucket_props(resp.props) - - def set_bucket_props(self, bucket, props): - """ - Serialize set bucket property request and deserialize response - """ - req = riak.pb.riak_pb2.RpbSetBucketReq() - req.bucket = str_to_bytes(bucket.name) - self._add_bucket_type(req, bucket.bucket_type) - - if not self.pb_all_bucket_props(): - for key in props: - if key not in ('n_val', 'allow_mult'): - raise NotImplementedError('Server only supports n_val and ' - 'allow_mult properties over PBC') - - self._encode_bucket_props(props, req) - - msg_code, resp = self._request( - riak.pb.messages.MSG_CODE_SET_BUCKET_REQ, req, - riak.pb.messages.MSG_CODE_SET_BUCKET_RESP) - return True - - def clear_bucket_props(self, bucket): - """ - Clear bucket properties, resetting them to their defaults - """ - if not self.pb_clear_bucket_props(): - return False - - req = riak.pb.riak_pb2.RpbResetBucketReq() - req.bucket = str_to_bytes(bucket.name) - self._add_bucket_type(req, bucket.bucket_type) - self._request( - riak.pb.messages.MSG_CODE_RESET_BUCKET_REQ, req, - riak.pb.messages.MSG_CODE_RESET_BUCKET_RESP) - return True - - def get_bucket_type_props(self, bucket_type): - """ - Fetch bucket-type properties - """ - self._check_bucket_types(bucket_type) - - req = riak.pb.riak_pb2.RpbGetBucketTypeReq() - req.type = str_to_bytes(bucket_type.name) - - msg_code, resp = self._request( - riak.pb.messages.MSG_CODE_GET_BUCKET_TYPE_REQ, req, - riak.pb.messages.MSG_CODE_GET_BUCKET_RESP) - - return self._decode_bucket_props(resp.props) - - def set_bucket_type_props(self, bucket_type, props): - """ - Set bucket-type properties - """ - self._check_bucket_types(bucket_type) - - req = riak.pb.riak_pb2.RpbSetBucketTypeReq() - req.type = str_to_bytes(bucket_type.name) - - self._encode_bucket_props(props, req) - - msg_code, resp = self._request( - riak.pb.messages.MSG_CODE_SET_BUCKET_TYPE_REQ, req, - riak.pb.messages.MSG_CODE_SET_BUCKET_RESP) - - return True - - def mapred(self, inputs, query, timeout=None): - # dictionary of phase results - each content should be an encoded array - # which is appended to the result for that phase. - result = {} - for phase, content in self.stream_mapred(inputs, query, timeout): - if phase in result: - result[phase] += content - else: - result[phase] = content - - # If a single result - return the same as the HTTP interface does - # otherwise return all the phase information - if not len(result): - return None - elif len(result) == 1: - return result[max(result.keys())] - else: - return result - - def stream_mapred(self, inputs, query, timeout=None): - # Construct the job, optionally set the timeout... - content = self._construct_mapred_json(inputs, query, timeout) - - req = riak.pb.riak_kv_pb2.RpbMapRedReq() - req.request = str_to_bytes(content) - req.content_type = str_to_bytes("application/json") - - self._send_msg(riak.pb.messages.MSG_CODE_MAP_RED_REQ, req) - - return RiakPbcMapredStream(self) - - def get_index(self, bucket, index, startkey, endkey=None, - return_terms=None, max_results=None, continuation=None, - timeout=None, term_regex=None): - if not self.pb_indexes(): - return self._get_index_mapred_emu(bucket, index, startkey, endkey) - - if term_regex and not self.index_term_regex(): - raise NotImplementedError("Secondary index term_regex is not " - "supported") - - req = self._encode_index_req(bucket, index, startkey, endkey, - return_terms, max_results, continuation, - timeout, term_regex) - - msg_code, resp = self._request( - riak.pb.messages.MSG_CODE_INDEX_REQ, req, - riak.pb.messages.MSG_CODE_INDEX_RESP) - - if return_terms and resp.results: - results = [(decode_index_value(index, pair.key), - bytes_to_str(pair.value)) - for pair in resp.results] - else: - results = resp.keys[:] - if PY3: - results = [bytes_to_str(key) for key in resp.keys] - - if max_results is not None and resp.HasField('continuation'): - return (results, bytes_to_str(resp.continuation)) - else: - return (results, None) - - def stream_index(self, bucket, index, startkey, endkey=None, - return_terms=None, max_results=None, continuation=None, - timeout=None, term_regex=None): - if not self.stream_indexes(): - raise NotImplementedError("Secondary index streaming is not " - "supported") - - if term_regex and not self.index_term_regex(): - raise NotImplementedError("Secondary index term_regex is not " - "supported") - - req = self._encode_index_req(bucket, index, startkey, endkey, - return_terms, max_results, continuation, - timeout, term_regex) - req.stream = True - - self._send_msg(riak.pb.messages.MSG_CODE_INDEX_REQ, req) - - return RiakPbcIndexStream(self, index, return_terms) - - def create_search_index(self, index, schema=None, n_val=None, - timeout=None): - if not self.pb_search_admin(): - raise NotImplementedError("Search 2.0 administration is not " - "supported for this version") - index = str_to_bytes(index) - idx = riak.pb.riak_yokozuna_pb2.RpbYokozunaIndex(name=index) - if schema: - idx.schema = str_to_bytes(schema) - if n_val: - idx.n_val = n_val - req = riak.pb.riak_yokozuna_pb2.RpbYokozunaIndexPutReq(index=idx) - if timeout is not None: - req.timeout = timeout - - self._request( - riak.pb.messages.MSG_CODE_YOKOZUNA_INDEX_PUT_REQ, req, - riak.pb.messages.MSG_CODE_PUT_RESP) - - return True - - def get_search_index(self, index): - if not self.pb_search_admin(): - raise NotImplementedError("Search 2.0 administration is not " - "supported for this version") - req = riak.pb.riak_yokozuna_pb2.RpbYokozunaIndexGetReq( - name=str_to_bytes(index)) - - msg_code, resp = self._request( - riak.pb.messages.MSG_CODE_YOKOZUNA_INDEX_GET_REQ, req, - riak.pb.messages.MSG_CODE_YOKOZUNA_INDEX_GET_RESP) - if len(resp.index) > 0: - return self._decode_search_index(resp.index[0]) - else: - raise RiakError('notfound') - - def list_search_indexes(self): - if not self.pb_search_admin(): - raise NotImplementedError("Search 2.0 administration is not " - "supported for this version") - req = riak.pb.riak_yokozuna_pb2.RpbYokozunaIndexGetReq() - - msg_code, resp = self._request( - riak.pb.messages.MSG_CODE_YOKOZUNA_INDEX_GET_REQ, req, - riak.pb.messages.MSG_CODE_YOKOZUNA_INDEX_GET_RESP) - - return [self._decode_search_index(index) for index in resp.index] - - def delete_search_index(self, index): - if not self.pb_search_admin(): - raise NotImplementedError("Search 2.0 administration is not " - "supported for this version") - req = riak.pb.riak_yokozuna_pb2.RpbYokozunaIndexDeleteReq( - name=str_to_bytes(index)) - - self._request( - riak.pb.messages.MSG_CODE_YOKOZUNA_INDEX_DELETE_REQ, req, - riak.pb.messages.MSG_CODE_DEL_RESP) - - return True - - def create_search_schema(self, schema, content): - if not self.pb_search_admin(): - raise NotImplementedError("Search 2.0 administration is not " - "supported for this version") - scma = riak.pb.riak_yokozuna_pb2.RpbYokozunaSchema( - name=str_to_bytes(schema), - content=str_to_bytes(content)) - req = riak.pb.riak_yokozuna_pb2.RpbYokozunaSchemaPutReq( - schema=scma) - - self._request( - riak.pb.messages.MSG_CODE_YOKOZUNA_SCHEMA_PUT_REQ, req, - riak.pb.messages.MSG_CODE_PUT_RESP) - - return True - - def get_search_schema(self, schema): - if not self.pb_search_admin(): - raise NotImplementedError("Search 2.0 administration is not " - "supported for this version") - req = riak.pb.riak_yokozuna_pb2.RpbYokozunaSchemaGetReq( - name=str_to_bytes(schema)) - - msg_code, resp = self._request( - riak.pb.messages.MSG_CODE_YOKOZUNA_SCHEMA_GET_REQ, req, - riak.pb.messages.MSG_CODE_YOKOZUNA_SCHEMA_GET_RESP) - - result = {} - result['name'] = bytes_to_str(resp.schema.name) - result['content'] = bytes_to_str(resp.schema.content) - return result - - def search(self, index, query, **params): - if not self.pb_search(): - return self._search_mapred_emu(index, query) - - if PY2 and isinstance(query, unicode): # noqa - query = query.encode('utf8') - - req = riak.pb.riak_search_pb2.RpbSearchQueryReq( - index=str_to_bytes(index), - q=str_to_bytes(query)) - self._encode_search_query(req, params) - - msg_code, resp = self._request( - riak.pb.messages.MSG_CODE_SEARCH_QUERY_REQ, req, - riak.pb.messages.MSG_CODE_SEARCH_QUERY_RESP) - - result = {} - if resp.HasField('max_score'): - result['max_score'] = resp.max_score - if resp.HasField('num_found'): - result['num_found'] = resp.num_found - result['docs'] = [self._decode_search_doc(doc) for doc in resp.docs] - return result - - def get_counter(self, bucket, key, **params): - if not bucket.bucket_type.is_default(): - raise NotImplementedError("Counters are not " - "supported with bucket-types, " - "use datatypes instead.") - - if not self.counters(): - raise NotImplementedError("Counters are not supported") - - req = riak.pb.riak_kv_pb2.RpbCounterGetReq() - req.bucket = str_to_bytes(bucket.name) - req.key = str_to_bytes(key) - if params.get('r') is not None: - req.r = self._encode_quorum(params['r']) - if params.get('pr') is not None: - req.pr = self._encode_quorum(params['pr']) - if params.get('basic_quorum') is not None: - req.basic_quorum = params['basic_quorum'] - if params.get('notfound_ok') is not None: - req.notfound_ok = params['notfound_ok'] - - msg_code, resp = self._request( - riak.pb.messages.MSG_CODE_COUNTER_GET_REQ, req, - riak.pb.messages.MSG_CODE_COUNTER_GET_RESP) - if resp.HasField('value'): - return resp.value - else: - return None - - def update_counter(self, bucket, key, value, **params): - if not bucket.bucket_type.is_default(): - raise NotImplementedError("Counters are not " - "supported with bucket-types, " - "use datatypes instead.") - - if not self.counters(): - raise NotImplementedError("Counters are not supported") - - req = riak.pb.riak_kv_pb2.RpbCounterUpdateReq() - req.bucket = str_to_bytes(bucket.name) - req.key = str_to_bytes(key) - req.amount = value - if params.get('w') is not None: - req.w = self._encode_quorum(params['w']) - if params.get('dw') is not None: - req.dw = self._encode_quorum(params['dw']) - if params.get('pw') is not None: - req.pw = self._encode_quorum(params['pw']) - if params.get('returnvalue') is not None: - req.returnvalue = params['returnvalue'] - - msg_code, resp = self._request( - riak.pb.messages.MSG_CODE_COUNTER_UPDATE_REQ, req, - riak.pb.messages.MSG_CODE_COUNTER_UPDATE_RESP) - if resp.HasField('value'): - return resp.value - else: - return True - - def fetch_datatype(self, bucket, key, **options): - - if bucket.bucket_type.is_default(): - raise NotImplementedError("Datatypes cannot be used in the default" - " bucket-type.") - - if not self.datatypes(): - raise NotImplementedError("Datatypes are not supported.") - - req = riak.pb.riak_dt_pb2.DtFetchReq() - req.type = str_to_bytes(bucket.bucket_type.name) - req.bucket = str_to_bytes(bucket.name) - req.key = str_to_bytes(key) - self._encode_dt_options(req, options) - - msg_code, resp = self._request( - riak.pb.messages.MSG_CODE_DT_FETCH_REQ, req, - riak.pb.messages.MSG_CODE_DT_FETCH_RESP) - - return self._decode_dt_fetch(resp) - - def update_datatype(self, datatype, **options): - - if datatype.bucket.bucket_type.is_default(): - raise NotImplementedError("Datatypes cannot be used in the default" - " bucket-type.") - - if not self.datatypes(): - raise NotImplementedError("Datatypes are not supported.") - - op = datatype.to_op() - type_name = datatype.type_name - if not op: - raise ValueError("No operation to send on datatype {!r}". - format(datatype)) - - req = riak.pb.riak_dt_pb2.DtUpdateReq() - req.bucket = str_to_bytes(datatype.bucket.name) - req.type = str_to_bytes(datatype.bucket.bucket_type.name) - - if datatype.key: - req.key = str_to_bytes(datatype.key) - if datatype._context: - req.context = datatype._context - - self._encode_dt_options(req, options) - - self._encode_dt_op(type_name, req, op) - - msg_code, resp = self._request( - riak.pb.messages.MSG_CODE_DT_UPDATE_REQ, req, - riak.pb.messages.MSG_CODE_DT_UPDATE_RESP) - if resp.HasField('key'): - datatype.key = resp.key[:] - if resp.HasField('context'): - datatype._context = resp.context[:] - - if options.get('return_body'): - datatype._set_value(self._decode_dt_value(type_name, resp)) - - return True - - def get_preflist(self, bucket, key): - """ - Get the preflist for a bucket/key - - :param bucket: Riak Bucket - :type bucket: :class:`~riak.bucket.RiakBucket` - :param key: Riak Key - :type key: string - :rtype: list of dicts - """ - req = riak.pb.riak_kv_pb2.RpbGetBucketKeyPreflistReq() - req.bucket = str_to_bytes(bucket.name) - req.key = str_to_bytes(key) - req.type = str_to_bytes(bucket.bucket_type.name) - - msg_code, resp = self._request( - riak.pb.messages.MSG_CODE_GET_BUCKET_KEY_PREFLIST_REQ, req, - riak.pb.messages.MSG_CODE_GET_BUCKET_KEY_PREFLIST_RESP) - - return [self._decode_preflist(item) for item in resp.preflist] From 0c557aecbafb548ff2c1d40dfe4b07d0a334477f Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Wed, 27 Apr 2016 10:00:42 -0700 Subject: [PATCH 3/8] Raise BadResource when a recv_into call returns zero bytes. This indicates that the connection has been closed, and the operation should be re-tried. --- riak/client/transport.py | 4 ++-- riak/transports/pool.py | 9 ++++++++- riak/transports/tcp/__init__.py | 2 +- riak/transports/tcp/connection.py | 9 ++++++++- 4 files changed, 19 insertions(+), 5 deletions(-) diff --git a/riak/client/transport.py b/riak/client/transport.py index 6aca7f24..bb2aaef9 100644 --- a/riak/client/transport.py +++ b/riak/client/transport.py @@ -1,6 +1,6 @@ from contextlib import contextmanager from riak.transports.pool import BadResource -from riak.transports.tcp import is_retryable as is_pbc_retryable +from riak.transports.tcp import is_retryable as is_tcp_retryable from riak.transports.http import is_retryable as is_http_retryable import threading from six import PY2 @@ -162,7 +162,7 @@ def _is_retryable(error): :type error: Exception :rtype: boolean """ - return is_pbc_retryable(error) or is_http_retryable(error) + return is_tcp_retryable(error) or is_http_retryable(error) def retryable(fn, protocol=None): diff --git a/riak/transports/pool.py b/riak/transports/pool.py index d0a9ee7f..308e31d6 100644 --- a/riak/transports/pool.py +++ b/riak/transports/pool.py @@ -12,7 +12,14 @@ class BadResource(Exception): resource currently in-use is bad and should be removed from the pool. """ - pass + def __init__(self, value=None): + self.value = value + + def __str__(self): + if self.value is None: + return 'BadResource' + else: + return repr(self.value) class Resource(object): diff --git a/riak/transports/tcp/__init__.py b/riak/transports/tcp/__init__.py index 312f9194..2634af0a 100644 --- a/riak/transports/tcp/__init__.py +++ b/riak/transports/tcp/__init__.py @@ -42,7 +42,7 @@ def destroy_resource(self, tcp): def is_retryable(err): """ Determines if the given exception is something that is - network/socket-related and should thus cause the PBC connection to + network/socket-related and should thus cause the TCP connection to close and the operation retried on another node. :rtype: boolean diff --git a/riak/transports/tcp/connection.py b/riak/transports/tcp/connection.py index a8ca4e4d..4808e20b 100644 --- a/riak/transports/tcp/connection.py +++ b/riak/transports/tcp/connection.py @@ -7,6 +7,7 @@ from riak import RiakError from riak.codecs.pbuf import PbufCodec from riak.security import SecurityError, USE_STDLIB_SSL +from riak.transports.pool import BadResource if not USE_STDLIB_SSL: from OpenSSL.SSL import Connection @@ -174,6 +175,10 @@ def _recv(self, msglen): toread = msglen while toread: nbytes = self._socket.recv_into(view, toread) + # https://docs.python.org/2/howto/sockets.html#using-a-socket + # https://github.com/basho/riak-python-client/issues/399 + if nbytes == 0: + raise BadResource('recv_into returned zero bytes unexpectedly') view = view[nbytes:] # slicing views is cheap toread -= nbytes nread += nbytes @@ -190,7 +195,8 @@ def _connect(self): else: self._socket = socket.create_connection(self._address) if self._socket_keepalive: - self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) + self._socket.setsockopt( + socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) ka_opts = self._socket_keepalive_options or {} for k, v in ka_opts.iteritems(): self._socket.setsockopt(socket.SOL_TCP, k, v) @@ -202,6 +208,7 @@ def close(self): Closes the underlying socket of the PB connection. """ if self._socket: + self._socket.shutdown(socket.SHUT_RDWR) self._socket.close() del self._socket From fe46c6d69f0d733e27a753196abdda7c95be2ee2 Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Wed, 27 Apr 2016 11:35:16 -0700 Subject: [PATCH 4/8] Subclass Exception correctly --- riak/riak_error.py | 10 +++++++--- riak/transports/pool.py | 9 +-------- 2 files changed, 8 insertions(+), 11 deletions(-) diff --git a/riak/riak_error.py b/riak/riak_error.py index ce582bbb..b99eb7fe 100644 --- a/riak/riak_error.py +++ b/riak/riak_error.py @@ -21,8 +21,12 @@ class RiakError(Exception): """ Base class for exceptions generated in the Riak API. """ - def __init__(self, value): - self.value = value + def __init__(self, *args, **kwargs): + super(RiakError, self).__init__(*args, **kwargs) + if len(args) > 0: + self.value = args[0] + else: + self.value = 'unknown' def __str__(self): return repr(self.value) @@ -34,5 +38,5 @@ class ConflictError(RiakError): :class:`~riak.riak_object.RiakObject` that has more than one sibling. """ - def __init__(self, message="Object in conflict"): + def __init__(self, message='Object in conflict'): super(ConflictError, self).__init__(message) diff --git a/riak/transports/pool.py b/riak/transports/pool.py index 308e31d6..d0a9ee7f 100644 --- a/riak/transports/pool.py +++ b/riak/transports/pool.py @@ -12,14 +12,7 @@ class BadResource(Exception): resource currently in-use is bad and should be removed from the pool. """ - def __init__(self, value=None): - self.value = value - - def __str__(self): - if self.value is None: - return 'BadResource' - else: - return repr(self.value) + pass class Resource(object): From 866259f91d4a7112f9a4c41bb2375dd92132555a Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Wed, 27 Apr 2016 15:45:17 -0700 Subject: [PATCH 5/8] Fixes in using socket.shutdown on older Python --- riak/transports/tcp/connection.py | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/riak/transports/tcp/connection.py b/riak/transports/tcp/connection.py index 4808e20b..2380a3d9 100644 --- a/riak/transports/tcp/connection.py +++ b/riak/transports/tcp/connection.py @@ -1,3 +1,4 @@ +import errno import socket import struct @@ -9,12 +10,12 @@ from riak.security import SecurityError, USE_STDLIB_SSL from riak.transports.pool import BadResource -if not USE_STDLIB_SSL: - from OpenSSL.SSL import Connection - from riak.transports.security import configure_pyopenssl_context -else: +if USE_STDLIB_SSL: import ssl from riak.transports.security import configure_ssl_context +else: + from OpenSSL.SSL import Connection + from riak.transports.security import configure_pyopenssl_context class TcpConnection(object): @@ -208,7 +209,16 @@ def close(self): Closes the underlying socket of the PB connection. """ if self._socket: - self._socket.shutdown(socket.SHUT_RDWR) + if USE_STDLIB_SSL: + # NB: Python 2.7.8 and earlier does not have a compatible + # shutdown() method due to the SSL lib + try: + self._socket.shutdown(socket.SHUT_RDWR) + except IOError as e: + # NB: sometimes this is the exception if the initial + # connection didn't succeed correctly + if e.errno != errno.EBADF: + raise self._socket.close() del self._socket From 30ef3e17003984b445a47158cba1455a0f58172c Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Thu, 28 Apr 2016 08:19:03 -0700 Subject: [PATCH 6/8] prevent duplicate loading of test data --- riak/tests/test_btypes.py | 19 ++++++++++++------- riak/tests/test_kv.py | 25 ++++++++++++++++++------- 2 files changed, 30 insertions(+), 14 deletions(-) diff --git a/riak/tests/test_btypes.py b/riak/tests/test_btypes.py index d0fe728b..97d1b1a6 100644 --- a/riak/tests/test_btypes.py +++ b/riak/tests/test_btypes.py @@ -151,13 +151,18 @@ def test_multiget_bucket_types(self): self.assertEqual(btype, mobj.bucket.bucket_type) def test_write_once_bucket_type(self): - btype = self.client.bucket_type('write_once') - bucket = btype.bucket(self.bucket_name) - - for i in range(100): - obj = bucket.new(self.key_name + str(i)) - obj.data = {'id': i} - obj.store() + bt = 'write_once' + skey = 'write_once-init' + btype = self.client.bucket_type(bt) + bucket = btype.bucket(bt) + sobj = bucket.get(skey) + if not sobj.exists: + for i in range(100): + o = bucket.new(self.key_name + str(i)) + o.data = {'id': i} + o.store() + o = bucket.new(skey, data={'id': skey}) + o.store() mget = bucket.multiget([self.key_name + str(i) for i in range(100)]) for mobj in mget: diff --git a/riak/tests/test_kv.py b/riak/tests/test_kv.py index 5513c603..aeebed68 100644 --- a/riak/tests/test_kv.py +++ b/riak/tests/test_kv.py @@ -180,17 +180,29 @@ def test_string_bucket_name(self): def test_generate_key(self): # Ensure that Riak generates a random key when # the key passed to bucket.new() is None. - bucket = self.client.bucket('random_key_bucket') - existing_keys = bucket.get_keys() + bucket = self.client.bucket(self.bucket_name) o = bucket.new(None, data={}) self.assertIsNone(o.key) o.store() self.assertIsNotNone(o.key) self.assertNotIn('/', o.key) - self.assertNotIn(o.key, existing_keys) - self.assertEqual(len(bucket.get_keys()), len(existing_keys) + 1) + existing_keys = bucket.get_keys() + self.assertEqual(len(existing_keys), 1) + + def maybe_store_keys(self): + skey = 'rkb-init' + bucket = self.client.bucket('random_key_bucket') + sobj = bucket.get(skey) + if sobj.exists: + return + for key in range(1, 1000): + o = bucket.new(None, data={}) + o.store() + o = bucket.new(skey, data={}) + o.store() def test_stream_keys(self): + self.maybe_store_keys() bucket = self.client.bucket('random_key_bucket') regular_keys = bucket.get_keys() self.assertNotEqual(len(regular_keys), 0) @@ -203,10 +215,8 @@ def test_stream_keys(self): self.assertEqual(sorted(regular_keys), sorted(streamed_keys)) def test_stream_keys_timeout(self): + self.maybe_store_keys() bucket = self.client.bucket('random_key_bucket') - for key in range(1, 1000): - o = bucket.new(None, data={}) - o.store() streamed_keys = [] with self.assertRaises(RiakError): for keylist in self.client.stream_keys(bucket, timeout=1): @@ -216,6 +226,7 @@ def test_stream_keys_timeout(self): streamed_keys += keylist def test_stream_keys_abort(self): + self.maybe_store_keys() bucket = self.client.bucket('random_key_bucket') regular_keys = bucket.get_keys() self.assertNotEqual(len(regular_keys), 0) From 4bfc758431f31217ab9f8d1bfe77ab0f4508486e Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Thu, 28 Apr 2016 10:39:06 -0700 Subject: [PATCH 7/8] Tweak code to set socket keepalives and options, and add test --- riak/tests/test_client.py | 12 ++++++++++++ riak/transports/tcp/connection.py | 7 ++++--- riak/transports/tcp/transport.py | 11 ++++++----- 3 files changed, 22 insertions(+), 8 deletions(-) diff --git a/riak/tests/test_client.py b/riak/tests/test_client.py index 19379d06..18f3d558 100644 --- a/riak/tests/test_client.py +++ b/riak/tests/test_client.py @@ -3,6 +3,7 @@ from six import PY2 from threading import Thread from riak.riak_object import RiakObject +from riak.transports.tcp import TcpTransport from riak.tests import DUMMY_HTTP_PORT, DUMMY_PB_PORT, RUN_POOL from riak.tests.base import IntegrationTestBase @@ -13,6 +14,17 @@ class ClientTests(IntegrationTestBase, unittest.TestCase): + def test_can_set_tcp_keepalive(self): + if self.protocol == 'pbc': + topts = {'socket_keepalive': True} + c = self.create_client(transport_options=topts) + for i, r in enumerate(c._tcp_pool.resources): + self.assertIsInstance(r, TcpTransport) + self.assertTrue(r._socket_keepalive) + c.close() + else: + pass + def test_uses_client_id_if_given(self): if self.protocol == 'pbc': zero_client_id = "\0\0\0\0" diff --git a/riak/transports/tcp/connection.py b/riak/transports/tcp/connection.py index 2380a3d9..10adf191 100644 --- a/riak/transports/tcp/connection.py +++ b/riak/transports/tcp/connection.py @@ -195,12 +195,13 @@ def _connect(self): self._timeout) else: self._socket = socket.create_connection(self._address) + if self._socket_tcp_options: + ka_opts = self._socket_tcp_options + for k, v in ka_opts.iteritems(): + self._socket.setsockopt(socket.SOL_TCP, k, v) if self._socket_keepalive: self._socket.setsockopt( socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) - ka_opts = self._socket_keepalive_options or {} - for k, v in ka_opts.iteritems(): - self._socket.setsockopt(socket.SOL_TCP, k, v) if self._client._credentials: self._init_security() diff --git a/riak/transports/tcp/transport.py b/riak/transports/tcp/transport.py index 46fa1c8a..7f440d7c 100644 --- a/riak/transports/tcp/transport.py +++ b/riak/transports/tcp/transport.py @@ -25,8 +25,6 @@ def __init__(self, node=None, client=None, timeout=None, - socket_keepalive=False, - socket_keepalive_options=None, **kwargs): super(TcpTransport, self).__init__() @@ -35,11 +33,14 @@ def __init__(self, self._address = (node.host, node.pb_port) self._timeout = timeout self._socket = None - self._socket_keepalive = socket_keepalive - self._socket_keepalive_options = socket_keepalive_options self._pbuf_c = None self._ttb_c = None - self._use_ttb = kwargs.get('use_ttb', True) + self._socket_tcp_options = \ + kwargs.get('socket_tcp_options', {}) + self._socket_keepalive = \ + kwargs.get('socket_keepalive', False) + self._use_ttb = \ + kwargs.get('use_ttb', True) def _get_pbuf_codec(self): if not self._pbuf_c: From 23b19294fb2593739558d4bb2ad7d03616be7bd1 Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Thu, 28 Apr 2016 10:41:23 -0700 Subject: [PATCH 8/8] Fix instance variable declaration --- riak/transports/tcp/connection.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/riak/transports/tcp/connection.py b/riak/transports/tcp/connection.py index 10adf191..aabcb52e 100644 --- a/riak/transports/tcp/connection.py +++ b/riak/transports/tcp/connection.py @@ -19,6 +19,12 @@ class TcpConnection(object): + # These are set in the TcpTransport initializer + _address = None + _timeout = None + _socket_keepalive = None + _socket_tcp_options = None + """ Connection-related methods for TcpTransport. """ @@ -222,9 +228,3 @@ def close(self): raise self._socket.close() del self._socket - - # These are set in the TcpTransport initializer - _address = None - _timeout = None - _socket_keepalive = None - _socket_keepalive_options = None