From cf26d1329498ac40edb831847b88fc01956fc905 Mon Sep 17 00:00:00 2001 From: David Delassus Date: Wed, 13 Jul 2016 16:38:57 +0200 Subject: [PATCH 1/6] Make sure all pools are stopped when shuting down --- riak/client/__init__.py | 7 +++++++ riak/client/multi.py | 19 ++++++++++++++++--- 2 files changed, 23 insertions(+), 3 deletions(-) diff --git a/riak/client/__init__.py b/riak/client/__init__.py index 491324a4..ec06fd0b 100644 --- a/riak/client/__init__.py +++ b/riak/client/__init__.py @@ -130,6 +130,13 @@ def __init__(self, protocol='pbc', transport_options={}, self._bucket_types = WeakValueDictionary() self._tables = WeakValueDictionary() + def __del__(self): + if self._multiget_pool: + self._multiget_pool.stop() + + if self._multiput_pool: + self._multiput_pool.stop() + def _get_protocol(self): return self._protocol diff --git a/riak/client/multi.py b/riak/client/multi.py index 84b19a3a..81e58add 100644 --- a/riak/client/multi.py +++ b/riak/client/multi.py @@ -7,6 +7,8 @@ from riak.riak_object import RiakObject from riak.ts_object import TsObject +import atexit + if PY2: from Queue import Queue else: @@ -102,9 +104,11 @@ def stop(self): """ Signals the worker threads to exit and waits on them. """ - self._stop.set() - for worker in self._workers: - worker.join() + + if not self.stopped(): + self._stop.set() + for worker in self._workers: + worker.join() def stopped(self): """ @@ -193,6 +197,15 @@ def _worker_method(self): RIAK_MULTIPUT_POOL = MultiPutPool() +def stop_pools(): + """Stop worker pools at exit.""" + + RIAK_MULTIGET_POOL.stop() + RIAK_MULTIPUT_POOL.stop() + + +atexit.register(stop_pools) + def multiget(client, keys, **options): """Executes a parallel-fetch across multiple threads. Returns a list containing :class:`~riak.riak_object.RiakObject` or From 3d8124401f2911d4cf7fae05a48e4c22cd28d795 Mon Sep 17 00:00:00 2001 From: David Delassus Date: Wed, 13 Jul 2016 16:47:40 +0200 Subject: [PATCH 2/6] Fix typo --- riak/client/multi.py | 1 + 1 file changed, 1 insertion(+) diff --git a/riak/client/multi.py b/riak/client/multi.py index 81e58add..a3fcc301 100644 --- a/riak/client/multi.py +++ b/riak/client/multi.py @@ -206,6 +206,7 @@ def stop_pools(): atexit.register(stop_pools) + def multiget(client, keys, **options): """Executes a parallel-fetch across multiple threads. Returns a list containing :class:`~riak.riak_object.RiakObject` or From 301ae54fa6d240213e7c4bf42f4095b2d6fe7e8a Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Thu, 14 Jul 2016 06:04:37 -0700 Subject: [PATCH 3/6] Add contributor, release notes for 2.5.5 --- README.md | 1 + RELNOTES.md | 4 ++++ buildbot/Makefile | 2 +- 3 files changed, 6 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index c180bcfb..6ccb0aea 100644 --- a/README.md +++ b/README.md @@ -164,6 +164,7 @@ Contributors * Daniel Reverri * [Dan Root](https://github.com/daroot) * [David Basden](https://github.com/dbasden) +* [David Delassus](https://github.com/linkdd) * David Koblas * Dmitry Rozhkov * Eric Florenzano diff --git a/RELNOTES.md b/RELNOTES.md index 6954bfa0..715c3fdf 100644 --- a/RELNOTES.md +++ b/RELNOTES.md @@ -1,5 +1,9 @@ # Riak Python Client Release Notes +## [2.5.5 Release](https://github.com/basho/riak-python-client/issues?q=milestone%3Ariak-python-client-2.5.5) + + * [Stop all pools when client shuts down](https://github.com/basho/riak-python-client/pull/488) + ## [2.5.4 Release](https://github.com/basho/riak-python-client/issues?q=milestone%3Ariak-python-client-2.5.4) * [When converting `datetime` objects to send to Riak TS, `tzinfo` will be used if present](https://github.com/basho/riak-python-client/pull/486) diff --git a/buildbot/Makefile b/buildbot/Makefile index cd561d48..18e42507 100644 --- a/buildbot/Makefile +++ b/buildbot/Makefile @@ -43,7 +43,7 @@ test: setup test_normal test_security test_normal: @echo "Testing Riak Python Client (without security)" @$(RIAK_ADMIN) security disable - @RIAK_TEST_PROTOCOL='pbc' RIAK_TEST_PB_PORT=8087 RUN_BTYPES=1 RUN_CLIENT=1 RUN_MAPREDUCE=1 RUN_KV=1 RUN_RESOLVE=1 RUN_YZ=1 RUN_DATATYPES=1 RUN_INDEXES=1 ./tox_runner.sh .. + @RIAK_TEST_PROTOCOL='pbc' RIAK_TEST_PB_PORT=8087 RUN_POOL=1 RUN_BTYPES=1 RUN_CLIENT=1 RUN_MAPREDUCE=1 RUN_KV=1 RUN_RESOLVE=1 RUN_YZ=1 RUN_DATATYPES=1 RUN_INDEXES=1 ./tox_runner.sh .. @RIAK_TEST_PROTOCOL='http' RIAK_TEST_HTTP_PORT=8098 RUN_BTYPES=1 RUN_CLIENT=1 RUN_MAPREDUCE=1 RUN_KV=1 RUN_RESOLVE=1 RUN_YZ=1 RUN_DATATYPES=1 RUN_INDEXES=1 ./tox_runner.sh .. test_security: From 9fca06c741a249fceb26259862a8d13b63e9906e Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Thu, 14 Jul 2016 06:16:11 -0700 Subject: [PATCH 4/6] Add debugging --- riak/client/__init__.py | 1 - riak/client/multi.py | 12 +++++++++--- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/riak/client/__init__.py b/riak/client/__init__.py index ec06fd0b..78704f05 100644 --- a/riak/client/__init__.py +++ b/riak/client/__init__.py @@ -133,7 +133,6 @@ def __init__(self, protocol='pbc', transport_options={}, def __del__(self): if self._multiget_pool: self._multiget_pool.stop() - if self._multiput_pool: self._multiput_pool.stop() diff --git a/riak/client/multi.py b/riak/client/multi.py index a3fcc301..145adc8d 100644 --- a/riak/client/multi.py +++ b/riak/client/multi.py @@ -8,6 +8,7 @@ from riak.ts_object import TsObject import atexit +import sys if PY2: from Queue import Queue @@ -104,11 +105,15 @@ def stop(self): """ Signals the worker threads to exit and waits on them. """ - - if not self.stopped(): + if self.stopped(): + sys.stderr.write('pool already stopped\n') + else: + sys.stderr.write('stopping pool\n') self._stop.set() for worker in self._workers: + sys.stderr.write('stopping worker {0}\n'.format(worker.name)) worker.join() + sys.stderr.write('all workers joined\n') def stopped(self): """ @@ -148,7 +153,9 @@ def _worker_method(self): output queue. """ while not self._should_quit(): + sys.stderr.write('worker {0} waiting for task...\n'.format(self._name)) task = self._inq.get() + sys.stderr.write('worker {0} got task\n'.format(self._name)) try: btype = task.client.bucket_type(task.bucket_type) obj = btype.bucket(task.bucket).get(task.key, **task.options) @@ -199,7 +206,6 @@ def _worker_method(self): def stop_pools(): """Stop worker pools at exit.""" - RIAK_MULTIGET_POOL.stop() RIAK_MULTIPUT_POOL.stop() From 45720cd075b9f2a6164d93b635125147c0ce6ad3 Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Thu, 14 Jul 2016 06:24:38 -0700 Subject: [PATCH 5/6] Add Queue timeouts, remove debugging --- riak/client/multi.py | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/riak/client/multi.py b/riak/client/multi.py index 145adc8d..40238ad2 100644 --- a/riak/client/multi.py +++ b/riak/client/multi.py @@ -8,12 +8,11 @@ from riak.ts_object import TsObject import atexit -import sys if PY2: - from Queue import Queue + from Queue import Queue, Empty else: - from queue import Queue + from queue import Queue, Empty __all__ = ['multiget', 'multiput', 'MultiGetPool', 'MultiPutPool'] @@ -105,15 +104,10 @@ def stop(self): """ Signals the worker threads to exit and waits on them. """ - if self.stopped(): - sys.stderr.write('pool already stopped\n') - else: - sys.stderr.write('stopping pool\n') + if not self.stopped(): self._stop.set() for worker in self._workers: - sys.stderr.write('stopping worker {0}\n'.format(worker.name)) worker.join() - sys.stderr.write('all workers joined\n') def stopped(self): """ @@ -153,9 +147,11 @@ def _worker_method(self): output queue. """ while not self._should_quit(): - sys.stderr.write('worker {0} waiting for task...\n'.format(self._name)) - task = self._inq.get() - sys.stderr.write('worker {0} got task\n'.format(self._name)) + try: + task = self._inq.get(block=True, timeout=0.25) + except Empty: + continue + try: btype = task.client.bucket_type(task.bucket_type) obj = btype.bucket(task.bucket).get(task.key, **task.options) @@ -181,7 +177,11 @@ def _worker_method(self): the output queue. """ while not self._should_quit(): - task = self._inq.get() + try: + task = self._inq.get(block=True, timeout=0.25) + except Empty: + continue + try: obj = task.object if isinstance(obj, RiakObject): From 3aec1c34d247716daee8e92432d09c2ee5d02b8d Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Thu, 14 Jul 2016 07:26:10 -0700 Subject: [PATCH 6/6] remove pool tests from buildbot --- buildbot/Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/buildbot/Makefile b/buildbot/Makefile index 18e42507..cd561d48 100644 --- a/buildbot/Makefile +++ b/buildbot/Makefile @@ -43,7 +43,7 @@ test: setup test_normal test_security test_normal: @echo "Testing Riak Python Client (without security)" @$(RIAK_ADMIN) security disable - @RIAK_TEST_PROTOCOL='pbc' RIAK_TEST_PB_PORT=8087 RUN_POOL=1 RUN_BTYPES=1 RUN_CLIENT=1 RUN_MAPREDUCE=1 RUN_KV=1 RUN_RESOLVE=1 RUN_YZ=1 RUN_DATATYPES=1 RUN_INDEXES=1 ./tox_runner.sh .. + @RIAK_TEST_PROTOCOL='pbc' RIAK_TEST_PB_PORT=8087 RUN_BTYPES=1 RUN_CLIENT=1 RUN_MAPREDUCE=1 RUN_KV=1 RUN_RESOLVE=1 RUN_YZ=1 RUN_DATATYPES=1 RUN_INDEXES=1 ./tox_runner.sh .. @RIAK_TEST_PROTOCOL='http' RIAK_TEST_HTTP_PORT=8098 RUN_BTYPES=1 RUN_CLIENT=1 RUN_MAPREDUCE=1 RUN_KV=1 RUN_RESOLVE=1 RUN_YZ=1 RUN_DATATYPES=1 RUN_INDEXES=1 ./tox_runner.sh .. test_security: