From 2ed365579d683a46a575c513d433584d59589f8a Mon Sep 17 00:00:00 2001 From: iindyk Date: Wed, 29 Jul 2020 22:10:59 -0400 Subject: [PATCH 1/9] Extending ApproximateQuantiles functionality to deal with non-uniform weights. --- sdks/python/apache_beam/transforms/stats.py | 262 +++++++++++++----- .../apache_beam/transforms/stats_test.py | 61 +++- 2 files changed, 251 insertions(+), 72 deletions(-) diff --git a/sdks/python/apache_beam/transforms/stats.py b/sdks/python/apache_beam/transforms/stats.py index fd51f7ab7a38..0e44d8f724be 100644 --- a/sdks/python/apache_beam/transforms/stats.py +++ b/sdks/python/apache_beam/transforms/stats.py @@ -244,17 +244,19 @@ class ApproximateQuantiles(object): (e.g. quartiles, percentiles etc.) either globally or per-key. """ @staticmethod - def _display_data(num_quantiles, key, reverse): + def _display_data(num_quantiles, weighted, key, reverse): return { - 'num_quantiles': DisplayDataItem(num_quantiles, label="Quantile Count"), + 'num_quantiles': DisplayDataItem(num_quantiles, label='Quantile Count'), + 'weighted': DisplayDataItem(str(weighted), label='Is Weighted'), 'key': DisplayDataItem( key.__name__ if hasattr(key, '__name__') else key.__class__.__name__, label='Record Comparer Key'), - 'reverse': DisplayDataItem(str(reverse), label='Is reversed') + 'reverse': DisplayDataItem(str(reverse), label='Is Reversed') } - @typehints.with_input_types(T) + @typehints.with_input_types( + typehints.Union[typing.Sequence[T], typing.Tuple[T, float]]) @typehints.with_output_types(typing.List[T]) class Globally(PTransform): """ @@ -263,13 +265,17 @@ class Globally(PTransform): Args: num_quantiles: number of elements in the resulting quantiles values list. + weighted: (optional) if set to True, the transform returns weighted + quantiles. The input PCollection is then expected to contain tuples of + input values with the corresponding weight. key: (optional) Key is a mapping of elements to a comparable key, similar to the key argument of Python's sorting methods. reverse: (optional) whether to order things smallest to largest, rather than largest to smallest """ - def __init__(self, num_quantiles, key=None, reverse=False): + def __init__(self, num_quantiles, weighted=False, key=None, reverse=False): self._num_quantiles = num_quantiles + self._weighted = weighted self._key = key self._reverse = reverse @@ -277,16 +283,20 @@ def expand(self, pcoll): return pcoll | CombineGlobally( ApproximateQuantilesCombineFn.create( num_quantiles=self._num_quantiles, + weighted=self._weighted, key=self._key, reverse=self._reverse)) def display_data(self): return ApproximateQuantiles._display_data( num_quantiles=self._num_quantiles, + weighted=self._weighted, key=self._key, reverse=self._reverse) - @typehints.with_input_types(typing.Tuple[K, V]) + @typehints.with_input_types( + typehints.Union[typing.Tuple[K, V], + typing.Tuple[K, typing.Tuple[V, float]]]) @typehints.with_output_types(typing.Tuple[K, typing.List[V]]) class PerKey(PTransform): """ @@ -296,13 +306,17 @@ class PerKey(PTransform): Args: num_quantiles: number of elements in the resulting quantiles values list. + weighted: (optional) if set to True, the transform returns weighted + quantiles. The input PCollection is then expected to contain tuples of + input values with the corresponding weight. key: (optional) Key is a mapping of elements to a comparable key, similar to the key argument of Python's sorting methods. reverse: (optional) whether to order things smallest to largest, rather than largest to smallest """ - def __init__(self, num_quantiles, key=None, reverse=False): + def __init__(self, num_quantiles, weighted=False, key=None, reverse=False): self._num_quantiles = num_quantiles + self._weighted = weighted self._key = key self._reverse = reverse @@ -310,12 +324,14 @@ def expand(self, pcoll): return pcoll | CombinePerKey( ApproximateQuantilesCombineFn.create( num_quantiles=self._num_quantiles, + weighted=self._weighted, key=self._key, reverse=self._reverse)) def display_data(self): return ApproximateQuantiles._display_data( num_quantiles=self._num_quantiles, + weighted=self._weighted, key=self._key, reverse=self._reverse) @@ -324,30 +340,40 @@ class _QuantileBuffer(object): """A single buffer in the sense of the referenced algorithm. (see http://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.6.6513&rep=rep1 &type=pdf and ApproximateQuantilesCombineFn for further information)""" - def __init__(self, elements, level=0, weight=1): + def __init__(self, elements, weighted, level=0, weight=1): + # In case of weighted quantiles, elements are tuples of values and weights. self.elements = elements + self.weighted = weighted self.level = level self.weight = weight def __lt__(self, other): - self.elements < other.elements + if self.weighted: + [element[0] for element in self.elements + ] < [element[0] for element in other.elements] + else: + self.elements < other.elements def sized_iterator(self): class QuantileBufferIterator(object): - def __init__(self, elem, weight): + def __init__(self, elem, weighted, weight): self._iter = iter(elem) + self.weighted = weighted self.weight = weight def __iter__(self): return self def __next__(self): - value = next(self._iter) - return (value, self.weight) + if self.weighted: + return next(self._iter) + else: + value = next(self._iter) + return (value, self.weight) next = __next__ # For Python 2 - return QuantileBufferIterator(self.elements, self.weight) + return QuantileBufferIterator(self.elements, self.weighted, self.weight) class _QuantileState(object): @@ -398,8 +424,8 @@ class ApproximateQuantilesCombineFn(CombineFn): http://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.6.6513&rep=rep1 &type=pdf - The default error bound is (1 / N), though in practice the accuracy - tends to be much better. + The default error bound is (1 / N) for uniformly distributed data and 1e-2 for + weighted case, though in practice the accuracy tends to be much better. Args: num_quantiles: Number of quantiles to produce. It is the size of the final @@ -408,6 +434,9 @@ class ApproximateQuantilesCombineFn(CombineFn): paper. num_buffers: The number of buffers, corresponding to b in the referenced paper. + weighted: (optional) if set to True, the combiner produces weighted + quantiles. The input elements are then expected to be tuples of input + values with the corresponding weight. key: (optional) Key is a mapping of elements to a comparable key, similar to the key argument of Python's sorting methods. reverse: (optional) whether to order things smallest to largest, rather @@ -428,7 +457,13 @@ class ApproximateQuantilesCombineFn(CombineFn): _qs = None # Refers to the _QuantileState def __init__( - self, num_quantiles, buffer_size, num_buffers, key=None, reverse=False): + self, + num_quantiles, + buffer_size, + num_buffers, + weighted=False, + key=None, + reverse=False): def _comparator(a, b): if key: a, b = key(a), key(b) @@ -445,13 +480,18 @@ def _comparator(a, b): self._num_quantiles = num_quantiles self._buffer_size = buffer_size self._num_buffers = num_buffers - self._key = key + self._weighted = weighted + if weighted: + self._key = (lambda x: x[0]) if key is None else (lambda x: key(x[0])) + else: + self._key = key self._reverse = reverse @classmethod def create( cls, num_quantiles, + weighted=False, epsilon=None, max_num_elements=None, key=None, @@ -463,6 +503,10 @@ def create( Args: num_quantiles: Number of quantiles to produce. It is the size of the final output list, including the mininum and maximum value items. + + weighted: (optional) if set to True, the combiner produces weighted + quantiles. The input elements are then expected to be tuples of values + with the corresponding weight. epsilon: (optional) The default error bound is `epsilon`, which holds as long as the number of elements is less than `_MAX_NUM_ELEMENTS`. Specifically, if one considers the input as a sorted list x_1, ..., @@ -480,7 +524,7 @@ def create( """ max_num_elements = max_num_elements or cls._MAX_NUM_ELEMENTS if not epsilon: - epsilon = 1.0 / num_quantiles + epsilon = 1e-2 if weighted else 1.0 / num_quantiles b = 2 while (b - 2) * (1 << (b - 2)) < epsilon * max_num_elements: b = b + 1 @@ -490,32 +534,41 @@ def create( num_quantiles=num_quantiles, buffer_size=k, num_buffers=b, + weighted=weighted, key=key, reverse=reverse) - def _add_unbuffered(self, qs, elem): + def _add_unbuffered(self, qs, elements): """ Add a new buffer to the unbuffered list, creating a new buffer and collapsing if needed. """ - qs.unbuffered_elements.append(elem) - if len(qs.unbuffered_elements) == qs.buffer_size: + qs.unbuffered_elements.extend(elements) + if len(qs.unbuffered_elements) >= qs.buffer_size: qs.unbuffered_elements.sort(key=self._key, reverse=self._reverse) - heapq.heappush( - qs.buffers, _QuantileBuffer(elements=qs.unbuffered_elements)) - qs.unbuffered_elements = [] - self._collapse_if_needed(qs) - def _offset(self, newWeight): + while len(qs.unbuffered_elements) >= qs.buffer_size: + to_buffer = qs.unbuffered_elements[:qs.buffer_size] + heapq.heappush( + qs.buffers, + _QuantileBuffer( + elements=to_buffer, + weighted=self._weighted, + weight=sum([element[1] for element in to_buffer]) + if self._weighted else 1)) + qs.unbuffered_elements = qs.unbuffered_elements[qs.buffer_size:] + self._collapse_if_needed(qs) + + def _offset(self, new_weight): """ If the weight is even, we must round up or down. Alternate between these two options to avoid a bias. """ - if newWeight % 2 == 1: - return (newWeight + 1) / 2 + if new_weight % 2 == 1: + return (new_weight + 1) / 2 else: self._offset_jitter = 2 - self._offset_jitter - return (newWeight + self._offset_jitter) / 2 + return (new_weight + self._offset_jitter) / 2 def _collapse(self, buffers): new_level = 0 @@ -527,21 +580,26 @@ def _collapse(self, buffers): # computed shards. If they differ we take the max. new_level = max([new_level, buffer_elem.level + 1]) new_weight = new_weight + buffer_elem.weight - new_elements = self._interpolate( - buffers, self._buffer_size, new_weight, self._offset(new_weight)) - return _QuantileBuffer(new_elements, new_level, new_weight) + if self._weighted: + step = new_weight / (self._buffer_size - 1) + offset = new_weight / (2 * self._buffer_size) + else: + step = new_weight + offset = self._offset(new_weight) + new_elements = self._interpolate(buffers, self._buffer_size, step, offset) + return _QuantileBuffer(new_elements, self._weighted, new_level, new_weight) def _collapse_if_needed(self, qs): while len(qs.buffers) > self._num_buffers: - toCollapse = [] - toCollapse.append(heapq.heappop(qs.buffers)) - toCollapse.append(heapq.heappop(qs.buffers)) - minLevel = toCollapse[1].level + to_collapse = [] + to_collapse.append(heapq.heappop(qs.buffers)) + to_collapse.append(heapq.heappop(qs.buffers)) + min_level = to_collapse[1].level - while len(qs.buffers) > 0 and qs.buffers[0].level == minLevel: - toCollapse.append(heapq.heappop(qs.buffers)) + while len(qs.buffers) > 0 and qs.buffers[0].level == min_level: + to_collapse.append(heapq.heappop(qs.buffers)) - heapq.heappush(qs.buffers, self._collapse(toCollapse)) + heapq.heappush(qs.buffers, self._collapse(to_collapse)) def _interpolate(self, i_buffers, count, step, offset): """ @@ -552,8 +610,8 @@ def _interpolate(self, i_buffers, count, step, offset): iterators = [] new_elements = [] - compare_key = None - if self._key: + compare_key = self._key + if self._key and not self._weighted: compare_key = lambda x: self._key(x[0]) for buffer_elem in i_buffers: iterators.append(buffer_elem.sized_iterator()) @@ -576,17 +634,34 @@ def _interpolate(self, i_buffers, count, step, offset): weighted_element = next(sorted_elem) current = weighted_element[1] j = 0 - while j < count: - target = j * step + offset - j = j + 1 - try: - while current <= target: - weighted_element = next(sorted_elem) - current = current + weighted_element[1] - except StopIteration: - pass - new_elements.append(weighted_element[0]) - return new_elements + if self._weighted: + previous = 0 + new_weights = [] + while j < count: + target = j * step + offset + j = j + 1 + try: + while current <= target: + weighted_element = next(sorted_elem) + current = current + weighted_element[1] + except StopIteration: + pass + new_elements.append(weighted_element[0]) + new_weights.append(current - previous) + previous = current + return list(zip(new_elements, new_weights)) + else: + while j < count: + target = j * step + offset + j = j + 1 + try: + while current <= target: + weighted_element = next(sorted_elem) + current = current + weighted_element[1] + except StopIteration: + pass + new_elements.append(weighted_element[0]) + return new_elements def create_accumulator(self): self._qs = _QuantileState( @@ -600,13 +675,35 @@ def add_input(self, quantile_state, element): """ Add a new element to the collection being summarized by quantile state. """ + value = element[0] if self._weighted else element + if quantile_state.is_empty(): + quantile_state.min_val = quantile_state.max_val = value + elif self._comparator(value, quantile_state.min_val) < 0: + quantile_state.min_val = value + elif self._comparator(value, quantile_state.max_val) > 0: + quantile_state.max_val = value + self._add_unbuffered(quantile_state, elements=[element]) + return quantile_state + + def add_inputs(self, quantile_state, elements): + """Add new elements to the collection being summarized by quantile state. + """ + if not elements: + return quantile_state + + values = [ + element[0] for element in elements + ] if self._weighted else elements + min_val = min(values) + max_val = max(values) if quantile_state.is_empty(): - quantile_state.min_val = quantile_state.max_val = element - elif self._comparator(element, quantile_state.min_val) < 0: - quantile_state.min_val = element - elif self._comparator(element, quantile_state.max_val) > 0: - quantile_state.max_val = element - self._add_unbuffered(quantile_state, elem=element) + quantile_state.min_val = min_val + quantile_state.max_val = max_val + elif self._comparator(min_val, quantile_state.min_val) < 0: + quantile_state.min_val = min_val + elif self._comparator(max_val, quantile_state.max_val) > 0: + quantile_state.max_val = max_val + self._add_unbuffered(quantile_state, elements=elements) return quantile_state def merge_accumulators(self, accumulators): @@ -622,8 +719,7 @@ def merge_accumulators(self, accumulators): qs.max_val) > 0: qs.max_val = accumulator.max_val - for unbuffered_element in accumulator.unbuffered_elements: - self._add_unbuffered(qs, unbuffered_element) + self._add_unbuffered(qs, accumulator.unbuffered_elements) qs.buffers.extend(accumulator.buffers) self._collapse_if_needed(qs) @@ -639,18 +735,44 @@ def extract_output(self, accumulator): return [] all_elems = accumulator.buffers - total_count = len(accumulator.unbuffered_elements) - for buffer_elem in all_elems: - total_count = total_count + accumulator.buffer_size * buffer_elem.weight + if self._weighted: + unbuffered_weight = sum( + [element[1] for element in accumulator.unbuffered_elements]) + total_weight = unbuffered_weight + for buffer_elem in all_elems: + total_weight += sum([element[1] for element in buffer_elem.elements]) + if accumulator.unbuffered_elements: + accumulator.unbuffered_elements.sort( + key=self._key, reverse=self._reverse) + all_elems.append( + _QuantileBuffer( + accumulator.unbuffered_elements, + weighted=True, + weight=unbuffered_weight)) + + step = 1.0 * total_weight / (self._num_quantiles - 1) + offset = (1.0 * total_weight) / (self._num_quantiles - 1) + mid_quantiles = [ + element[0] for element in self._interpolate( + all_elems, self._num_quantiles - 2, step, offset) + ] + else: + total_weight = len(accumulator.unbuffered_elements) + for buffer_elem in all_elems: + total_weight += accumulator.buffer_size * buffer_elem.weight + + if accumulator.unbuffered_elements: + accumulator.unbuffered_elements.sort( + key=self._key, reverse=self._reverse) + all_elems.append( + _QuantileBuffer(accumulator.unbuffered_elements, weighted=False)) - if accumulator.unbuffered_elements: - accumulator.unbuffered_elements.sort(key=self._key, reverse=self._reverse) - all_elems.append(_QuantileBuffer(accumulator.unbuffered_elements)) + step = 1.0 * total_weight / (self._num_quantiles - 1) + offset = (1.0 * total_weight - 1) / (self._num_quantiles - 1) + mid_quantiles = self._interpolate( + all_elems, self._num_quantiles - 2, step, offset) - step = 1.0 * total_count / (self._num_quantiles - 1) - offset = (1.0 * total_count - 1) / (self._num_quantiles - 1) quantiles = [accumulator.min_val] - quantiles.extend( - self._interpolate(all_elems, self._num_quantiles - 2, step, offset)) + quantiles.extend(mid_quantiles) quantiles.append(accumulator.max_val) return quantiles diff --git a/sdks/python/apache_beam/transforms/stats_test.py b/sdks/python/apache_beam/transforms/stats_test.py index ccaaa6892473..e4d23ed96634 100644 --- a/sdks/python/apache_beam/transforms/stats_test.py +++ b/sdks/python/apache_beam/transforms/stats_test.py @@ -550,6 +550,41 @@ def test_quantiles_globaly(self): equal_to([[100, 75, 50, 25, 0]]), label='checkReversedQuantiles') + def test_quantiles_globally_weighted(self): + num_inputs = 1e3 + a = -3 + b = 3 + + # Weighting function coincides with the pdf of the standard normal + # distribution up to a constant. Since 99.7% of the probability mass for + # this pdf is concentrated in the interval [a, b] = [-3, 3], the quantiles + # for a sample from this interval with the given weight function are + # expected to be close to the quantiles of the standard normal distribution. + def weight(x): + return math.exp(-(x**2) / 2) + + input_data = [ + (a + (b - a) * i / num_inputs, weight(a + (b - a) * i / num_inputs)) + for i in range(int(num_inputs) + 1) + ] + with TestPipeline() as p: + pc = p | Create(input_data) + + weighted_quantiles = pc | "Quantiles globally weighted" >> \ + beam.ApproximateQuantiles.Globally(5, weighted=True) + reversed_weighted_quantiles = ( + pc | 'Quantiles globally weighted reversed' >> + beam.ApproximateQuantiles.Globally(5, weighted=True, reverse=True)) + + assert_that( + weighted_quantiles, + equal_to([[-3., -0.6720000000000002, 0., 0.6720000000000002, 3.]]), + label="checkWeightedQuantilesGlobally") + assert_that( + reversed_weighted_quantiles, + equal_to([[3., 0.6720000000000002, 0., -0.6720000000000002, -3.]]), + label="checkWeightedReversedQuantilesGlobally") + def test_quantiles_per_key(self): with TestPipeline() as p: data = self._kv_data @@ -569,6 +604,25 @@ def test_quantiles_per_key(self): equal_to([('a', [3, 1]), ('b', [100, 1])]), label='checkReversedQuantilesPerKey') + def test_quantiles_per_key_weighted(self): + with TestPipeline() as p: + data = [(k, (v, 2.)) for k, v in self._kv_data] + pc = p | Create(data) + + per_key = pc | 'Weighted Quantiles PerKey' >> \ + beam.ApproximateQuantiles.PerKey(2, weighted=True) + per_key_reversed = pc | 'Weighted Quantiles PerKey Reversed' >> \ + beam.ApproximateQuantiles.PerKey(2, weighted=True, reverse=True) + + assert_that( + per_key, + equal_to([('a', [1, 3]), ('b', [1, 100])]), + label='checkWeightedQuantilesPerKey') + assert_that( + per_key_reversed, + equal_to([('a', [3, 1]), ('b', [100, 1])]), + label='checkWeightedReversedQuantilesPerKey') + def test_quantiles_per_key_with_key_argument(self): with TestPipeline() as p: data = self._kv_str_data @@ -695,19 +749,22 @@ def test_alternate_quantiles(self): def _display_data_matcher(instance): expected_items = [ DisplayDataItemMatcher('num_quantiles', instance._num_quantiles), + DisplayDataItemMatcher('weighted', str(instance._weighted)), DisplayDataItemMatcher('key', str(instance._key.__name__)), DisplayDataItemMatcher('reverse', str(instance._reverse)) ] return expected_items def test_global_display_data(self): - transform = beam.ApproximateQuantiles.Globally(3, key=len, reverse=True) + transform = beam.ApproximateQuantiles.Globally( + 3, weighted=True, key=len, reverse=True) data = DisplayData.create_from(transform) expected_items = self._display_data_matcher(transform) hc.assert_that(data.items, hc.contains_inanyorder(*expected_items)) def test_perkey_display_data(self): - transform = beam.ApproximateQuantiles.PerKey(3, key=len, reverse=True) + transform = beam.ApproximateQuantiles.PerKey( + 3, weighted=True, key=len, reverse=True) data = DisplayData.create_from(transform) expected_items = self._display_data_matcher(transform) hc.assert_that(data.items, hc.contains_inanyorder(*expected_items)) From 5f0a736fd3c552c6fa61e5c20cbb129b1a90414e Mon Sep 17 00:00:00 2001 From: iindyk Date: Wed, 29 Jul 2020 22:34:53 -0400 Subject: [PATCH 2/9] Extending ApproximateQuantiles functionality to deal with non-uniform weights. --- sdks/python/apache_beam/transforms/stats.py | 1 - 1 file changed, 1 deletion(-) diff --git a/sdks/python/apache_beam/transforms/stats.py b/sdks/python/apache_beam/transforms/stats.py index 0e44d8f724be..88f554b9d095 100644 --- a/sdks/python/apache_beam/transforms/stats.py +++ b/sdks/python/apache_beam/transforms/stats.py @@ -503,7 +503,6 @@ def create( Args: num_quantiles: Number of quantiles to produce. It is the size of the final output list, including the mininum and maximum value items. - weighted: (optional) if set to True, the combiner produces weighted quantiles. The input elements are then expected to be tuples of values with the corresponding weight. From ad5a28a738622aad9049e05e192558c3a4873bd0 Mon Sep 17 00:00:00 2001 From: iindyk Date: Wed, 29 Jul 2020 22:37:17 -0400 Subject: [PATCH 3/9] Extending ApproximateQuantiles functionality to deal with non-uniform weights. --- sdks/python/apache_beam/transforms/stats.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/transforms/stats.py b/sdks/python/apache_beam/transforms/stats.py index 88f554b9d095..bc1bd358b4b8 100644 --- a/sdks/python/apache_beam/transforms/stats.py +++ b/sdks/python/apache_beam/transforms/stats.py @@ -523,7 +523,7 @@ def create( """ max_num_elements = max_num_elements or cls._MAX_NUM_ELEMENTS if not epsilon: - epsilon = 1e-2 if weighted else 1.0 / num_quantiles + epsilon = 1e-2 if weighted else (1.0 / num_quantiles) b = 2 while (b - 2) * (1 << (b - 2)) < epsilon * max_num_elements: b = b + 1 From 5e7edfd19de07eb41ebde667140ab715ad0655e7 Mon Sep 17 00:00:00 2001 From: iindyk Date: Thu, 30 Jul 2020 09:52:16 -0400 Subject: [PATCH 4/9] Extending ApproximateQuantiles functionality to deal with non-uniform weights. --- sdks/python/apache_beam/transforms/stats.py | 2 +- sdks/python/apache_beam/transforms/stats_test.py | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/transforms/stats.py b/sdks/python/apache_beam/transforms/stats.py index bc1bd358b4b8..e2fc621d2f0a 100644 --- a/sdks/python/apache_beam/transforms/stats.py +++ b/sdks/python/apache_beam/transforms/stats.py @@ -528,7 +528,7 @@ def create( while (b - 2) * (1 << (b - 2)) < epsilon * max_num_elements: b = b + 1 b = b - 1 - k = max(2, math.ceil(max_num_elements / float(1 << (b - 1)))) + k = max(2, int(math.ceil(max_num_elements / float(1 << (b - 1))))) return cls( num_quantiles=num_quantiles, buffer_size=k, diff --git a/sdks/python/apache_beam/transforms/stats_test.py b/sdks/python/apache_beam/transforms/stats_test.py index e4d23ed96634..f4d9499453ee 100644 --- a/sdks/python/apache_beam/transforms/stats_test.py +++ b/sdks/python/apache_beam/transforms/stats_test.py @@ -612,7 +612,8 @@ def test_quantiles_per_key_weighted(self): per_key = pc | 'Weighted Quantiles PerKey' >> \ beam.ApproximateQuantiles.PerKey(2, weighted=True) per_key_reversed = pc | 'Weighted Quantiles PerKey Reversed' >> \ - beam.ApproximateQuantiles.PerKey(2, weighted=True, reverse=True) + beam.ApproximateQuantiles.PerKey( + 2, weighted=True, reverse=True) assert_that( per_key, From 2441e0d5263d4dda4d2043da24d7ba4e5e50cf47 Mon Sep 17 00:00:00 2001 From: iindyk Date: Tue, 11 Aug 2020 20:28:23 -0400 Subject: [PATCH 5/9] Added example to ApproximateQuantiles docstring, made weighted argument the last and modified _interpolate for readability. --- sdks/python/apache_beam/transforms/stats.py | 140 +++++++++--------- .../apache_beam/transforms/stats_test.py | 4 +- 2 files changed, 71 insertions(+), 73 deletions(-) diff --git a/sdks/python/apache_beam/transforms/stats.py b/sdks/python/apache_beam/transforms/stats.py index e2fc621d2f0a..642dda874bbf 100644 --- a/sdks/python/apache_beam/transforms/stats.py +++ b/sdks/python/apache_beam/transforms/stats.py @@ -242,17 +242,25 @@ class ApproximateQuantiles(object): """ PTransform for getting the idea of data distribution using approximate N-tile (e.g. quartiles, percentiles etc.) either globally or per-key. + + Examples: + + in: list(range(101)), num_quantiles=5 + out: [0, 25, 50, 75, 100] + + in: [(i, 1 if i<10 else 1e-5) for i in range(101)], num_quantiles=5, weighted=True + out: [0, 2, 5, 7, 100] """ @staticmethod - def _display_data(num_quantiles, weighted, key, reverse): + def _display_data(num_quantiles, key, reverse, weighted): return { 'num_quantiles': DisplayDataItem(num_quantiles, label='Quantile Count'), - 'weighted': DisplayDataItem(str(weighted), label='Is Weighted'), 'key': DisplayDataItem( key.__name__ if hasattr(key, '__name__') else key.__class__.__name__, label='Record Comparer Key'), - 'reverse': DisplayDataItem(str(reverse), label='Is Reversed') + 'reverse': DisplayDataItem(str(reverse), label='Is Reversed'), + 'weighted': DisplayDataItem(str(weighted), label='Is Weighted') } @typehints.with_input_types( @@ -265,34 +273,34 @@ class Globally(PTransform): Args: num_quantiles: number of elements in the resulting quantiles values list. - weighted: (optional) if set to True, the transform returns weighted - quantiles. The input PCollection is then expected to contain tuples of - input values with the corresponding weight. key: (optional) Key is a mapping of elements to a comparable key, similar to the key argument of Python's sorting methods. reverse: (optional) whether to order things smallest to largest, rather - than largest to smallest + than largest to smallest. + weighted: (optional) if set to True, the transform returns weighted + quantiles. The input PCollection is then expected to contain tuples of + input values with the corresponding weight. """ - def __init__(self, num_quantiles, weighted=False, key=None, reverse=False): + def __init__(self, num_quantiles, key=None, reverse=False, weighted=False): self._num_quantiles = num_quantiles - self._weighted = weighted self._key = key self._reverse = reverse + self._weighted = weighted def expand(self, pcoll): return pcoll | CombineGlobally( ApproximateQuantilesCombineFn.create( num_quantiles=self._num_quantiles, - weighted=self._weighted, key=self._key, - reverse=self._reverse)) + reverse=self._reverse, + weighted=self._weighted)) def display_data(self): return ApproximateQuantiles._display_data( num_quantiles=self._num_quantiles, - weighted=self._weighted, key=self._key, - reverse=self._reverse) + reverse=self._reverse, + weighted=self._weighted) @typehints.with_input_types( typehints.Union[typing.Tuple[K, V], @@ -306,34 +314,34 @@ class PerKey(PTransform): Args: num_quantiles: number of elements in the resulting quantiles values list. - weighted: (optional) if set to True, the transform returns weighted - quantiles. The input PCollection is then expected to contain tuples of - input values with the corresponding weight. key: (optional) Key is a mapping of elements to a comparable key, similar to the key argument of Python's sorting methods. reverse: (optional) whether to order things smallest to largest, rather - than largest to smallest + than largest to smallest. + weighted: (optional) if set to True, the transform returns weighted + quantiles. The input PCollection is then expected to contain tuples of + input values with the corresponding weight. """ - def __init__(self, num_quantiles, weighted=False, key=None, reverse=False): + def __init__(self, num_quantiles, key=None, reverse=False, weighted=False): self._num_quantiles = num_quantiles - self._weighted = weighted self._key = key self._reverse = reverse + self._weighted = weighted def expand(self, pcoll): return pcoll | CombinePerKey( ApproximateQuantilesCombineFn.create( num_quantiles=self._num_quantiles, - weighted=self._weighted, key=self._key, - reverse=self._reverse)) + reverse=self._reverse, + weighted=self._weighted)) def display_data(self): return ApproximateQuantiles._display_data( num_quantiles=self._num_quantiles, - weighted=self._weighted, key=self._key, - reverse=self._reverse) + reverse=self._reverse, + weighted=self._weighted) class _QuantileBuffer(object): @@ -349,10 +357,10 @@ def __init__(self, elements, weighted, level=0, weight=1): def __lt__(self, other): if self.weighted: - [element[0] for element in self.elements - ] < [element[0] for element in other.elements] + return [element[0] for element in self.elements + ] < [element[0] for element in other.elements] else: - self.elements < other.elements + return self.elements < other.elements def sized_iterator(self): class QuantileBufferIterator(object): @@ -424,8 +432,9 @@ class ApproximateQuantilesCombineFn(CombineFn): http://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.6.6513&rep=rep1 &type=pdf - The default error bound is (1 / N) for uniformly distributed data and 1e-2 for - weighted case, though in practice the accuracy tends to be much better. + The default error bound is (1 / N) for uniformly distributed data and + min(1e-2, 1 / N) for weighted case, though in practice the accuracy tends to + be much better. Args: num_quantiles: Number of quantiles to produce. It is the size of the final @@ -434,13 +443,13 @@ class ApproximateQuantilesCombineFn(CombineFn): paper. num_buffers: The number of buffers, corresponding to b in the referenced paper. - weighted: (optional) if set to True, the combiner produces weighted - quantiles. The input elements are then expected to be tuples of input - values with the corresponding weight. key: (optional) Key is a mapping of elements to a comparable key, similar to the key argument of Python's sorting methods. reverse: (optional) whether to order things smallest to largest, rather - than largest to smallest + than largest to smallest. + weighted: (optional) if set to True, the combiner produces weighted + quantiles. The input elements are then expected to be tuples of input + values with the corresponding weight. """ # For alternating between biasing up and down in the above even weight @@ -461,9 +470,9 @@ def __init__( num_quantiles, buffer_size, num_buffers, - weighted=False, key=None, - reverse=False): + reverse=False, + weighted=False): def _comparator(a, b): if key: a, b = key(a), key(b) @@ -480,32 +489,29 @@ def _comparator(a, b): self._num_quantiles = num_quantiles self._buffer_size = buffer_size self._num_buffers = num_buffers - self._weighted = weighted if weighted: self._key = (lambda x: x[0]) if key is None else (lambda x: key(x[0])) else: self._key = key self._reverse = reverse + self._weighted = weighted @classmethod def create( cls, num_quantiles, - weighted=False, epsilon=None, max_num_elements=None, key=None, - reverse=False): + reverse=False, + weighted=False): """ Creates an approximate quantiles combiner with the given key and desired number of quantiles. Args: num_quantiles: Number of quantiles to produce. It is the size of the - final output list, including the mininum and maximum value items. - weighted: (optional) if set to True, the combiner produces weighted - quantiles. The input elements are then expected to be tuples of values - with the corresponding weight. + final output list, including the mininum and maximum value items. epsilon: (optional) The default error bound is `epsilon`, which holds as long as the number of elements is less than `_MAX_NUM_ELEMENTS`. Specifically, if one considers the input as a sorted list x_1, ..., @@ -519,11 +525,15 @@ def create( key: (optional) Key is a mapping of elements to a comparable key, similar to the key argument of Python's sorting methods. reverse: (optional) whether to order things smallest to largest, rather - than largest to smallest + than largest to smallest. + weighted: (optional) if set to True, the combiner produces weighted + quantiles. The input elements are then expected to be tuples of values + with the corresponding weight. """ max_num_elements = max_num_elements or cls._MAX_NUM_ELEMENTS if not epsilon: - epsilon = 1e-2 if weighted else (1.0 / num_quantiles) + epsilon = min(1e-2, 1.0 / num_quantiles) \ + if weighted else (1.0 / num_quantiles) b = 2 while (b - 2) * (1 << (b - 2)) < epsilon * max_num_elements: b = b + 1 @@ -533,9 +543,9 @@ def create( num_quantiles=num_quantiles, buffer_size=k, num_buffers=b, - weighted=weighted, key=key, - reverse=reverse) + reverse=reverse, + weighted=weighted) def _add_unbuffered(self, qs, elements): """ @@ -633,34 +643,22 @@ def _interpolate(self, i_buffers, count, step, offset): weighted_element = next(sorted_elem) current = weighted_element[1] j = 0 - if self._weighted: - previous = 0 - new_weights = [] - while j < count: - target = j * step + offset - j = j + 1 - try: - while current <= target: - weighted_element = next(sorted_elem) - current = current + weighted_element[1] - except StopIteration: - pass - new_elements.append(weighted_element[0]) - new_weights.append(current - previous) + previous = 0 + while j < count: + target = j * step + offset + j = j + 1 + try: + while current <= target: + weighted_element = next(sorted_elem) + current = current + weighted_element[1] + except StopIteration: + pass + if self._weighted: + new_elements.append((weighted_element[0], current - previous)) previous = current - return list(zip(new_elements, new_weights)) - else: - while j < count: - target = j * step + offset - j = j + 1 - try: - while current <= target: - weighted_element = next(sorted_elem) - current = current + weighted_element[1] - except StopIteration: - pass + else: new_elements.append(weighted_element[0]) - return new_elements + return new_elements def create_accumulator(self): self._qs = _QuantileState( diff --git a/sdks/python/apache_beam/transforms/stats_test.py b/sdks/python/apache_beam/transforms/stats_test.py index f4d9499453ee..57870a5c1c08 100644 --- a/sdks/python/apache_beam/transforms/stats_test.py +++ b/sdks/python/apache_beam/transforms/stats_test.py @@ -574,7 +574,7 @@ def weight(x): beam.ApproximateQuantiles.Globally(5, weighted=True) reversed_weighted_quantiles = ( pc | 'Quantiles globally weighted reversed' >> - beam.ApproximateQuantiles.Globally(5, weighted=True, reverse=True)) + beam.ApproximateQuantiles.Globally(5, reverse=True, weighted=True)) assert_that( weighted_quantiles, @@ -613,7 +613,7 @@ def test_quantiles_per_key_weighted(self): beam.ApproximateQuantiles.PerKey(2, weighted=True) per_key_reversed = pc | 'Weighted Quantiles PerKey Reversed' >> \ beam.ApproximateQuantiles.PerKey( - 2, weighted=True, reverse=True) + 2, reverse=True, weighted=True) assert_that( per_key, From 170d0cb5a403b02129f9276dd26d9b8671d187a5 Mon Sep 17 00:00:00 2001 From: iindyk Date: Tue, 11 Aug 2020 20:51:28 -0400 Subject: [PATCH 6/9] Added example to ApproximateQuantiles docstring, made weighted argument the last and modified _interpolate for readability. --- sdks/python/apache_beam/transforms/stats.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/transforms/stats.py b/sdks/python/apache_beam/transforms/stats.py index 642dda874bbf..9a736985978d 100644 --- a/sdks/python/apache_beam/transforms/stats.py +++ b/sdks/python/apache_beam/transforms/stats.py @@ -248,7 +248,8 @@ class ApproximateQuantiles(object): in: list(range(101)), num_quantiles=5 out: [0, 25, 50, 75, 100] - in: [(i, 1 if i<10 else 1e-5) for i in range(101)], num_quantiles=5, weighted=True + in: [(i, 1 if i<10 else 1e-5) for i in range(101)], num_quantiles=5, + weighted=True out: [0, 2, 5, 7, 100] """ @staticmethod From ff2db21c821137abcd7a4a961cc25e8644e33022 Mon Sep 17 00:00:00 2001 From: iindyk Date: Tue, 11 Aug 2020 22:20:46 -0400 Subject: [PATCH 7/9] Added example to ApproximateQuantiles docstring, made weighted argument the last and modified _interpolate for readability. --- sdks/python/apache_beam/transforms/stats.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/sdks/python/apache_beam/transforms/stats.py b/sdks/python/apache_beam/transforms/stats.py index 9a736985978d..6f744f3b17d0 100644 --- a/sdks/python/apache_beam/transforms/stats.py +++ b/sdks/python/apache_beam/transforms/stats.py @@ -243,14 +243,14 @@ class ApproximateQuantiles(object): PTransform for getting the idea of data distribution using approximate N-tile (e.g. quartiles, percentiles etc.) either globally or per-key. - Examples: + Examples - in: list(range(101)), num_quantiles=5 - out: [0, 25, 50, 75, 100] + in - list(range(101)), num_quantiles=5 + out - [0, 25, 50, 75, 100] - in: [(i, 1 if i<10 else 1e-5) for i in range(101)], num_quantiles=5, + in - [(i, 1 if i<10 else 1e-5) for i in range(101)], num_quantiles=5, weighted=True - out: [0, 2, 5, 7, 100] + out - [0, 2, 5, 7, 100] """ @staticmethod def _display_data(num_quantiles, key, reverse, weighted): From b4f2a9b34ecd888ab8abd35cc8b65de190c561c6 Mon Sep 17 00:00:00 2001 From: iindyk Date: Wed, 12 Aug 2020 11:54:02 -0400 Subject: [PATCH 8/9] Added example to ApproximateQuantiles docstring, made weighted argument the last and modified _interpolate for readability. --- sdks/python/apache_beam/transforms/stats.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/sdks/python/apache_beam/transforms/stats.py b/sdks/python/apache_beam/transforms/stats.py index 6f744f3b17d0..d5d16acd9140 100644 --- a/sdks/python/apache_beam/transforms/stats.py +++ b/sdks/python/apache_beam/transforms/stats.py @@ -239,18 +239,18 @@ def display_data(self): class ApproximateQuantiles(object): - """ + r""" PTransform for getting the idea of data distribution using approximate N-tile (e.g. quartiles, percentiles etc.) either globally or per-key. - Examples + Examples: - in - list(range(101)), num_quantiles=5 - out - [0, 25, 50, 75, 100] + in: list(range(101)), num_quantiles=5 + out: [0, 25, 50, 75, 100] - in - [(i, 1 if i<10 else 1e-5) for i in range(101)], num_quantiles=5, + in: [(i, 1 if i<10 else 1e-5) for i in range(101)], num_quantiles=5, weighted=True - out - [0, 2, 5, 7, 100] + out: [0, 2, 5, 7, 100] """ @staticmethod def _display_data(num_quantiles, key, reverse, weighted): From f826a0752fdd23aa42de870aae64cb415412d231 Mon Sep 17 00:00:00 2001 From: iindyk Date: Wed, 12 Aug 2020 17:16:52 -0400 Subject: [PATCH 9/9] Added example to ApproximateQuantiles docstring, made weighted argument the last and modified _interpolate for readability. --- sdks/python/apache_beam/transforms/stats.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/transforms/stats.py b/sdks/python/apache_beam/transforms/stats.py index d5d16acd9140..ec86cb05c65d 100644 --- a/sdks/python/apache_beam/transforms/stats.py +++ b/sdks/python/apache_beam/transforms/stats.py @@ -239,17 +239,19 @@ def display_data(self): class ApproximateQuantiles(object): - r""" + """ PTransform for getting the idea of data distribution using approximate N-tile (e.g. quartiles, percentiles etc.) either globally or per-key. Examples: in: list(range(101)), num_quantiles=5 + out: [0, 25, 50, 75, 100] in: [(i, 1 if i<10 else 1e-5) for i in range(101)], num_quantiles=5, weighted=True + out: [0, 2, 5, 7, 100] """ @staticmethod