diff --git a/opencensus/metrics/export/metric_producer.py b/opencensus/metrics/export/metric_producer.py new file mode 100644 index 000000000..c8edb4c10 --- /dev/null +++ b/opencensus/metrics/export/metric_producer.py @@ -0,0 +1,81 @@ +# Copyright 2019, OpenCensus Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import threading + + +class MetricProducer(object): + """Produces a set of metrics for export.""" + + def get_metrics(self): + """Get a set of metrics to be exported. + + :rtype: set(:class: `opencensus.metrics.export.metric.Metric`) + :return: A set of metrics to be exported. + """ + raise NotImplementedError # pragma: NO COVER + + +class MetricProducerManager(object): + """Container class for MetricProducers to be used by exporters. + + :type metric_producers: iterable(class: 'MetricProducer') + :param metric_producers: Optional initial metric producers. + """ + + def __init__(self, metric_producers=None): + if metric_producers is None: + self.metric_producers = set() + else: + self.metric_producers = set(metric_producers) + self.mp_lock = threading.Lock() + + def add(self, metric_producer): + """Add a metric producer. + + :type metric_producer: :class: 'MetricProducer' + :param metric_producer: The metric producer to add. + """ + if metric_producer is None: + raise ValueError + with self.mp_lock: + self.metric_producers.add(metric_producer) + + def remove(self, metric_producer): + """Remove a metric producer. + + :type metric_producer: :class: 'MetricProducer' + :param metric_producer: The metric producer to remove. + """ + if metric_producer is None: + raise ValueError + try: + with self.mp_lock: + self.metric_producers.remove(metric_producer) + except KeyError: + pass + + def get_all(self): + """Get the set of all metric producers. + + Get a copy of `metric_producers`. Prefer this method to using the + attribute directly to avoid other threads adding/removing producers + while you're reading it. + + :rtype: set(:class: `MetricProducer`) + :return: A set of all metric producers at the time of the call. + """ + with self.mp_lock: + mps_copy = set(self.metric_producers) + return mps_copy diff --git a/opencensus/metrics/export/value.py b/opencensus/metrics/export/value.py index 8ba68fd6c..68608bad2 100644 --- a/opencensus/metrics/export/value.py +++ b/opencensus/metrics/export/value.py @@ -249,9 +249,9 @@ def __init__(self, raise ValueError("bucket_options must not be null") if bucket_options.type_ is None: if buckets is not None: - raise ValueError("buckets must be null if the distribution has" - "no histogram (i.e. bucket_options.type is " - "null)") + raise ValueError("buckets must be null if the distribution " + "has no histogram (i.e. bucket_options.type " + "is null)") else: if len(buckets) != len(bucket_options.type_.bounds) + 1: # Note that this includes the implicit 0 and positive-infinity diff --git a/opencensus/stats/aggregation_data.py b/opencensus/stats/aggregation_data.py index c0d2cf7a2..5d435851f 100644 --- a/opencensus/stats/aggregation_data.py +++ b/opencensus/stats/aggregation_data.py @@ -288,7 +288,9 @@ def to_point(self, timestamp): This method creates a :class: `opencensus.metrics.export.point.Point` with a :class: `opencensus.metrics.export.value.ValueDistribution` value, and creates buckets and exemplars for that distribution from the - appropriate classes in the `metrics` package. + appropriate classes in the `metrics` package. If the distribution + doesn't have a histogram (i.e. `bounds` is empty) the converted point's + `buckets` attribute will be null. :type timestamp: :class: `datetime.datetime` :param timestamp: The time to report the point as having been recorded. @@ -297,17 +299,22 @@ def to_point(self, timestamp): :return: a :class: `opencensus.metrics.export.value.ValueDistribution` -valued Point. """ - buckets = [None] * len(self.counts_per_bucket) - for ii, count in enumerate(self.counts_per_bucket): - stat_ex = self.exemplars.get(ii, None) - if stat_ex is not None: - metric_ex = value.Exemplar(stat_ex.value, stat_ex.timestamp, - copy.copy(stat_ex.attachments)) - buckets[ii] = value.Bucket(count, metric_ex) - else: - buckets[ii] = value.Bucket(count) + if self.bounds: + bucket_options = value.BucketOptions(value.Explicit(self.bounds)) + buckets = [None] * len(self.counts_per_bucket) + for ii, count in enumerate(self.counts_per_bucket): + stat_ex = self.exemplars.get(ii) if self.exemplars else None + if stat_ex is not None: + metric_ex = value.Exemplar(stat_ex.value, + stat_ex.timestamp, + copy.copy(stat_ex.attachments)) + buckets[ii] = value.Bucket(count, metric_ex) + else: + buckets[ii] = value.Bucket(count) - bucket_options = value.BucketOptions(value.Explicit(self.bounds)) + else: + bucket_options = value.BucketOptions() + buckets = None return point.Point( value.ValueDistribution( count=self.count_data, diff --git a/opencensus/stats/measure_to_view_map.py b/opencensus/stats/measure_to_view_map.py index c06364179..5ae42c1ad 100644 --- a/opencensus/stats/measure_to_view_map.py +++ b/opencensus/stats/measure_to_view_map.py @@ -16,6 +16,7 @@ import copy import logging +from opencensus.stats import metric_utils from opencensus.stats import view_data as view_data_module @@ -126,3 +127,21 @@ def export(self, view_datas): if len(self.exporters) > 0: for e in self.exporters: e.export(view_datas) + + def get_metrics(self, timestamp): + """Get a Metric for each registered view. + + Convert each registered view's associated `ViewData` into a `Metric` to + be exported. + + :type timestamp: :class: `datetime.datetime` + :param timestamp: The timestamp to use for metric conversions, usually + the current time. + + :rtype: Iterator[:class: `opencensus.metrics.export.metric.Metric`] + """ + for vdl in self._measure_to_view_data_list_map.values(): + for vd in vdl: + metric = metric_utils.view_data_to_metric(vd, timestamp) + if metric is not None: + yield metric diff --git a/opencensus/stats/metric_utils.py b/opencensus/stats/metric_utils.py index 205f97f31..ebd737cbb 100644 --- a/opencensus/stats/metric_utils.py +++ b/opencensus/stats/metric_utils.py @@ -113,6 +113,9 @@ def view_data_to_metric(view_data, timestamp): :rtype: :class: `opencensus.metrics.export.metric.Metric` :return: A converted Metric. """ + if not view_data.tag_value_aggregation_data_map: + return None + md = view_data.view.get_metric_descriptor() # TODO: implement gauges diff --git a/opencensus/stats/stats.py b/opencensus/stats/stats.py index c1509e3e4..a7cc49940 100644 --- a/opencensus/stats/stats.py +++ b/opencensus/stats/stats.py @@ -12,24 +12,29 @@ # See the License for the specific language governing permissions and # limitations under the License. +from datetime import datetime + +from opencensus.metrics.export.metric_producer import MetricProducer from opencensus.stats.stats_recorder import StatsRecorder from opencensus.stats.view_manager import ViewManager -class Stats(object): +class Stats(MetricProducer): """Stats defines a View Manager and a Stats Recorder in order for the collection of Stats """ + def __init__(self): - self._stats_recorder = StatsRecorder() - self._view_manager = ViewManager() - - @property - def stats_recorder(self): - """the current stats recorder for Stats""" - return self._stats_recorder - - @property - def view_manager(self): - """the current view manager for Stats""" - return self._view_manager + self.stats_recorder = StatsRecorder() + self.view_manager = ViewManager() + + def get_metrics(self): + """Get a Metric for each of the view manager's registered views. + + Convert each registered view's associated `ViewData` into a `Metric` to + be exported, using the current time for metric conversions. + + :rtype: Iterator[:class: `opencensus.metrics.export.metric.Metric`] + """ + return self.view_manager.measure_to_view_map.get_metrics( + datetime.now()) diff --git a/tests/unit/metrics/export/test_metric_producer.py b/tests/unit/metrics/export/test_metric_producer.py new file mode 100644 index 000000000..56a049718 --- /dev/null +++ b/tests/unit/metrics/export/test_metric_producer.py @@ -0,0 +1,61 @@ +# Copyright 2019, OpenCensus Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +try: + from mock import Mock +except ImportError: + from unittest.mock import Mock + +import unittest + +from opencensus.metrics.export import metric_producer + + +class TestMetricProducerManager(unittest.TestCase): + def test_init(self): + mpm1 = metric_producer.MetricProducerManager() + self.assertEqual(mpm1.metric_producers, set()) + + mock_mp = Mock() + mpm2 = metric_producer.MetricProducerManager([mock_mp]) + self.assertEqual(mpm2.metric_producers, set([mock_mp])) + + def test_add_remove(self): + mpm = metric_producer.MetricProducerManager() + self.assertEqual(mpm.metric_producers, set()) + + with self.assertRaises(ValueError): + mpm.add(None) + mock_mp = Mock() + mpm.add(mock_mp) + self.assertEqual(mpm.metric_producers, set([mock_mp])) + mpm.add(mock_mp) + self.assertEqual(mpm.metric_producers, set([mock_mp])) + + with self.assertRaises(ValueError): + mpm.remove(None) + another_mock_mp = Mock() + mpm.remove(another_mock_mp) + self.assertEqual(mpm.metric_producers, set([mock_mp])) + mpm.remove(mock_mp) + self.assertEqual(mpm.metric_producers, set()) + + def test_get_all(self): + mp1 = Mock() + mp2 = Mock() + mpm = metric_producer.MetricProducerManager([mp1, mp2]) + got = mpm.get_all() + mpm.remove(mp1) + self.assertIn(mp1, got) + self.assertIn(mp2, got) diff --git a/tests/unit/stats/test_aggregation_data.py b/tests/unit/stats/test_aggregation_data.py index 477a62d06..9ddd21feb 100644 --- a/tests/unit/stats/test_aggregation_data.py +++ b/tests/unit/stats/test_aggregation_data.py @@ -555,3 +555,22 @@ def test_to_point(self): exemplars_equal( ex_99, converted_point.value.buckets[2].exemplar)) + + def test_to_point_no_histogram(self): + timestamp = datetime(1970, 1, 1) + dist_agg_data = aggregation_data_module.DistributionAggregationData( + mean_data=50, + count_data=99, + min_=1, + max_=99, + sum_of_sqd_deviations=80850.0, + ) + converted_point = dist_agg_data.to_point(timestamp) + self.assertTrue(isinstance(converted_point.value, + value.ValueDistribution)) + self.assertEqual(converted_point.value.count, 99) + self.assertEqual(converted_point.value.sum, 4950) + self.assertEqual(converted_point.value.sum_of_squared_deviation, + 80850.0) + self.assertIsNone(converted_point.value.buckets) + self.assertIsNone(converted_point.value.bucket_options._type) diff --git a/tests/unit/stats/test_stats.py b/tests/unit/stats/test_stats.py index 796f8e91a..7eeae0c25 100644 --- a/tests/unit/stats/test_stats.py +++ b/tests/unit/stats/test_stats.py @@ -12,14 +12,63 @@ # See the License for the specific language governing permissions and # limitations under the License. +try: + from mock import Mock +except ImportError: + from unittest.mock import Mock + import unittest +from opencensus.metrics.export import metric_descriptor +from opencensus.metrics.export import value +from opencensus.stats import aggregation +from opencensus.stats import measure from opencensus.stats import stats as stats_module +from opencensus.stats import view +from opencensus.tags import tag_map class TestStats(unittest.TestCase): - def test_constructor(self): + def test_get_metrics(self): + """Test that Stats converts recorded values into metrics.""" + stats = stats_module.Stats() - self.assertEqual(stats._view_manager, stats.view_manager) - self.assertEqual(stats._stats_recorder, stats.stats_recorder) + # Check that metrics are empty before view registration + initial_metrics = list(stats.get_metrics()) + self.assertEqual(initial_metrics, []) + + mock_measure = Mock(spec=measure.MeasureFloat) + + mock_md = Mock(spec=metric_descriptor.MetricDescriptor) + mock_md.type =\ + metric_descriptor.MetricDescriptorType.CUMULATIVE_DISTRIBUTION + + mock_view = Mock(spec=view.View) + mock_view.measure = mock_measure + mock_view.get_metric_descriptor.return_value = mock_md + mock_view.columns = ['k1'] + + stats.view_manager.measure_to_view_map.register_view(mock_view, Mock()) + + # Check that metrics are stil empty until we record + empty_metrics = list(stats.get_metrics()) + self.assertEqual(empty_metrics, []) + + mm = stats.stats_recorder.new_measurement_map() + mm._measurement_map = {mock_measure: 1.0} + + mock_view.aggregation = aggregation.DistributionAggregation() + + tm = tag_map.TagMap() + tm.insert('k1', 'v1') + mm.record(tm) + + metrics = list(stats.get_metrics()) + self.assertEqual(len(metrics), 1) + [metric] = metrics + self.assertEqual(len(metric.time_series), 1) + [ts] = metric.time_series + self.assertEqual(len(ts.points), 1) + [point] = ts.points + self.assertTrue(isinstance(point.value, value.ValueDistribution))