From cf26d1329498ac40edb831847b88fc01956fc905 Mon Sep 17 00:00:00 2001 From: David Delassus Date: Wed, 13 Jul 2016 16:38:57 +0200 Subject: [PATCH 1/2] Make sure all pools are stopped when shuting down --- riak/client/__init__.py | 7 +++++++ riak/client/multi.py | 19 ++++++++++++++++--- 2 files changed, 23 insertions(+), 3 deletions(-) diff --git a/riak/client/__init__.py b/riak/client/__init__.py index 491324a4..ec06fd0b 100644 --- a/riak/client/__init__.py +++ b/riak/client/__init__.py @@ -130,6 +130,13 @@ def __init__(self, protocol='pbc', transport_options={}, self._bucket_types = WeakValueDictionary() self._tables = WeakValueDictionary() + def __del__(self): + if self._multiget_pool: + self._multiget_pool.stop() + + if self._multiput_pool: + self._multiput_pool.stop() + def _get_protocol(self): return self._protocol diff --git a/riak/client/multi.py b/riak/client/multi.py index 84b19a3a..81e58add 100644 --- a/riak/client/multi.py +++ b/riak/client/multi.py @@ -7,6 +7,8 @@ from riak.riak_object import RiakObject from riak.ts_object import TsObject +import atexit + if PY2: from Queue import Queue else: @@ -102,9 +104,11 @@ def stop(self): """ Signals the worker threads to exit and waits on them. """ - self._stop.set() - for worker in self._workers: - worker.join() + + if not self.stopped(): + self._stop.set() + for worker in self._workers: + worker.join() def stopped(self): """ @@ -193,6 +197,15 @@ def _worker_method(self): RIAK_MULTIPUT_POOL = MultiPutPool() +def stop_pools(): + """Stop worker pools at exit.""" + + RIAK_MULTIGET_POOL.stop() + RIAK_MULTIPUT_POOL.stop() + + +atexit.register(stop_pools) + def multiget(client, keys, **options): """Executes a parallel-fetch across multiple threads. Returns a list containing :class:`~riak.riak_object.RiakObject` or From 3d8124401f2911d4cf7fae05a48e4c22cd28d795 Mon Sep 17 00:00:00 2001 From: David Delassus Date: Wed, 13 Jul 2016 16:47:40 +0200 Subject: [PATCH 2/2] Fix typo --- riak/client/multi.py | 1 + 1 file changed, 1 insertion(+) diff --git a/riak/client/multi.py b/riak/client/multi.py index 81e58add..a3fcc301 100644 --- a/riak/client/multi.py +++ b/riak/client/multi.py @@ -206,6 +206,7 @@ def stop_pools(): atexit.register(stop_pools) + def multiget(client, keys, **options): """Executes a parallel-fetch across multiple threads. Returns a list containing :class:`~riak.riak_object.RiakObject` or