Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 70 additions & 24 deletions riak/mapreduce.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def __init__(self, client):
self._key_filters = []
self._input_mode = None

def add(self, arg1, arg2=None, arg3=None):
def add(self, arg1, arg2=None, arg3=None, bucket_type=None):
"""
Add inputs to a map/reduce operation. This method takes three
different forms, depending on the provided inputs. You can
Expand All @@ -62,15 +62,17 @@ def add(self, arg1, arg2=None, arg3=None):
:type arg2: string, list, None
:param arg3: key data for this input (must be convertible to JSON)
:type arg3: string, list, dict, None
:param bucket_type: Optional name of a bucket type
:type bucket_type: string, None
:rtype: :class:`RiakMapReduce`
"""
if (arg2 is None) and (arg3 is None):
if isinstance(arg1, RiakObject):
return self.add_object(arg1)
else:
return self.add_bucket(arg1)
return self.add_bucket(arg1, bucket_type)
else:
return self.add_bucket_key_data(arg1, arg2, arg3)
return self.add_bucket_key_data(arg1, arg2, arg3, bucket_type)

def add_object(self, obj):
"""
Expand All @@ -82,7 +84,7 @@ def add_object(self, obj):
"""
return self.add_bucket_key_data(obj._bucket._name, obj._key, None)

def add_bucket_key_data(self, bucket, key, data):
def add_bucket_key_data(self, bucket, key, data, bucket_type=None):
"""
Adds a bucket/key/keydata triple to the inputs.

Expand All @@ -92,6 +94,8 @@ def add_bucket_key_data(self, bucket, key, data):
:type key: string
:param data: the key-specific data
:type data: string, list, dict, None
:param bucket_type: Optional name of a bucket type
:type bucket_type: string, None
:rtype: :class:`RiakMapReduce`
"""
if self._input_mode == 'bucket':
Expand All @@ -101,22 +105,40 @@ def add_bucket_key_data(self, bucket, key, data):
else:
if isinstance(key, Iterable) and \
not isinstance(key, string_types):
for k in key:
self._inputs.append([bucket, k, data])
if bucket_type is not None:
for k in key:
self._inputs.append([bucket, k, data, bucket_type])
else:
for k in key:
self._inputs.append([bucket, k, data])
else:
self._inputs.append([bucket, key, data])
if bucket_type is not None:
self._inputs.append([bucket, key, data, bucket_type])
else:
self._inputs.append([bucket, key, data])
return self

def add_bucket(self, bucket):
def add_bucket(self, bucket, bucket_type=None):
"""
Adds all keys in a bucket to the inputs.

:param bucket: the bucket
:type bucket: string
:param bucket_type: Optional name of a bucket type
:type bucket_type: string, None
:rtype: :class:`RiakMapReduce`
"""
self._input_mode = 'bucket'
self._inputs = bucket
if isinstance(bucket, RiakBucket):
if bucket.bucket_type.is_default():
self._inputs = {'bucket': bucket.name}
else:
self._inputs = {'bucket': [bucket.bucket_type.name,
bucket.name]}
elif bucket_type is not None and bucket_type != "default":
self._inputs = {'bucket': [bucket_type, bucket]}
else:
self._inputs = {'bucket': bucket}
return self

def add_key_filters(self, key_filters):
Expand Down Expand Up @@ -164,7 +186,7 @@ def search(self, index, query):
'query': query}
return self

def index(self, bucket, index, startkey, endkey=None):
def index(self, bucket, index, startkey, endkey=None, bucket_type=None):
"""
Begin a map/reduce operation using a Secondary Index
query.
Expand All @@ -178,6 +200,8 @@ def index(self, bucket, index, startkey, endkey=None):
:type startkey: string, integer
:param endkey: The end key of index range (if doing a range query)
:type endkey: string, integer, None
:param bucket_type: Optional name of a bucket type
:type bucket_type: string, None
:rtype: :class:`RiakMapReduce`
"""
self._input_mode = 'query'
Expand All @@ -191,6 +215,8 @@ def index(self, bucket, index, startkey, endkey=None):
'index': index,
'start': startkey,
'end': endkey}
if bucket_type is not None:
self._inputs['bucket'] = [bucket_type, bucket]
return self

