From 32d19da6866f734233bb0bdf76820519914c3cbf Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Thu, 14 Jul 2016 10:07:22 -0700 Subject: [PATCH 1/2] Remove global multi pools. Close lazy-created multi pools when client object is closed --- riak/client/__init__.py | 23 ++++++--- riak/client/multi.py | 109 ++++++++++++++++++++++------------------ 2 files changed, 74 insertions(+), 58 deletions(-) diff --git a/riak/client/__init__.py b/riak/client/__init__.py index 78704f05..dc97ad52 100644 --- a/riak/client/__init__.py +++ b/riak/client/__init__.py @@ -109,6 +109,7 @@ def __init__(self, protocol='pbc', transport_options={}, self._credentials = self._create_credentials(credentials) self._http_pool = HttpPool(self, **transport_options) self._tcp_pool = TcpPool(self, **transport_options) + self._closed = False if PY2: self._encoders = {'application/json': default_encoder, @@ -131,10 +132,7 @@ def __init__(self, protocol='pbc', transport_options={}, self._tables = WeakValueDictionary() def __del__(self): - if self._multiget_pool: - self._multiget_pool.stop() - if self._multiput_pool: - self._multiput_pool.stop() + self.close() def _get_protocol(self): return self._protocol @@ -310,10 +308,19 @@ def close(self): """ Iterate through all of the connections and close each one. """ - if self._http_pool is not None: - self._http_pool.clear() - if self._tcp_pool is not None: - self._tcp_pool.clear() + if not self._closed: + self._closed = True + self._stop_multi_pools() + if self._http_pool is not None: + self._http_pool.clear() + if self._tcp_pool is not None: + self._tcp_pool.clear() + + def _stop_multi_pools(self): + if self._multiget_pool: + self._multiget_pool.stop() + if self._multiput_pool: + self._multiput_pool.stop() def _create_node(self, n): if isinstance(n, RiakNode): diff --git a/riak/client/multi.py b/riak/client/multi.py index 40238ad2..b7a1e11a 100644 --- a/riak/client/multi.py +++ b/riak/client/multi.py @@ -7,8 +7,6 @@ from riak.riak_object import RiakObject from riak.ts_object import TsObject -import atexit - if PY2: from Queue import Queue, Empty else: @@ -89,7 +87,7 @@ def start(self): name = "riak.client.multi-worker-{0}-{1}".format( self._name, i) worker = Thread(target=self._worker_method, name=name) - worker.daemon = True + worker.daemon = False worker.start() self._workers.append(worker) self._started.set() @@ -149,6 +147,11 @@ def _worker_method(self): while not self._should_quit(): try: task = self._inq.get(block=True, timeout=0.25) + except TypeError: + if self._should_quit(): + break + else: + raise except Empty: continue @@ -179,6 +182,11 @@ def _worker_method(self): while not self._should_quit(): try: task = self._inq.get(block=True, timeout=0.25) + except TypeError: + if self._should_quit(): + break + else: + raise except Empty: continue @@ -200,19 +208,6 @@ def _worker_method(self): self._inq.task_done() -RIAK_MULTIGET_POOL = MultiGetPool() -RIAK_MULTIPUT_POOL = MultiPutPool() - - -def stop_pools(): - """Stop worker pools at exit.""" - RIAK_MULTIGET_POOL.stop() - RIAK_MULTIPUT_POOL.stop() - - -atexit.register(stop_pools) - - def multiget(client, keys, **options): """Executes a parallel-fetch across multiple threads. Returns a list containing :class:`~riak.riak_object.RiakObject` or @@ -220,9 +215,9 @@ def multiget(client, keys, **options): bucket-type, bucket, key, and the exception raised. If a ``pool`` option is included, the request will use the given worker - pool and not the default :data:`RIAK_MULTIGET_POOL`. This option will - be passed by the client if the ``multiget_pool_size`` option was set on - client initialization. + pool and not a transient :class:`~riak.client.multi.MultiGetPool`. This + option will be passed by the client if the ``multiget_pool_size`` + option was set on client initialization. :param client: the client to use :type client: :class:`~riak.client.RiakClient` @@ -234,26 +229,33 @@ def multiget(client, keys, **options): :rtype: list """ + transient_pool = False outq = Queue() if 'pool' in options: pool = options['pool'] del options['pool'] else: - pool = RIAK_MULTIGET_POOL - - pool.start() - for bucket_type, bucket, key in keys: - task = Task(client, outq, bucket_type, bucket, key, None, options) - pool.enq(task) - - results = [] - for _ in range(len(keys)): - if pool.stopped(): - raise RuntimeError("Multi-get operation interrupted by pool " - "stopping!") - results.append(outq.get()) - outq.task_done() + pool = MultiGetPool() + transient_pool = True + + try: + pool.start() + for bucket_type, bucket, key in keys: + task = Task(client, outq, bucket_type, bucket, key, None, options) + pool.enq(task) + + results = [] + for _ in range(len(keys)): + if pool.stopped(): + raise RuntimeError( + 'Multi-get operation interrupted by pool ' + 'stopping!') + results.append(outq.get()) + outq.task_done() + finally: + if transient_pool: + pool.stop() return results @@ -263,9 +265,9 @@ def multiput(client, objs, **options): containing booleans or :class:`~riak.riak_object.RiakObject` If a ``pool`` option is included, the request will use the given worker - pool and not the default :data:`RIAK_MULTIPUT_POOL`. This option will - be passed by the client if the ``multiput_pool_size`` option was set on - client initialization. + pool and not a transient :class:`~riak.client.multi.MultiPutPool`. This + option will be passed by the client if the ``multiput_pool_size`` + option was set on client initialization. :param client: the client to use :type client: :class:`RiakClient ` @@ -277,25 +279,32 @@ def multiput(client, objs, **options): :type options: dict :rtype: list """ + transient_pool = False outq = Queue() if 'pool' in options: pool = options['pool'] del options['pool'] else: - pool = RIAK_MULTIPUT_POOL - - pool.start() - for obj in objs: - task = PutTask(client, outq, obj, options) - pool.enq(task) - - results = [] - for _ in range(len(objs)): - if pool.stopped(): - raise RuntimeError("Multi-put operation interrupted by pool " - "stopping!") - results.append(outq.get()) - outq.task_done() + pool = MultiPutPool() + transient_pool = True + + try: + pool.start() + for obj in objs: + task = PutTask(client, outq, obj, options) + pool.enq(task) + + results = [] + for _ in range(len(objs)): + if pool.stopped(): + raise RuntimeError( + 'Multi-put operation interrupted by pool ' + 'stopping!') + results.append(outq.get()) + outq.task_done() + finally: + if transient_pool: + pool.stop() return results From 6f7d4206d3a70ec7f290aa1f9e04958a0e1e629d Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Thu, 14 Jul 2016 13:00:24 -0700 Subject: [PATCH 2/2] Add loud note for 2.5.5 release with regard to multi operations --- RELNOTES.md | 1 + 1 file changed, 1 insertion(+) diff --git a/RELNOTES.md b/RELNOTES.md index 715c3fdf..796142d0 100644 --- a/RELNOTES.md +++ b/RELNOTES.md @@ -3,6 +3,7 @@ ## [2.5.5 Release](https://github.com/basho/riak-python-client/issues?q=milestone%3Ariak-python-client-2.5.5) * [Stop all pools when client shuts down](https://github.com/basho/riak-python-client/pull/488) + * [Calling `close` on client closes pools, remove global multi pools](https://github.com/basho/riak-python-client/pull/490). *NOTE*: if you use the multi get or put features of the client, you *MUST* call `close()` on your `RiakClient` instance to correctly clean up the thread pools used for these multi-operations. ## [2.5.4 Release](https://github.com/basho/riak-python-client/issues?q=milestone%3Ariak-python-client-2.5.4)