Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions RELNOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
## [2.5.5 Release](https://github.com/basho/riak-python-client/issues?q=milestone%3Ariak-python-client-2.5.5)

* [Stop all pools when client shuts down](https://github.com/basho/riak-python-client/pull/488)
* [Calling `close` on client closes pools, remove global multi pools](https://github.com/basho/riak-python-client/pull/490). *NOTE*: if you use the multi get or put features of the client, you *MUST* call `close()` on your `RiakClient` instance to correctly clean up the thread pools used for these multi-operations.

## [2.5.4 Release](https://github.com/basho/riak-python-client/issues?q=milestone%3Ariak-python-client-2.5.4)

Expand Down
23 changes: 15 additions & 8 deletions riak/client/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ def __init__(self, protocol='pbc', transport_options={},
self._credentials = self._create_credentials(credentials)
self._http_pool = HttpPool(self, **transport_options)
self._tcp_pool = TcpPool(self, **transport_options)
self._closed = False

if PY2:
self._encoders = {'application/json': default_encoder,
Expand All @@ -131,10 +132,7 @@ def __init__(self, protocol='pbc', transport_options={},
self._tables = WeakValueDictionary()

def __del__(self):
if self._multiget_pool:
self._multiget_pool.stop()
if self._multiput_pool:
self._multiput_pool.stop()
self.close()

def _get_protocol(self):
return self._protocol
Expand Down Expand Up @@ -310,10 +308,19 @@ def close(self):
"""
Iterate through all of the connections and close each one.
"""
if self._http_pool is not None:
self._http_pool.clear()
if self._tcp_pool is not None:
self._tcp_pool.clear()
if not self._closed:
self._closed = True
self._stop_multi_pools()
if self._http_pool is not None:
self._http_pool.clear()
if self._tcp_pool is not None:
self._tcp_pool.clear()

def _stop_multi_pools(self):
if self._multiget_pool:
self._multiget_pool.stop()
if self._multiput_pool:
self._multiput_pool.stop()

def _create_node(self, n):
if isinstance(n, RiakNode):
Expand Down
109 changes: 59 additions & 50 deletions riak/client/multi.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@
from riak.riak_object import RiakObject
from riak.ts_object import TsObject

import atexit

if PY2:
from Queue import Queue, Empty
else:
Expand Down Expand Up @@ -89,7 +87,7 @@ def start(self):
name = "riak.client.multi-worker-{0}-{1}".format(
self._name, i)
worker = Thread(target=self._worker_method, name=name)
worker.daemon = True
worker.daemon = False
worker.start()
self._workers.append(worker)
self._started.set()
Expand Down Expand Up @@ -149,6 +147,11 @@ def _worker_method(self):
while not self._should_quit():
try:
task = self._inq.get(block=True, timeout=0.25)
except TypeError:
if self._should_quit():
break
else:
raise
except Empty:
continue

Expand Down Expand Up @@ -179,6 +182,11 @@ def _worker_method(self):
while not self._should_quit():
try:
task = self._inq.get(block=True, timeout=0.25)
except TypeError:
if self._should_quit():
break
else:
raise
except Empty:
continue

Expand All @@ -200,29 +208,16 @@ def _worker_method(self):
self._inq.task_done()


RIAK_MULTIGET_POOL = MultiGetPool()
RIAK_MULTIPUT_POOL = MultiPutPool()


def stop_pools():
"""Stop worker pools at exit."""
RIAK_MULTIGET_POOL.stop()
RIAK_MULTIPUT_POOL.stop()


atexit.register(stop_pools)


def multiget(client, keys, **options):
"""Executes a parallel-fetch across multiple threads. Returns a list
containing :class:`~riak.riak_object.RiakObject` or
:class:`~riak.datatypes.Datatype` instances, or 4-tuples of
bucket-type, bucket, key, and the exception raised.

If a ``pool`` option is included, the request will use the given worker
pool and not the default :data:`RIAK_MULTIGET_POOL`. This option will
be passed by the client if the ``multiget_pool_size`` option was set on
client initialization.
pool and not a transient :class:`~riak.client.multi.MultiGetPool`. This
option will be passed by the client if the ``multiget_pool_size``
option was set on client initialization.

:param client: the client to use
:type client: :class:`~riak.client.RiakClient`
Expand All @@ -234,26 +229,33 @@ def multiget(client, keys, **options):
:rtype: list

"""
transient_pool = False
outq = Queue()

if 'pool' in options:
pool = options['pool']
del options['pool']
else:
pool = RIAK_MULTIGET_POOL

pool.start()
for bucket_type, bucket, key in keys:
task = Task(client, outq, bucket_type, bucket, key, None, options)
pool.enq(task)

results = []
for _ in range(len(keys)):
if pool.stopped():
raise RuntimeError("Multi-get operation interrupted by pool "
"stopping!")
results.append(outq.get())
outq.task_done()
pool = MultiGetPool()
transient_pool = True

try:
pool.start()
for bucket_type, bucket, key in keys:
task = Task(client, outq, bucket_type, bucket, key, None, options)
pool.enq(task)

results = []
for _ in range(len(keys)):
if pool.stopped():
raise RuntimeError(
'Multi-get operation interrupted by pool '
'stopping!')
results.append(outq.get())
outq.task_done()
finally:
if transient_pool:
pool.stop()

return results

Expand All @@ -263,9 +265,9 @@ def multiput(client, objs, **options):
containing booleans or :class:`~riak.riak_object.RiakObject`

If a ``pool`` option is included, the request will use the given worker
pool and not the default :data:`RIAK_MULTIPUT_POOL`. This option will
be passed by the client if the ``multiput_pool_size`` option was set on
client initialization.
pool and not a transient :class:`~riak.client.multi.MultiPutPool`. This
option will be passed by the client if the ``multiput_pool_size``
option was set on client initialization.

:param client: the client to use
:type client: :class:`RiakClient <riak.client.RiakClient>`
Expand All @@ -277,25 +279,32 @@ def multiput(client, objs, **options):
:type options: dict
:rtype: list
"""
transient_pool = False
outq = Queue()

if 'pool' in options:
pool = options['pool']
del options['pool']
else:
pool = RIAK_MULTIPUT_POOL

pool.start()
for obj in objs:
task = PutTask(client, outq, obj, options)
pool.enq(task)

results = []
for _ in range(len(objs)):
if pool.stopped():
raise RuntimeError("Multi-put operation interrupted by pool "
"stopping!")
results.append(outq.get())
outq.task_done()
pool = MultiPutPool()
transient_pool = True

try:
pool.start()
for obj in objs:
task = PutTask(client, outq, obj, options)
pool.enq(task)

results = []
for _ in range(len(objs)):
if pool.stopped():
raise RuntimeError(
'Multi-put operation interrupted by pool '
'stopping!')
results.append(outq.get())
outq.task_done()
finally:
if transient_pool:
pool.stop()

return results