diff --git a/riak/mapreduce.py b/riak/mapreduce.py index 91272dd1..c2b797e0 100644 --- a/riak/mapreduce.py +++ b/riak/mapreduce.py @@ -48,7 +48,7 @@ def __init__(self, client): self._key_filters = [] self._input_mode = None - def add(self, arg1, arg2=None, arg3=None): + def add(self, arg1, arg2=None, arg3=None, bucket_type=None): """ Add inputs to a map/reduce operation. This method takes three different forms, depending on the provided inputs. You can @@ -62,15 +62,17 @@ def add(self, arg1, arg2=None, arg3=None): :type arg2: string, list, None :param arg3: key data for this input (must be convertible to JSON) :type arg3: string, list, dict, None + :param bucket_type: Optional name of a bucket type + :type bucket_type: string, None :rtype: :class:`RiakMapReduce` """ if (arg2 is None) and (arg3 is None): if isinstance(arg1, RiakObject): return self.add_object(arg1) else: - return self.add_bucket(arg1) + return self.add_bucket(arg1, bucket_type) else: - return self.add_bucket_key_data(arg1, arg2, arg3) + return self.add_bucket_key_data(arg1, arg2, arg3, bucket_type) def add_object(self, obj): """ @@ -82,7 +84,7 @@ def add_object(self, obj): """ return self.add_bucket_key_data(obj._bucket._name, obj._key, None) - def add_bucket_key_data(self, bucket, key, data): + def add_bucket_key_data(self, bucket, key, data, bucket_type=None): """ Adds a bucket/key/keydata triple to the inputs. @@ -92,6 +94,8 @@ def add_bucket_key_data(self, bucket, key, data): :type key: string :param data: the key-specific data :type data: string, list, dict, None + :param bucket_type: Optional name of a bucket type + :type bucket_type: string, None :rtype: :class:`RiakMapReduce` """ if self._input_mode == 'bucket': @@ -101,22 +105,40 @@ def add_bucket_key_data(self, bucket, key, data): else: if isinstance(key, Iterable) and \ not isinstance(key, string_types): - for k in key: - self._inputs.append([bucket, k, data]) + if bucket_type is not None: + for k in key: + self._inputs.append([bucket, k, data, bucket_type]) + else: + for k in key: + self._inputs.append([bucket, k, data]) else: - self._inputs.append([bucket, key, data]) + if bucket_type is not None: + self._inputs.append([bucket, key, data, bucket_type]) + else: + self._inputs.append([bucket, key, data]) return self - def add_bucket(self, bucket): + def add_bucket(self, bucket, bucket_type=None): """ Adds all keys in a bucket to the inputs. :param bucket: the bucket :type bucket: string + :param bucket_type: Optional name of a bucket type + :type bucket_type: string, None :rtype: :class:`RiakMapReduce` """ self._input_mode = 'bucket' - self._inputs = bucket + if isinstance(bucket, RiakBucket): + if bucket.bucket_type.is_default(): + self._inputs = {'bucket': bucket.name} + else: + self._inputs = {'bucket': [bucket.bucket_type.name, + bucket.name]} + elif bucket_type is not None and bucket_type != "default": + self._inputs = {'bucket': [bucket_type, bucket]} + else: + self._inputs = {'bucket': bucket} return self def add_key_filters(self, key_filters): @@ -164,7 +186,7 @@ def search(self, index, query): 'query': query} return self - def index(self, bucket, index, startkey, endkey=None): + def index(self, bucket, index, startkey, endkey=None, bucket_type=None): """ Begin a map/reduce operation using a Secondary Index query. @@ -178,6 +200,8 @@ def index(self, bucket, index, startkey, endkey=None): :type startkey: string, integer :param endkey: The end key of index range (if doing a range query) :type endkey: string, integer, None + :param bucket_type: Optional name of a bucket type + :type bucket_type: string, None :rtype: :class:`RiakMapReduce` """ self._input_mode = 'query' @@ -191,6 +215,8 @@ def index(self, bucket, index, startkey, endkey=None): 'index': index, 'start': startkey, 'end': endkey} + if bucket_type is not None: + self._inputs['bucket'] = [bucket_type, bucket] return self def link(self, bucket='_', tag='_', keep=False): @@ -347,16 +373,14 @@ def _normalize_query(self): keep_flag = True query.append(phase.to_array()) - if (len(self._key_filters) > 0): - bucket_name = None - if (type(self._inputs) == str): - bucket_name = self._inputs - elif (type(self._inputs) == RiakBucket): - bucket_name = self._inputs.name - - if (bucket_name is not None): - self._inputs = {'bucket': bucket_name, - 'key_filters': self._key_filters} + # Is this a bucket-only input? If so, we need to switch + # to be a string or array, i.e., no keyword (to keep Riak happy) + # Also add the key filter, if necessary + if isinstance(self._inputs, dict) and len(self._inputs) == 1: + if len(self._key_filters) > 0: + self._inputs['key_filters'] = self._key_filters + else: + self._inputs = self._inputs['bucket'] return query, link_results_flag @@ -673,15 +697,24 @@ class RiakMapReduceChain(object): Mixin to add chaining from the client object directly into a MapReduce operation. """ - def add(self, *args): + def add(self, arg1, arg2=None, arg3=None, bucket_type=None): """ Start assembling a Map/Reduce operation. A shortcut for :func:`RiakMapReduce.add`. + :param arg1: the object or bucket to add + :type arg1: RiakObject, string + :param arg2: a key or list of keys to add (if a bucket is + given in arg1) + :type arg2: string, list, None + :param arg3: key data for this input (must be convertible to JSON) + :type arg3: string, list, dict, None + :param bucket_type: Optional name of a bucket type + :type bucket_type: string, None :rtype: :class:`RiakMapReduce` """ mr = RiakMapReduce(self) - return mr.add(*args) + return mr.add(arg1, arg2, arg3, bucket_type) def search(self, *args): """ @@ -692,18 +725,31 @@ def search(self, *args): :rtype: :class:`RiakMapReduce` """ + mr = RiakMapReduce(self) return mr.search(*args) - def index(self, *args): + def index(self, bucket, index, startkey, endkey=None, bucket_type=None): """ Start assembling a Map/Reduce operation based on secondary index query results. + :param bucket: The bucket over which to perform the query + :type bucket: string + :param index: The index to use for query + :type index: string + :param startkey: The start key of index range, or the + value which all entries must equal + :type startkey: string, integer + :param endkey: The end key of index range (if doing a range query) + :type endkey: string, integer, None + :param bucket_type: Optional name of a bucket type + :type bucket_type: string, None :rtype: :class:`RiakMapReduce` """ + mr = RiakMapReduce(self) - return mr.index(*args) + return mr.index(bucket, index, startkey, endkey, bucket_type) def link(self, *args): """ diff --git a/riak/riak_object.py b/riak/riak_object.py index 98053bfe..800822d9 100644 --- a/riak/riak_object.py +++ b/riak/riak_object.py @@ -358,16 +358,25 @@ def clear(self): self.siblings = [] return self - def add(self, *args): + def add(self, arg1, arg2=None, arg3=None, bucket_type=None): """ Start assembling a Map/Reduce operation. A shortcut for :meth:`~riak.mapreduce.RiakMapReduce.add`. + :param arg1: the object or bucket to add + :type arg1: RiakObject, string + :param arg2: a key or list of keys to add (if a bucket is + given in arg1) + :type arg2: string, list, None + :param arg3: key data for this input (must be convertible to JSON) + :type arg3: string, list, dict, None + :param bucket_type: Optional name of a bucket type + :type bucket_type: string, None :rtype: :class:`~riak.mapreduce.RiakMapReduce` """ mr = RiakMapReduce(self.client) - mr.add(self.bucket.name, self.key) - return mr.add(*args) + mr.add(self.bucket.name, self.key, bucket_type=bucket_type) + return mr.add(arg1, arg2, arg3, bucket_type) def link(self, *args): """ diff --git a/riak/tests/test_mapreduce.py b/riak/tests/test_mapreduce.py index 7881906f..f22a24f6 100644 --- a/riak/tests/test_mapreduce.py +++ b/riak/tests/test_mapreduce.py @@ -98,6 +98,23 @@ def test_erlang_map_reduce(self): .run() self.assertEqual(len(result), 2) + def test_erlang_map_reduce_bucket_type(self): + # Create the object... + btype = self.client.bucket_type("pytest") + bucket = btype.bucket(self.bucket_name) + bucket.new("foo", 2).store() + bucket.new("bar", 2).store() + bucket.new("baz", 4).store() + # Run the map... + result = self.client \ + .add(self.bucket_name, "foo", bucket_type="pytest") \ + .add(self.bucket_name, "bar", bucket_type="pytest") \ + .add(self.bucket_name, "baz", bucket_type="pytest") \ + .map(["riak_kv_mapreduce", "map_object_value"]) \ + .reduce(["riak_kv_mapreduce", "reduce_set_union"]) \ + .run() + self.assertEqual(len(result), 2) + def test_erlang_source_map_reduce(self): # Create the object... bucket = self.client.bucket(self.bucket_name) @@ -124,6 +141,30 @@ def test_erlang_source_map_reduce(self): if strfun_allowed: self.assertEqual(result, ['2', '3', '4']) + def test_erlang_source_map_reduce_bucket_type(self): + # Create the object... + btype = self.client.bucket_type("pytest") + bucket = btype.bucket(self.bucket_name) + bucket.new("foo", 2).store() + bucket.new("bar", 3).store() + bucket.new("baz", 4).store() + strfun_allowed = True + # Run the map... + try: + result = self.client \ + .add(self.bucket_name, "foo", bucket_type="pytest") \ + .add(self.bucket_name, "bar", bucket_type="pytest") \ + .add(self.bucket_name, "baz", bucket_type="pytest") \ + .map("""fun(Object, _KD, _A) -> + Value = riak_object:get_value(Object), + [Value] + end.""", {'language': 'erlang'}).run() + except RiakError as e: + if e.value.startswith('May have tried'): + strfun_allowed = False + if strfun_allowed: + self.assertEqual(result, ['2', '3', '4']) + def test_client_exceptional_paths(self): bucket = self.client.bucket(self.bucket_name) bucket.new("foo", 2).store() @@ -195,6 +236,18 @@ def test_javascript_named_map(self): .run() self.assertEqual(result, [2]) + def test_javascript_named_map_bucket_type(self): + # Create the object... + btype = self.client.bucket_type("pytest") + bucket = btype.bucket(self.bucket_name) + bucket.new("foo", 2).store() + # Run the map... + result = self.client \ + .add(self.bucket_name, "foo", bucket_type="pytest") \ + .map("Riak.mapValuesJson") \ + .run() + self.assertEqual(result, [2]) + def test_javascript_source_map_reduce(self): # Create the object... bucket = self.client.bucket(self.bucket_name) @@ -211,6 +264,23 @@ def test_javascript_source_map_reduce(self): .run() self.assertEqual(result, [3]) + def test_javascript_source_map_reduce_bucket_type(self): + # Create the object... + btype = self.client.bucket_type("pytest") + bucket = btype.bucket(self.bucket_name) + bucket.new("foo", 2).store() + bucket.new("bar", 3).store() + bucket.new("baz", 4).store() + # Run the map... + result = self.client \ + .add(self.bucket_name, "foo", bucket_type="pytest") \ + .add(self.bucket_name, "bar", bucket_type="pytest") \ + .add(self.bucket_name, "baz", bucket_type="pytest") \ + .map("function (v) { return [1]; }") \ + .reduce("Riak.reduceSum") \ + .run() + self.assertEqual(result, [3]) + def test_javascript_named_map_reduce(self): # Create the object... bucket = self.client.bucket(self.bucket_name) @@ -227,6 +297,23 @@ def test_javascript_named_map_reduce(self): .run() self.assertEqual(result, [9]) + def test_javascript_named_map_reduce_bucket_type(self): + # Create the object... + btype = self.client.bucket_type("pytest") + bucket = btype.bucket(self.bucket_name) + bucket.new("foo", 2).store() + bucket.new("bar", 3).store() + bucket.new("baz", 4).store() + # Run the map... + result = self.client \ + .add(self.bucket_name, "foo", bucket_type="pytest") \ + .add(self.bucket_name, "bar", bucket_type="pytest") \ + .add(self.bucket_name, "baz", bucket_type="pytest") \ + .map("Riak.mapValuesJson") \ + .reduce("Riak.reduceSum") \ + .run() + self.assertEqual(result, [9]) + def test_javascript_bucket_map_reduce(self): # Create the object... bucket = self.client.bucket("bucket_%s" % self.randint()) @@ -241,6 +328,21 @@ def test_javascript_bucket_map_reduce(self): .run() self.assertEqual(result, [9]) + def test_javascript_bucket_map_reduceP_bucket_type(self): + # Create the object... + btype = self.client.bucket_type("pytest") + bucket = btype.bucket("bucket_%s" % self.randint()) + bucket.new("foo", 2).store() + bucket.new("bar", 3).store() + bucket.new("baz", 4).store() + # Run the map... + result = self.client \ + .add(bucket.name, bucket_type="pytest") \ + .map("Riak.mapValuesJson") \ + .reduce("Riak.reduceSum") \ + .run() + self.assertEqual(result, [9]) + def test_javascript_arg_map_reduce(self): # Create the object... bucket = self.client.bucket(self.bucket_name) @@ -257,6 +359,23 @@ def test_javascript_arg_map_reduce(self): .run() self.assertEqual(result, [10]) + def test_javascript_arg_map_reduce_bucket_type(self): + # Create the object... + btype = self.client.bucket_type("pytest") + bucket = btype.bucket(self.bucket_name) + bucket.new("foo", 2).store() + # Run the map... + result = self.client \ + .add(self.bucket_name, "foo", 5, bucket_type="pytest") \ + .add(self.bucket_name, "foo", 10, bucket_type="pytest") \ + .add(self.bucket_name, "foo", 15, bucket_type="pytest") \ + .add(self.bucket_name, "foo", -15, bucket_type="pytest") \ + .add(self.bucket_name, "foo", -5, bucket_type="pytest") \ + .map("function(v, arg) { return [arg]; }") \ + .reduce("Riak.reduceSum") \ + .run() + self.assertEqual(result, [10]) + def test_key_filters(self): bucket = self.client.bucket("kftest") bucket.new("basho-20101215", 1).store() @@ -272,6 +391,22 @@ def test_key_filters(self): self.assertEqual(result, ["yahoo-20090613"]) + def test_key_filters_bucket_type(self): + btype = self.client.bucket_type("pytest") + bucket = btype.bucket("kftest") + bucket.new("basho-20101215", 1).store() + bucket.new("google-20110103", 2).store() + bucket.new("yahoo-20090613", 3).store() + + result = self.client \ + .add("kftest", bucket_type="pytest") \ + .add_key_filters([["tokenize", "-", 2]]) \ + .add_key_filter("ends_with", "0613") \ + .map("function (v, keydata) { return [v.key]; }") \ + .run() + + self.assertEqual(result, ["yahoo-20090613"]) + def test_key_filters_f_chain(self): bucket = self.client.bucket("kftest") bucket.new("basho-20101215", 1).store()