diff --git a/README.md b/README.md index c180bcfb..6ccb0aea 100644 --- a/README.md +++ b/README.md @@ -164,6 +164,7 @@ Contributors * Daniel Reverri * [Dan Root](https://github.com/daroot) * [David Basden](https://github.com/dbasden) +* [David Delassus](https://github.com/linkdd) * David Koblas * Dmitry Rozhkov * Eric Florenzano diff --git a/RELNOTES.md b/RELNOTES.md index 6954bfa0..715c3fdf 100644 --- a/RELNOTES.md +++ b/RELNOTES.md @@ -1,5 +1,9 @@ # Riak Python Client Release Notes +## [2.5.5 Release](https://github.com/basho/riak-python-client/issues?q=milestone%3Ariak-python-client-2.5.5) + + * [Stop all pools when client shuts down](https://github.com/basho/riak-python-client/pull/488) + ## [2.5.4 Release](https://github.com/basho/riak-python-client/issues?q=milestone%3Ariak-python-client-2.5.4) * [When converting `datetime` objects to send to Riak TS, `tzinfo` will be used if present](https://github.com/basho/riak-python-client/pull/486) diff --git a/riak/client/__init__.py b/riak/client/__init__.py index 491324a4..78704f05 100644 --- a/riak/client/__init__.py +++ b/riak/client/__init__.py @@ -130,6 +130,12 @@ def __init__(self, protocol='pbc', transport_options={}, self._bucket_types = WeakValueDictionary() self._tables = WeakValueDictionary() + def __del__(self): + if self._multiget_pool: + self._multiget_pool.stop() + if self._multiput_pool: + self._multiput_pool.stop() + def _get_protocol(self): return self._protocol diff --git a/riak/client/multi.py b/riak/client/multi.py index 84b19a3a..40238ad2 100644 --- a/riak/client/multi.py +++ b/riak/client/multi.py @@ -7,10 +7,12 @@ from riak.riak_object import RiakObject from riak.ts_object import TsObject +import atexit + if PY2: - from Queue import Queue + from Queue import Queue, Empty else: - from queue import Queue + from queue import Queue, Empty __all__ = ['multiget', 'multiput', 'MultiGetPool', 'MultiPutPool'] @@ -102,9 +104,10 @@ def stop(self): """ Signals the worker threads to exit and waits on them. """ - self._stop.set() - for worker in self._workers: - worker.join() + if not self.stopped(): + self._stop.set() + for worker in self._workers: + worker.join() def stopped(self): """ @@ -144,7 +147,11 @@ def _worker_method(self): output queue. """ while not self._should_quit(): - task = self._inq.get() + try: + task = self._inq.get(block=True, timeout=0.25) + except Empty: + continue + try: btype = task.client.bucket_type(task.bucket_type) obj = btype.bucket(task.bucket).get(task.key, **task.options) @@ -170,7 +177,11 @@ def _worker_method(self): the output queue. """ while not self._should_quit(): - task = self._inq.get() + try: + task = self._inq.get(block=True, timeout=0.25) + except Empty: + continue + try: obj = task.object if isinstance(obj, RiakObject): @@ -193,6 +204,15 @@ def _worker_method(self): RIAK_MULTIPUT_POOL = MultiPutPool() +def stop_pools(): + """Stop worker pools at exit.""" + RIAK_MULTIGET_POOL.stop() + RIAK_MULTIPUT_POOL.stop() + + +atexit.register(stop_pools) + + def multiget(client, keys, **options): """Executes a parallel-fetch across multiple threads. Returns a list containing :class:`~riak.riak_object.RiakObject` or