def link(self, bucket='_', tag='_', keep=False):
Expand Down Expand Up @@ -347,16 +373,14 @@ def _normalize_query(self):
keep_flag = True
query.append(phase.to_array())

if (len(self._key_filters) > 0):
bucket_name = None
if (type(self._inputs) == str):
bucket_name = self._inputs
elif (type(self._inputs) == RiakBucket):
bucket_name = self._inputs.name

if (bucket_name is not None):
self._inputs = {'bucket': bucket_name,
'key_filters': self._key_filters}
# Is this a bucket-only input? If so, we need to switch
# to be a string or array, i.e., no keyword (to keep Riak happy)
# Also add the key filter, if necessary
if isinstance(self._inputs, dict) and len(self._inputs) == 1:
if len(self._key_filters) > 0:
self._inputs['key_filters'] = self._key_filters
else:
self._inputs = self._inputs['bucket']

return query, link_results_flag

Expand Down Expand Up @@ -673,15 +697,24 @@ class RiakMapReduceChain(object):
Mixin to add chaining from the client object directly into a
MapReduce operation.
"""
def add(self, *args):
def add(self, arg1, arg2=None, arg3=None, bucket_type=None):
"""
Start assembling a Map/Reduce operation. A shortcut for
:func:`RiakMapReduce.add`.

:param arg1: the object or bucket to add
:type arg1: RiakObject, string
:param arg2: a key or list of keys to add (if a bucket is
given in arg1)
:type arg2: string, list, None
:param arg3: key data for this input (must be convertible to JSON)
:type arg3: string, list, dict, None
:param bucket_type: Optional name of a bucket type
:type bucket_type: string, None
:rtype: :class:`RiakMapReduce`
"""
mr = RiakMapReduce(self)
return mr.add(*args)
return mr.add(arg1, arg2, arg3, bucket_type)

def search(self, *args):
"""
Expand All @@ -692,18 +725,31 @@ def search(self, *args):

:rtype: :class:`RiakMapReduce`
"""

mr = RiakMapReduce(self)
return mr.search(*args)

def index(self, *args):
def index(self, bucket, index, startkey, endkey=None, bucket_type=None):
"""
Start assembling a Map/Reduce operation based on secondary
index query results.

:param bucket: The bucket over which to perform the query
:type bucket: string
:param index: The index to use for query
:type index: string
:param startkey: The start key of index range, or the
value which all entries must equal
:type startkey: string, integer
:param endkey: The end key of index range (if doing a range query)
:type endkey: string, integer, None
:param bucket_type: Optional name of a bucket type
:type bucket_type: string, None
:rtype: :class:`RiakMapReduce`
"""

mr = RiakMapReduce(self)
return mr.index(*args)
return mr.index(bucket, index, startkey, endkey, bucket_type)

def link(self, *args):
"""
Expand Down
15 changes: 12 additions & 3 deletions riak/riak_object.py
Original file line number Diff line number Diff line change
Expand Up @@ -358,16 +358,25 @@ def clear(self):
self.siblings = []
return self

def add(self, *args):
def add(self, arg1, arg2=None, arg3=None, bucket_type=None):
"""
Start assembling a Map/Reduce operation.
A shortcut for :meth:`~riak.mapreduce.RiakMapReduce.add`.

:param arg1: the object or bucket to add
:type arg1: RiakObject, string
:param arg2: a key or list of keys to add (if a bucket is
given in arg1)
:type arg2: string, list, None
:param arg3: key data for this input (must be convertible to JSON)
:type arg3: string, list, dict, None
:param bucket_type: Optional name of a bucket type
:type bucket_type: string, None
:rtype: :class:`~riak.mapreduce.RiakMapReduce`
"""
mr = RiakMapReduce(self.client)
mr.add(self.bucket.name, self.key)
return mr.add(*args)
mr.add(self.bucket.name, self.key, bucket_type=bucket_type)
return mr.add(arg1, arg2, arg3, bucket_type)

def link(self, *args):
"""
Expand Down
Loading