Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 17 additions & 3 deletions riak/client/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from riak.security import SecurityCreds
from riak.util import lazy_property, bytes_to_str, str_to_bytes
from six import string_types, PY2
from riak.client.multiget import MultiGetPool
from riak.client.multi import MultiGetPool, MultiPutPool


def default_encoder(obj):
Expand Down Expand Up @@ -67,8 +67,10 @@ class RiakClient(RiakMapReduceChain, RiakClientOperations):
#: The supported protocols
PROTOCOLS = ['http', 'pbc']

def __init__(self, protocol='pbc', transport_options={}, nodes=None,
credentials=None, multiget_pool_size=None, **kwargs):
def __init__(self, protocol='pbc', transport_options={},
nodes=None, credentials=None,
multiget_pool_size=None, multiput_pool_size=None,
**kwargs):
"""
Construct a new ``RiakClient`` object.

Expand All @@ -87,6 +89,10 @@ def __init__(self, protocol='pbc', transport_options={}, nodes=None,
:meth:`multiget` operations. Defaults to a factor of the number of
CPUs in the system
:type multiget_pool_size: int
:param multiput_pool_size: the number of threads to use in
:meth:`multiput` operations. Defaults to a factor of the number of
CPUs in the system
:type multiput_pool_size: int
"""
kwargs = kwargs.copy()

