diff --git a/opencensus/stats/aggregation.py b/opencensus/stats/aggregation.py index f2bdc10ec..4a771a045 100644 --- a/opencensus/stats/aggregation.py +++ b/opencensus/stats/aggregation.py @@ -12,10 +12,15 @@ # See the License for the specific language governing permissions and # limitations under the License. +import logging + from opencensus.stats import bucket_boundaries from opencensus.stats import aggregation_data +logger = logging.getLogger(__name__) + + class Type(object): """ The type of aggregation function used on a View. @@ -123,11 +128,26 @@ class DistributionAggregation(BaseAggregation): :param aggregation_type: represents the type of this aggregation """ - def __init__( - self, - boundaries=None, - distribution=None, - aggregation_type=Type.DISTRIBUTION): + + def __init__(self, + boundaries=None, + distribution=None, + aggregation_type=Type.DISTRIBUTION): + if boundaries: + if not all(boundaries[ii] < boundaries[ii + 1] + for ii in range(len(boundaries) - 1)): + raise ValueError("bounds must be sorted in increasing order") + for ii, bb in enumerate(boundaries): + if bb > 0: + break + else: + ii += 1 + if ii: + logger.warning("Dropping {} negative bucket boundaries, the " + "values must be strictly > 0" + .format(ii)) + boundaries = boundaries[ii:] + super(DistributionAggregation, self).__init__( buckets=boundaries, aggregation_type=aggregation_type) self._boundaries = bucket_boundaries.BucketBoundaries(boundaries) diff --git a/opencensus/stats/aggregation_data.py b/opencensus/stats/aggregation_data.py index 5e79ee9fe..78f867b93 100644 --- a/opencensus/stats/aggregation_data.py +++ b/opencensus/stats/aggregation_data.py @@ -12,9 +12,14 @@ # See the License for the specific language governing permissions and # limitations under the License. +import logging + from opencensus.stats import bucket_boundaries +logger = logging.getLogger(__name__) + + class BaseAggregationData(object): """Aggregation Data represents an aggregated value from a collection @@ -126,12 +131,17 @@ def __init__(self, self._sum_of_sqd_deviations = sum_of_sqd_deviations if bounds is None: bounds = [] + else: + assert bounds == list(sorted(set(bounds))) if counts_per_bucket is None: counts_per_bucket = [0 for ii in range(len(bounds) + 1)] - elif len(counts_per_bucket) != len(bounds) + 1: - raise ValueError("counts_per_bucket length does not match bounds " - "length") + else: + assert all(cc >= 0 for cc in counts_per_bucket) + assert len(counts_per_bucket) == len(bounds) + 1 + + assert bounds == sorted(bounds) + assert all(bb > 0 for bb in bounds) self._counts_per_bucket = counts_per_bucket self._bounds = bucket_boundaries.BucketBoundaries( diff --git a/opencensus/stats/exporters/prometheus_exporter.py b/opencensus/stats/exporters/prometheus_exporter.py index e23a0d4d6..715b8904e 100644 --- a/opencensus/stats/exporters/prometheus_exporter.py +++ b/opencensus/stats/exporters/prometheus_exporter.py @@ -164,31 +164,13 @@ def to_metric(self, desc, view): labels=labels) elif isinstance(agg_data, aggregation_data_module.DistributionAggregationData): + + assert(agg_data.bounds == sorted(agg_data.bounds)) points = {} - # Histograms are cumulative in Prometheus. - # 1. Sort buckets in ascending order but, retain - # their indices for reverse lookup later on. - # TODO: If there is a guarantee that distribution elements - # are always sorted, then skip the sorting. - indices_map = {} - buckets = [] - i = 0 - for boundarie in view.aggregation.boundaries.boundaries: - if boundarie not in indices_map \ - or indices_map == {}: # pragma: NO COVER - indices_map[str(boundarie)] = i - buckets.append(str(boundarie)) - i += 1 - - buckets.sort() - - # 2. Now that the buckets are sorted by magnitude - # we can create cumulative indicesmap them back by reverse index cum_count = 0 - for bucket in buckets: - i = indices_map[bucket] - cum_count += int(agg_data.counts_per_bucket[i]) - points[bucket] = cum_count + for ii, bound in enumerate(agg_data.bounds): + cum_count += agg_data.counts_per_bucket[ii] + points[str(bound)] = cum_count labels = desc['labels'] if points is None else None return HistogramMetricFamily(name=desc['name'], documentation=desc['documentation'], @@ -217,7 +199,7 @@ def to_metric(self, desc, view): % type(agg_data)) def collect(self): # pragma: NO COVER - """ Collect fetches the statistics from OpenCensus + """Collect fetches the statistics from OpenCensus and delivers them as Prometheus Metrics. Collect is invoked everytime a prometheus.Gatherer is run for example when the HTTP endpoint is invoked by Prometheus. diff --git a/opencensus/stats/measure_to_view_map.py b/opencensus/stats/measure_to_view_map.py index de366fce7..83b13a531 100644 --- a/opencensus/stats/measure_to_view_map.py +++ b/opencensus/stats/measure_to_view_map.py @@ -12,10 +12,11 @@ # See the License for the specific language governing permissions and # limitations under the License. -from opencensus.stats.view_data import ViewData from collections import defaultdict -import logging import copy +import logging + +from opencensus.stats import view_data as view_data_module class MeasureToViewMap(object): @@ -91,10 +92,12 @@ def register_view(self, view, timestamp): if registered_measure is None: self._registered_measures[measure.name] = measure self._measure_to_view_data_list_map[view.measure.name].append( - ViewData(view=view, start_time=timestamp, end_time=timestamp)) + view_data_module.ViewData(view=view, start_time=timestamp, + end_time=timestamp)) def record(self, tags, measurement_map, timestamp, attachments=None): """records stats with a set of tags""" + assert all(vv >= 0 for vv in measurement_map.values()) for measure, value in measurement_map.items(): if measure != self._registered_measures.get(measure.name): return diff --git a/opencensus/stats/measurement_map.py b/opencensus/stats/measurement_map.py index 956f25a99..2c1dab23a 100644 --- a/opencensus/stats/measurement_map.py +++ b/opencensus/stats/measurement_map.py @@ -13,9 +13,14 @@ # limitations under the License. from datetime import datetime +import logging + from opencensus.tags import execution_context +logger = logging.getLogger(__name__) + + class MeasurementMap(object): """Measurement Map is a map from Measures to measured values to be recorded at the same time @@ -33,6 +38,10 @@ def __init__(self, measure_to_view_map, attachments=None): self._measurement_map = {} self._measure_to_view_map = measure_to_view_map self._attachments = attachments + # If the user tries to record a negative value for any measurement, + # refuse to record all measurements from this map. Recording negative + # measurements will become an error in a later release. + self._invalid = False @property def measurement_map(self): @@ -51,10 +60,16 @@ def attachments(self): def measure_int_put(self, measure, value): """associates the measure of type Int with the given value""" + if value < 0: + # Should be an error in a later release. + logger.warning("Cannot record negative values") self._measurement_map[measure] = value def measure_float_put(self, measure, value): """associates the measure of type Float with the given value""" + if value < 0: + # Should be an error in a later release. + logger.warning("Cannot record negative values") self._measurement_map[measure] = value def measure_put_attachment(self, key, value): @@ -75,11 +90,27 @@ def measure_put_attachment(self, key, value): self._attachments[key] = value - def record(self, tag_map_tags=execution_context.get_current_tag_map()): + def record(self, tag_map_tags=None): """records all the measures at the same time with a tag_map. tag_map could either be explicitly passed to the method, or implicitly read from current execution context. """ + if tag_map_tags is None: + tag_map_tags = execution_context.get_current_tag_map() + if self._invalid: + logger.warning("Measurement map has included negative value " + "measurements, refusing to record") + return + for measure, value in self.measurement_map.items(): + if value < 0: + self._invalid = True + logger.warning("Dropping values, value to record must be " + "non-negative") + logger.info("Measure '{}' has negative value ({}), refusing " + "to record measurements from {}" + .format(measure.name, value, self)) + return + self.measure_to_view_map.record( tags=tag_map_tags, measurement_map=self.measurement_map, diff --git a/tests/unit/stats/exporter/test_prometheus_stats.py b/tests/unit/stats/exporter/test_prometheus_stats.py index 5d3407aa8..5237ecf50 100644 --- a/tests/unit/stats/exporter/test_prometheus_stats.py +++ b/tests/unit/stats/exporter/test_prometheus_stats.py @@ -42,7 +42,7 @@ VIDEO_SIZE_VIEW_NAME = "my.org/views/video_size_test2" VIDEO_SIZE_DISTRIBUTION = aggregation_module.DistributionAggregation( - [0.0, 16.0 * MiB, 256.0 * MiB]) + [16.0 * MiB, 256.0 * MiB]) VIDEO_SIZE_VIEW = view_module.View( VIDEO_SIZE_VIEW_NAME, "processed video size over time", [FRONTEND_KEY], VIDEO_SIZE_MEASURE, VIDEO_SIZE_DISTRIBUTION) @@ -189,7 +189,7 @@ def test_collector_to_metric_histogram(self): self.assertEqual(desc['name'], metric.name) self.assertEqual(desc['documentation'], metric.documentation) self.assertEqual('histogram', metric.type) - self.assertEqual(5, len(metric.samples)) + self.assertEqual(4, len(metric.samples)) def test_collector_to_metric_invalid_dist(self): agg = mock.Mock() @@ -232,7 +232,7 @@ def test_collector_collect(self): self.assertEqual(desc['name'], metric.name) self.assertEqual(desc['documentation'], metric.documentation) self.assertEqual('histogram', metric.type) - self.assertEqual(5, len(metric.samples)) + self.assertEqual(4, len(metric.samples)) class TestPrometheusStatsExporter(unittest.TestCase): diff --git a/tests/unit/stats/exporter/test_stackdriver_stats.py b/tests/unit/stats/exporter/test_stackdriver_stats.py index 6476eb5da..fb8d4738b 100644 --- a/tests/unit/stats/exporter/test_stackdriver_stats.py +++ b/tests/unit/stats/exporter/test_stackdriver_stats.py @@ -43,7 +43,7 @@ VIDEO_SIZE_VIEW_NAME = "my.org/views/video_size_test2" VIDEO_SIZE_DISTRIBUTION = aggregation_module.DistributionAggregation( - [0.0, 16.0 * MiB, 256.0 * MiB]) + [16.0 * MiB, 256.0 * MiB]) VIDEO_SIZE_VIEW = view_module.View( VIDEO_SIZE_VIEW_NAME, "processed video size over time", [FRONTEND_KEY], VIDEO_SIZE_MEASURE, VIDEO_SIZE_DISTRIBUTION) diff --git a/tests/unit/stats/test_aggregation.py b/tests/unit/stats/test_aggregation.py index 71b2d546d..d384a7669 100644 --- a/tests/unit/stats/test_aggregation.py +++ b/tests/unit/stats/test_aggregation.py @@ -100,14 +100,14 @@ def test_constructor_defaults(self): distribution_aggregation.aggregation_type) def test_constructor_explicit(self): - boundaries = ["test"] - distribution = {1: "test"} + boundaries = [1, 2] + distribution = [0, 1, 2] distribution_aggregation = aggregation_module.DistributionAggregation( boundaries=boundaries, distribution=distribution) - self.assertEqual(["test"], + self.assertEqual([1, 2], distribution_aggregation.boundaries.boundaries) - self.assertEqual({1: "test"}, distribution_aggregation.distribution) + self.assertEqual([0, 1, 2], distribution_aggregation.distribution) self.assertEqual(aggregation_module.Type.DISTRIBUTION, distribution_aggregation.aggregation_type) @@ -122,3 +122,20 @@ def test_min_max(self): self.assertEqual(da.aggregation_data.min, -10) self.assertEqual(da.aggregation_data.max, 10) + + def test_init_bad_boundaries(self): + """Check that boundaries must be sorted and unique.""" + with self.assertRaises(ValueError): + aggregation_module.DistributionAggregation([1, 3, 2]) + with self.assertRaises(ValueError): + aggregation_module.DistributionAggregation([1, 1, 2]) + + def test_init_negative_boundaries(self): + """Check that non-positive boundaries are dropped.""" + da = aggregation_module.DistributionAggregation([-2, -1, 0, 1, 2]) + self.assertEqual(da.boundaries.boundaries, [1, 2]) + self.assertEqual(da.aggregation_data.bounds, [1, 2]) + + da2 = aggregation_module.DistributionAggregation([-2, -1]) + self.assertEqual(da2.boundaries.boundaries, []) + self.assertEqual(da2.aggregation_data.bounds, []) diff --git a/tests/unit/stats/test_aggregation_data.py b/tests/unit/stats/test_aggregation_data.py index 30766e4c2..4c86e5240 100644 --- a/tests/unit/stats/test_aggregation_data.py +++ b/tests/unit/stats/test_aggregation_data.py @@ -88,8 +88,8 @@ def test_constructor(self): _min = 0 _max = 1 sum_of_sqd_deviations = mock.Mock() - counts_per_bucket = [1, 1, 1, 1] - bounds = [0, 1.0 / 2.0, 1] + counts_per_bucket = [1, 1, 1] + bounds = [1.0 / 2.0, 1] dist_agg_data = aggregation_data_module.DistributionAggregationData( mean_data=mean_data, @@ -106,15 +106,15 @@ def test_constructor(self): self.assertEqual(1, dist_agg_data.max) self.assertEqual(sum_of_sqd_deviations, dist_agg_data.sum_of_sqd_deviations) - self.assertEqual([1, 1, 1, 1], dist_agg_data.counts_per_bucket) - self.assertEqual([0, 1.0 / 2.0, 1], dist_agg_data.bounds) + self.assertEqual([1, 1, 1], dist_agg_data.counts_per_bucket) + self.assertEqual([1.0 / 2.0, 1], dist_agg_data.bounds) self.assertIsNotNone(dist_agg_data.sum) self.assertEqual(0, dist_agg_data.variance) def test_init_bad_bucket_counts(self): # Check that len(counts_per_bucket) == len(bounds) + 1 - with self.assertRaises(ValueError): + with self.assertRaises(AssertionError): aggregation_data_module.DistributionAggregationData( mean_data=mock.Mock(), count_data=mock.Mock(), @@ -122,7 +122,18 @@ def test_init_bad_bucket_counts(self): max_=mock.Mock(), sum_of_sqd_deviations=mock.Mock(), counts_per_bucket=[0, 0, 0], - bounds=[0, 1, 2]) + bounds=[1, 2, 3]) + + # Check that counts aren't negative + with self.assertRaises(AssertionError): + aggregation_data_module.DistributionAggregationData( + mean_data=mock.Mock(), + count_data=mock.Mock(), + min_=mock.Mock(), + max_=mock.Mock(), + sum_of_sqd_deviations=mock.Mock(), + counts_per_bucket=[0, 2, -2, 0], + bounds=[1, 2, 3]) # And check that we don't throw given the right args aggregation_data_module.DistributionAggregationData( @@ -132,7 +143,41 @@ def test_init_bad_bucket_counts(self): max_=mock.Mock(), sum_of_sqd_deviations=mock.Mock(), counts_per_bucket=[0, 0, 0, 0], - bounds=[0, 1, 2]) + bounds=[1, 2, 3]) + + def test_init_bad_bounds(self): + # Check that bounds are unique + with self.assertRaises(AssertionError): + aggregation_data_module.DistributionAggregationData( + mean_data=mock.Mock(), + count_data=mock.Mock(), + min_=mock.Mock(), + max_=mock.Mock(), + sum_of_sqd_deviations=mock.Mock(), + counts_per_bucket=[0, 0, 0, 0], + bounds=[1, 2, 2]) + + # Check that bounds are sorted + with self.assertRaises(AssertionError): + aggregation_data_module.DistributionAggregationData( + mean_data=mock.Mock(), + count_data=mock.Mock(), + min_=mock.Mock(), + max_=mock.Mock(), + sum_of_sqd_deviations=mock.Mock(), + counts_per_bucket=[0, 0, 0, 0], + bounds=[1, 3, 2]) + + # Check that all bounds are positive + with self.assertRaises(AssertionError): + aggregation_data_module.DistributionAggregationData( + mean_data=mock.Mock(), + count_data=mock.Mock(), + min_=mock.Mock(), + max_=mock.Mock(), + sum_of_sqd_deviations=mock.Mock(), + counts_per_bucket=[0, 0, 0, 0], + bounds=[-1, 1, 2]) def test_constructor_with_exemplar(self): timestamp = time.time() @@ -146,8 +191,8 @@ def test_constructor_with_exemplar(self): _min = 0 _max = 1 sum_of_sqd_deviations = mock.Mock() - counts_per_bucket = [1, 1, 1, 1] - bounds = [0, 1.0 / 2.0, 1] + counts_per_bucket = [1, 1, 1] + bounds = [1.0 / 2.0, 1] exemplars = [exemplar_1, exemplar_2] dist_agg_data = aggregation_data_module.DistributionAggregationData( @@ -166,9 +211,9 @@ def test_constructor_with_exemplar(self): self.assertEqual(1, dist_agg_data.max) self.assertEqual(sum_of_sqd_deviations, dist_agg_data.sum_of_sqd_deviations) - self.assertEqual([1, 1, 1, 1], dist_agg_data.counts_per_bucket) - self.assertEqual([exemplar_1, exemplar_2], dist_agg_data.exemplars[3]) - self.assertEqual([0, 1.0 / 2.0, 1], dist_agg_data.bounds) + self.assertEqual([1, 1, 1], dist_agg_data.counts_per_bucket) + self.assertEqual([exemplar_1, exemplar_2], dist_agg_data.exemplars[2]) + self.assertEqual([1.0 / 2.0, 1], dist_agg_data.bounds) self.assertIsNotNone(dist_agg_data.sum) self.assertEqual(0, dist_agg_data.variance) @@ -231,8 +276,8 @@ def test_variance(self): _min = mock.Mock() _max = mock.Mock() sum_of_sqd_deviations = mock.Mock() - counts_per_bucket = [1, 1, 1, 1] - bounds = [0, 1.0 / 2.0, 1] + counts_per_bucket = [1, 1, 1] + bounds = [1.0 / 2.0, 1] dist_agg_data = aggregation_data_module.DistributionAggregationData( mean_data=mean_data, count_data=count_data, @@ -261,8 +306,8 @@ def test_add_sample(self): _min = 0 _max = 1 sum_of_sqd_deviations = 2 - counts_per_bucket = [1, 1, 1, 1, 1] - bounds = [0, 0.5, 1, 1.5] + counts_per_bucket = [1, 1, 1, 1] + bounds = [0.5, 1, 1.5] value = 3 @@ -307,8 +352,8 @@ def test_add_sample_attachment(self): _min = 0 _max = 1 sum_of_sqd_deviations = 2 - counts_per_bucket = [1, 1, 1, 1, 1] - bounds = [0, 0.5, 1, 1.5] + counts_per_bucket = [1, 1, 1, 1] + bounds = [0.5, 1, 1.5] value = 3 timestamp = time.time() @@ -326,14 +371,14 @@ def test_add_sample_attachment(self): bounds=bounds, exemplars=exemplar_1) - self.assertEqual({4: exemplar_1}, dist_agg_data.exemplars) + self.assertEqual({3: exemplar_1}, dist_agg_data.exemplars) dist_agg_data.add_sample(value, timestamp, attachments) self.assertEqual(0, dist_agg_data.min) self.assertEqual(3, dist_agg_data.max) self.assertEqual(2, dist_agg_data.count_data) self.assertEqual(2.0, dist_agg_data.mean_data) - self.assertEqual(3, dist_agg_data.exemplars[4].value) + self.assertEqual(3, dist_agg_data.exemplars[3].value) count_data = 4 dist_agg_data = aggregation_data_module.DistributionAggregationData( diff --git a/tests/unit/stats/test_measure_to_view_map.py b/tests/unit/stats/test_measure_to_view_map.py index 738bf9133..60a8bb0d4 100644 --- a/tests/unit/stats/test_measure_to_view_map.py +++ b/tests/unit/stats/test_measure_to_view_map.py @@ -303,6 +303,22 @@ def test_record(self): attachments=None) self.assertIsNone(record) + def test_record_negative_value(self): + """Check that we warn and drop negative measures at record time.""" + measure = mock.Mock() + view_data = mock.Mock() + measure_to_view_map = measure_to_view_map_module.MeasureToViewMap() + measure_to_view_map._registered_measures = {measure.name: measure} + measure_to_view_map._measure_to_view_data_list_map = { + measure.name: [view_data] + } + with self.assertRaises(AssertionError): + measure_to_view_map.record( + tags=mock.Mock(), + measurement_map={measure: -1}, + timestamp=mock.Mock()) + view_data.record.assert_not_called() + def test_record_with_exporter(self): exporter = mock.Mock() measure_name = "test_measure" diff --git a/tests/unit/stats/test_measurement_map.py b/tests/unit/stats/test_measurement_map.py index 0aa23c405..23741f893 100644 --- a/tests/unit/stats/test_measurement_map.py +++ b/tests/unit/stats/test_measurement_map.py @@ -12,15 +12,14 @@ # See the License for the specific language governing permissions and # limitations under the License. -import unittest import mock +import unittest + from opencensus.stats import measurement_map as measurement_map_module -from opencensus.stats.measure_to_view_map import MeasureToViewMap from opencensus.tags import execution_context -from opencensus.stats.measure import BaseMeasure -from opencensus.stats.measure import MeasureInt -from opencensus.stats import measure_to_view_map as measure_to_view_map_module -from opencensus.stats.view import View + + +logger_patch = mock.patch('opencensus.stats.measurement_map.logger') class TestMeasurementMap(unittest.TestCase): @@ -137,3 +136,57 @@ def test_record_against_implicit_tag_map(self): execution_context.set_current_tag_map(tags) measurement_map.record() self.assertTrue(measure_to_view_map.record.called) + + def test_record_negative_value(self): + """Check that we refuse to record negative measurements.""" + measurement_map = measurement_map_module.MeasurementMap(mock.Mock()) + measurement_map.measure_int_put(mock.Mock(), 1) + measurement_map.measure_int_put(mock.Mock(), -1) + + with logger_patch as mock_logger: + measurement_map.record() + + self.assertTrue(measurement_map._invalid) + measurement_map._measure_to_view_map.record.assert_not_called() + mock_logger.warning.assert_called_once() + + def test_record_previous_negative_value(self): + """Check that negative measurements poison the map.""" + measurement_map = measurement_map_module.MeasurementMap(mock.Mock()) + measure = mock.Mock() + measurement_map.measure_int_put(measure, 1) + + measurement_map.record() + self.assertFalse(measurement_map._invalid) + measurement_map._measure_to_view_map.record.assert_called_once() + + measurement_map.measure_int_put(measure, -1) + measurement_map._measure_to_view_map = mock.Mock() + + with logger_patch as mock_logger: + measurement_map.record() + + self.assertTrue(measurement_map._invalid) + measurement_map._measure_to_view_map.record.assert_not_called() + mock_logger.warning.assert_called_once() + + measurement_map.measure_int_put(measure, 1) + + with logger_patch as another_mock_logger: + measurement_map.record() + + self.assertTrue(measurement_map._invalid) + measurement_map._measure_to_view_map.record.assert_not_called() + another_mock_logger.warning.assert_called_once() + + def test_log_negative_puts(self): + """Check that we warn against negative measurements on put.""" + measurement_map = measurement_map_module.MeasurementMap(mock.Mock()) + + with logger_patch as mock_logger: + measurement_map.measure_int_put(mock.Mock(), -1) + mock_logger.warning.assert_called_once() + + with logger_patch as another_mock_logger: + measurement_map.measure_float_put(mock.Mock(), -1.0) + another_mock_logger.warning.assert_called_once()