Expand All @@ -96,6 +102,7 @@ def __init__(self, protocol='pbc', transport_options={}, nodes=None,
self.nodes = [self._create_node(n) for n in nodes]

self._multiget_pool_size = multiget_pool_size
self._multiput_pool_size = multiput_pool_size
self.protocol = protocol or 'pbc'
self._resolver = None
self._credentials = self._create_credentials(credentials)
Expand Down Expand Up @@ -358,6 +365,13 @@ def _multiget_pool(self):
else:
return None

@lazy_property
def _multiput_pool(self):
if self._multiput_pool_size:
return MultiPutPool(self._multiput_pool_size)
else:
return None

def __hash__(self):
return hash(frozenset([(n.host, n.http_port, n.pb_port)
for n in self.nodes]))
Expand Down
138 changes: 111 additions & 27 deletions riak/client/multiget.py → riak/client/multi.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,12 @@
from threading import Thread, Lock, Event
from multiprocessing import cpu_count
from six import PY2

if PY2:
from Queue import Queue
else:
from queue import Queue

__all__ = ['multiget', 'MultiGetPool']
__all__ = ['multiget', 'multiput', 'MultiGetPool', 'MultiPutPool']


try:
Expand All @@ -21,25 +20,29 @@
POOL_SIZE = 6

#: A :class:`namedtuple` for tasks that are fed to workers in the
#: multiget pool.
Task = namedtuple('Task', ['client', 'outq', 'bucket_type', 'bucket', 'key',
'options'])
#: multi pool.
Task = namedtuple(
'Task',
['client', 'outq',
'bucket_type', 'bucket', 'key',
'object', 'options'])


class MultiGetPool(object):
class MultiPool(object):
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Love the code reuse here

"""
Encapsulates a pool of fetcher threads. These threads can be used
across many multi-get requests.
Encapsulates a pool of threads. These threads can be used
across many multi requests.
"""

def __init__(self, size=POOL_SIZE):
def __init__(self, size=POOL_SIZE, name='unknown'):
"""
:param size: the desired size of the worker pool
:type size: int
"""

self._inq = Queue()
self._size = size
self._name = name
self._started = Event()
self._stop = Event()
self._lock = Lock()
Expand All @@ -57,14 +60,14 @@ def enq(self, task):
if not self._stop.is_set():
self._inq.put(task)
else:
raise RuntimeError("Attempted to enqueue a fetch operation while "
"multi-get pool was shutdown!")
raise RuntimeError("Attempted to enqueue an operation while "
"multi pool was shutdown!")

def start(self):
"""
Starts the worker threads if they are not already started.
This method is thread-safe and will be called automatically
when executing a MultiGet operation.
when executing an operation.
"""
# Check whether we are already started, skip if we are.
if not self._started.is_set():
Expand All @@ -73,8 +76,9 @@ def start(self):
# If we got the lock, go ahead and start the worker
# threads, set the started flag, and release the lock.
for i in range(self._size):
name = "riak.client.multiget-worker-{0}".format(i)
worker = Thread(target=self._fetcher, name=name)
name = "riak.client.multi-worker-{0}-{1}".format(
self._name, i)
worker = Thread(target=self._worker_method, name=name)
worker.daemon = True
worker.start()
self._workers.append(worker)
Expand Down Expand Up @@ -105,7 +109,26 @@ def __del__(self):
# shutting down.
self.stop()

def _fetcher(self):
def _worker_method(self):
raise NotImplementedError

def _should_quit(self):
"""
Worker threads should exit when the stop flag is set and the
input queue is empty. Once the stop flag is set, new enqueues
are disallowed, meaning that the workers can safely drain the
queue before exiting.

:rtype: bool
"""
return self.stopped() and self._inq.empty()


class MultiGetPool(MultiPool):
def __init__(self, size=POOL_SIZE):
super(MultiGetPool, self).__init__(size=size, name='get')

def _worker_method(self):
"""
The body of the multi-get worker. Loops until
:meth:`_should_quit` returns ``True``, taking tasks off the
Expand All @@ -121,24 +144,40 @@ def _fetcher(self):
except KeyboardInterrupt:
raise
except Exception as err:
task.outq.put((task.bucket_type, task.bucket, task.key, err), )
errdata = (task.bucket_type, task.bucket, task.key, err)
task.outq.put(errdata)
finally:
self._inq.task_done()

def _should_quit(self):
"""
Worker threads should exit when the stop flag is set and the
input queue is empty. Once the stop flag is set, new enqueues
are disallowed, meaning that the workers can safely drain the
queue before exiting.

:rtype: bool
class MultiPutPool(MultiPool):
def __init__(self, size=POOL_SIZE):
super(MultiPutPool, self).__init__(size=size, name='put')

def _worker_method(self):
"""
return self.stopped() and self._inq.empty()
The body of the multi-put worker. Loops until
:meth:`_should_quit` returns ``True``, taking tasks off the
input queue, storing the object, and putting the result on
the output queue.
"""
while not self._should_quit():
task = self._inq.get()
try:
robj = task.object
rv = task.client.put(robj, **task.options)
task.outq.put(rv)
except KeyboardInterrupt:
raise
except Exception as err:
errdata = (task.object, err)
task.outq.put(errdata)
finally:
self._inq.task_done()


#: The default pool is automatically created and stored in this constant.
RIAK_MULTIGET_POOL = MultiGetPool()
RIAK_MULTIPUT_POOL = MultiPutPool()


def multiget(client, keys, **options):
Expand All @@ -160,8 +199,8 @@ def multiget(client, keys, **options):
:meth:`RiakBucket.get <riak.bucket.RiakBucket.get>`
:type options: dict
:rtype: list
"""

"""
outq = Queue()

if 'pool' in options:
Expand All @@ -172,7 +211,7 @@ def multiget(client, keys, **options):

pool.start()
for bucket_type, bucket, key in keys:
task = Task(client, outq, bucket_type, bucket, key, options)
task = Task(client, outq, bucket_type, bucket, key, None, options)
pool.enq(task)

results = []
Expand All @@ -184,3 +223,48 @@ def multiget(client, keys, **options):
outq.task_done()

return results


def multiput(client, objs, **options):
"""Executes a parallel-store across multiple threads. Returns a list
containing booleans or :class:`~riak.riak_object.RiakObject`

If a ``pool`` option is included, the request will use the given worker
pool and not the default :data:`RIAK_MULTIPUT_POOL`. This option will
be passed by the client if the ``multiput_pool_size`` option was set on
client initialization.

:param client: the client to use
:type client: :class:`RiakClient <riak.client.RiakClient>`
:param objs: the Riak Objects to store in parallel
:type keys: list of `RiakObject <riak.riak_object.RiakObject>`
:param options: request options to
:meth:`RiakClient.put <riak.client.RiakClient.put>`
:type options: dict
:rtype: list
"""
outq = Queue()

if 'pool' in options:
pool = options['pool']
del options['pool']
else:
pool = RIAK_MULTIPUT_POOL

pool.start()
for robj in objs:
bucket_type = robj.bucket.bucket_type
bucket = robj.bucket.name
key = robj.key
task = Task(client, outq, bucket_type, bucket, key, robj, options)
pool.enq(task)

results = []
for _ in range(len(objs)):
if pool.stopped():
raise RuntimeError("Multi-put operation interrupted by pool "
"stopping!")
results.append(outq.get())
outq.task_done()

return results
20 changes: 18 additions & 2 deletions riak/client/operations.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import riak.client.multi

from riak.client.transport import RiakClientTransport, \
retryable, retryableHttpOnly
from riak.client.multiget import multiget
from riak.client.index_page import IndexPage
from riak.datatypes import TYPES
from riak.table import Table
Expand Down Expand Up @@ -976,7 +977,22 @@ def multiget(self, pairs, **params):
"""
if self._multiget_pool:
params['pool'] = self._multiget_pool
return multiget(self, pairs, **params)
return riak.client.multi.multiget(self, pairs, **params)

def multiput(self, objs, **params):
"""
Stores objects in parallel via threads.

:param objs: the objects to store
:type objs: list of `RiakObject <riak.riak_object.RiakObject>`
:param params: additional request flags, e.g. w, dw, pw
:type params: dict
:rtype: list of boolean or
:class:`RiakObjects <riak.riak_object.RiakObject>`,
"""
if self._multiput_pool:
params['pool'] = self._multiput_pool
return riak.client.multi.multiput(self, objs, **params)

@retryable
def get_counter(self, transport, bucket, key, r=None, pr=None,
Expand Down
Loading