diff --git a/CHANGELOG.md b/CHANGELOG.md index 3f9a93cca3f..648e72d5f59 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased](https://github.com/open-telemetry/opentelemetry-python/compare/v1.10.0-0.29b0...HEAD) +- Add default aggregation + ([#2543](https://github.com/open-telemetry/opentelemetry-python/pull/2543)) - Fix incorrect installation of some exporter “convenience” packages into “site-packages/src” ([#2525](https://github.com/open-telemetry/opentelemetry-python/pull/2525)) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_view_instrument_match.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_view_instrument_match.py index 385a8516d1a..b124f773183 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_view_instrument_match.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_view_instrument_match.py @@ -67,15 +67,11 @@ def consume_measurement(self, measurement: Measurement) -> None: if attributes not in self._attributes_aggregation: with self._lock: if attributes not in self._attributes_aggregation: - if self._view._aggregation: - aggregation = ( - self._view._aggregation._create_aggregation( - self._instrument - ) - ) - else: - aggregation = self._instrument._default_aggregation - self._attributes_aggregation[attributes] = aggregation + self._attributes_aggregation[ + attributes + ] = self._view._aggregation._create_aggregation( + self._instrument + ) self._attributes_aggregation[attributes].aggregate(measurement) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/aggregation.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/aggregation.py index 9771833cf03..4471e5f3f4f 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/aggregation.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/aggregation.py @@ -27,18 +27,20 @@ from opentelemetry._metrics.instrument import ( Asynchronous, + Counter, + Histogram, Instrument, + ObservableCounter, + ObservableGauge, + ObservableUpDownCounter, Synchronous, + UpDownCounter, _Monotonic, ) from opentelemetry.sdk._metrics.measurement import Measurement -from opentelemetry.sdk._metrics.point import ( - AggregationTemporality, - Gauge, - Histogram, - PointT, - Sum, -) +from opentelemetry.sdk._metrics.point import AggregationTemporality, Gauge +from opentelemetry.sdk._metrics.point import Histogram as HistogramPoint +from opentelemetry.sdk._metrics.point import PointT, Sum from opentelemetry.util._time import _time_ns _PointVarT = TypeVar("_PointVarT", bound=PointT) @@ -67,6 +69,66 @@ def collect(self) -> Optional[_PointVarT]: pass +class _AggregationFactory(ABC): + @abstractmethod + def _create_aggregation(self, instrument: Instrument) -> _Aggregation: + """Creates an aggregation""" + + +class DefaultAggregation(_AggregationFactory): + """ + The default aggregation to be used in a `View`. + + This aggregation will create an actual aggregation depending on the + instrument type, as specified next: + + ========================= ==================================== + Instrument Aggregation + ========================= ==================================== + `Counter` `SumAggregation` + `UpDownCounter` `SumAggregation` + `ObservableCounter` `SumAggregation` + `ObservableUpDownCounter` `SumAggregation` + `Histogram` `ExplicitBucketHistogramAggregation` + `ObservableGauge` `LastValueAggregation` + ========================= ==================================== + """ + + def _create_aggregation(self, instrument: Instrument) -> _Aggregation: + + # pylint: disable=too-many-return-statements + if isinstance(instrument, Counter): + return _SumAggregation( + instrument_is_monotonic=True, + instrument_temporality=AggregationTemporality.DELTA, + ) + if isinstance(instrument, UpDownCounter): + return _SumAggregation( + instrument_is_monotonic=False, + instrument_temporality=AggregationTemporality.DELTA, + ) + + if isinstance(instrument, ObservableCounter): + return _SumAggregation( + instrument_is_monotonic=True, + instrument_temporality=AggregationTemporality.CUMULATIVE, + ) + + if isinstance(instrument, ObservableUpDownCounter): + return _SumAggregation( + instrument_is_monotonic=False, + instrument_temporality=AggregationTemporality.CUMULATIVE, + ) + + if isinstance(instrument, Histogram): + return _ExplicitBucketHistogramAggregation() + + if isinstance(instrument, ObservableGauge): + return _LastValueAggregation() + + raise Exception(f"Invalid instrument type {type(instrument)} found") + + class _SumAggregation(_Aggregation[Sum]): def __init__( self, @@ -154,7 +216,7 @@ def collect(self) -> Optional[Gauge]: ) -class _ExplicitBucketHistogramAggregation(_Aggregation[Histogram]): +class _ExplicitBucketHistogramAggregation(_Aggregation[HistogramPoint]): def __init__( self, boundaries: Sequence[float] = ( @@ -195,7 +257,7 @@ def aggregate(self, measurement: Measurement) -> None: self._bucket_counts[bisect_left(self._boundaries, value)] += 1 - def collect(self) -> Histogram: + def collect(self) -> HistogramPoint: """ Atomically return a point for the current value of the metric. """ @@ -210,7 +272,7 @@ def collect(self) -> Histogram: self._start_time_unix_nano = now + 1 self._sum = 0 - return Histogram( + return HistogramPoint( start_time_unix_nano=start_time_unix_nano, time_unix_nano=now, bucket_counts=tuple(value), @@ -295,7 +357,7 @@ def _convert_aggregation_temporality( is_monotonic=is_monotonic, ) - if current_point_type is Histogram: + if current_point_type is HistogramPoint: if previous_point is None: return replace( current_point, aggregation_temporality=aggregation_temporality @@ -329,7 +391,7 @@ def _convert_aggregation_temporality( ) ] - return Histogram( + return HistogramPoint( start_time_unix_nano=start_time_unix_nano, time_unix_nano=current_point.time_unix_nano, bucket_counts=bucket_counts, @@ -340,12 +402,6 @@ def _convert_aggregation_temporality( return None -class _AggregationFactory(ABC): - @abstractmethod - def _create_aggregation(self, instrument: Instrument) -> _Aggregation: - """Creates an aggregation""" - - class ExplicitBucketHistogramAggregation(_AggregationFactory): def __init__( self, diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/instrument.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/instrument.py index 990684b8179..f2ac050d975 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/instrument.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/instrument.py @@ -15,7 +15,6 @@ # pylint: disable=too-many-ancestors import logging -from abc import ABC, abstractmethod from typing import Callable, Dict, Generator, Iterable, Union from opentelemetry._metrics.instrument import CallbackT @@ -32,28 +31,14 @@ ) from opentelemetry._metrics.instrument import UpDownCounter as APIUpDownCounter from opentelemetry._metrics.measurement import Measurement as APIMeasurement -from opentelemetry.sdk._metrics.aggregation import ( - _Aggregation, - _ExplicitBucketHistogramAggregation, - _LastValueAggregation, - _SumAggregation, -) from opentelemetry.sdk._metrics.measurement import Measurement from opentelemetry.sdk._metrics.measurement_consumer import MeasurementConsumer -from opentelemetry.sdk._metrics.point import AggregationTemporality from opentelemetry.sdk.util.instrumentation import InstrumentationInfo _logger = logging.getLogger(__name__) -class _Instrument(ABC): - @property - @abstractmethod - def _default_aggregation(self) -> _Aggregation: - pass - - -class _Synchronous(_Instrument): +class _Synchronous: def __init__( self, name: str, @@ -70,7 +55,7 @@ def __init__( super().__init__(name, unit=unit, description=description) -class _Asynchronous(_Instrument): +class _Asynchronous: def __init__( self, name: str, @@ -108,13 +93,6 @@ def callback(self) -> Iterable[Measurement]: class Counter(_Synchronous, APICounter): - @property - def _default_aggregation(self) -> _Aggregation: - return _SumAggregation( - instrument_is_monotonic=True, - instrument_temporality=AggregationTemporality.DELTA, - ) - def add( self, amount: Union[int, float], attributes: Dict[str, str] = None ): @@ -129,13 +107,6 @@ def add( class UpDownCounter(_Synchronous, APIUpDownCounter): - @property - def _default_aggregation(self) -> _Aggregation: - return _SumAggregation( - instrument_is_monotonic=False, - instrument_temporality=AggregationTemporality.DELTA, - ) - def add( self, amount: Union[int, float], attributes: Dict[str, str] = None ): @@ -145,28 +116,14 @@ def add( class ObservableCounter(_Asynchronous, APIObservableCounter): - @property - def _default_aggregation(self) -> _Aggregation: - return _SumAggregation( - instrument_is_monotonic=True, - instrument_temporality=AggregationTemporality.CUMULATIVE, - ) + pass class ObservableUpDownCounter(_Asynchronous, APIObservableUpDownCounter): - @property - def _default_aggregation(self) -> _Aggregation: - return _SumAggregation( - instrument_is_monotonic=False, - instrument_temporality=AggregationTemporality.CUMULATIVE, - ) + pass class Histogram(_Synchronous, APIHistogram): - @property - def _default_aggregation(self) -> _Aggregation: - return _ExplicitBucketHistogramAggregation() - def record( self, amount: Union[int, float], attributes: Dict[str, str] = None ): @@ -182,6 +139,4 @@ def record( class ObservableGauge(_Asynchronous, APIObservableGauge): - @property - def _default_aggregation(self) -> _Aggregation: - return _LastValueAggregation() + pass diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/view.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/view.py index faecd77d3a3..d83ce47cf67 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/view.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/view.py @@ -21,7 +21,10 @@ from typing_extensions import final from opentelemetry._metrics.instrument import Instrument -from opentelemetry.sdk._metrics.aggregation import _AggregationFactory +from opentelemetry.sdk._metrics.aggregation import ( + DefaultAggregation, + _AggregationFactory, +) _logger = getLogger(__name__) @@ -69,12 +72,15 @@ class View: are in ``attribute_keys`` will be used to identify the metric stream. aggregation: This is a metric stream customizing attribute: the - aggregatation instance to use when data is aggregated for the corresponding metrics - stream. If `None` the default aggregation of the instrument will be used. + aggregation instance to use when data is aggregated for the + corresponding metrics stream. If `None` an instance of + `DefaultAggregation` will be used. This class is not intended to be subclassed by the user. """ + _default_aggregation = DefaultAggregation() + def __init__( self, instrument_type: Optional[Type[Instrument]] = None, @@ -122,7 +128,7 @@ def __init__( self._description = description self._attribute_keys = attribute_keys - self._aggregation = aggregation + self._aggregation = aggregation or self._default_aggregation # pylint: disable=too-many-return-statements # pylint: disable=too-many-branches diff --git a/opentelemetry-sdk/tests/metrics/test_aggregation.py b/opentelemetry-sdk/tests/metrics/test_aggregation.py index 414939e30cd..675f5775cf3 100644 --- a/opentelemetry-sdk/tests/metrics/test_aggregation.py +++ b/opentelemetry-sdk/tests/metrics/test_aggregation.py @@ -23,6 +23,7 @@ from opentelemetry.sdk._metrics import instrument from opentelemetry.sdk._metrics.aggregation import ( AggregationTemporality, + DefaultAggregation, ExplicitBucketHistogramAggregation, LastValueAggregation, SumAggregation, @@ -31,8 +32,18 @@ _LastValueAggregation, _SumAggregation, ) +from opentelemetry.sdk._metrics.instrument import ( + Counter, + Histogram, + ObservableCounter, + ObservableGauge, + ObservableUpDownCounter, + UpDownCounter, +) from opentelemetry.sdk._metrics.measurement import Measurement -from opentelemetry.sdk._metrics.point import Gauge, Histogram, Sum +from opentelemetry.sdk._metrics.point import Gauge +from opentelemetry.sdk._metrics.point import Histogram as HistogramPoint +from opentelemetry.sdk._metrics.point import Sum from opentelemetry.util.types import Attributes @@ -601,7 +612,7 @@ def test_current_point_gauge(self): class TestHistogramConvertAggregationTemporality(TestCase): def test_previous_point_none(self): - current_point = Histogram( + current_point = HistogramPoint( start_time_unix_nano=0, time_unix_nano=1, bucket_counts=[0, 2, 1, 2, 0], @@ -625,7 +636,7 @@ def test_previous_point_non_cumulative(self): with self.assertRaises(Exception): _convert_aggregation_temporality( - Histogram( + HistogramPoint( start_time_unix_nano=0, time_unix_nano=1, bucket_counts=[0, 2, 1, 2, 0], @@ -633,7 +644,7 @@ def test_previous_point_non_cumulative(self): sum=70, aggregation_temporality=AggregationTemporality.DELTA, ), - Histogram( + HistogramPoint( start_time_unix_nano=1, time_unix_nano=2, bucket_counts=[0, 1, 3, 0, 0], @@ -645,7 +656,7 @@ def test_previous_point_non_cumulative(self): ), def test_same_aggregation_temporality_cumulative(self): - current_point = Histogram( + current_point = HistogramPoint( start_time_unix_nano=0, time_unix_nano=2, bucket_counts=[0, 3, 4, 2, 0], @@ -655,7 +666,7 @@ def test_same_aggregation_temporality_cumulative(self): ) self.assertEqual( _convert_aggregation_temporality( - Histogram( + HistogramPoint( start_time_unix_nano=0, time_unix_nano=1, bucket_counts=[0, 2, 1, 2, 0], @@ -670,7 +681,7 @@ def test_same_aggregation_temporality_cumulative(self): ) def test_same_aggregation_temporality_delta(self): - current_point = Histogram( + current_point = HistogramPoint( start_time_unix_nano=1, time_unix_nano=2, bucket_counts=[0, 1, 3, 0, 0], @@ -681,7 +692,7 @@ def test_same_aggregation_temporality_delta(self): self.assertEqual( _convert_aggregation_temporality( - Histogram( + HistogramPoint( start_time_unix_nano=0, time_unix_nano=2, bucket_counts=[0, 3, 4, 2, 0], @@ -696,7 +707,7 @@ def test_same_aggregation_temporality_delta(self): ) def test_aggregation_temporality_to_cumulative(self): - current_point = Histogram( + current_point = HistogramPoint( start_time_unix_nano=1, time_unix_nano=2, bucket_counts=[0, 1, 3, 0, 0], @@ -707,7 +718,7 @@ def test_aggregation_temporality_to_cumulative(self): self.assertEqual( _convert_aggregation_temporality( - Histogram( + HistogramPoint( start_time_unix_nano=0, time_unix_nano=1, bucket_counts=[0, 2, 1, 2, 0], @@ -718,7 +729,7 @@ def test_aggregation_temporality_to_cumulative(self): current_point, AggregationTemporality.CUMULATIVE, ), - Histogram( + HistogramPoint( start_time_unix_nano=0, time_unix_nano=2, bucket_counts=[0, 3, 4, 2, 0], @@ -729,7 +740,7 @@ def test_aggregation_temporality_to_cumulative(self): ) def test_aggregation_temporality_to_delta(self): - current_point = Histogram( + current_point = HistogramPoint( start_time_unix_nano=0, time_unix_nano=2, bucket_counts=[0, 3, 4, 2, 0], @@ -740,7 +751,7 @@ def test_aggregation_temporality_to_delta(self): self.assertEqual( _convert_aggregation_temporality( - Histogram( + HistogramPoint( start_time_unix_nano=0, time_unix_nano=1, bucket_counts=[0, 2, 1, 2, 0], @@ -751,7 +762,7 @@ def test_aggregation_temporality_to_delta(self): current_point, AggregationTemporality.DELTA, ), - Histogram( + HistogramPoint( start_time_unix_nano=1, time_unix_nano=2, bucket_counts=[0, 1, 3, 0, 0], @@ -817,3 +828,80 @@ def test_last_value_factory(self): self.assertIsInstance(aggregation, _LastValueAggregation) aggregation2 = factory._create_aggregation(counter) self.assertNotEqual(aggregation, aggregation2) + + +class TestDefaultAggregation(TestCase): + @classmethod + def setUpClass(cls): + cls.default_aggregation = DefaultAggregation() + + def test_counter(self): + + aggregation = self.default_aggregation._create_aggregation( + Counter(Mock(), Mock(), Mock()) + ) + self.assertIsInstance(aggregation, _SumAggregation) + self.assertTrue(aggregation._instrument_is_monotonic) + self.assertEqual( + aggregation._instrument_temporality, AggregationTemporality.DELTA + ) + + def test_up_down_counter(self): + + aggregation = self.default_aggregation._create_aggregation( + UpDownCounter(Mock(), Mock(), Mock()) + ) + self.assertIsInstance(aggregation, _SumAggregation) + self.assertFalse(aggregation._instrument_is_monotonic) + self.assertEqual( + aggregation._instrument_temporality, AggregationTemporality.DELTA + ) + + def test_observable_counter(self): + + aggregation = self.default_aggregation._create_aggregation( + ObservableCounter(Mock(), Mock(), Mock(), Mock()) + ) + self.assertIsInstance(aggregation, _SumAggregation) + self.assertTrue(aggregation._instrument_is_monotonic) + self.assertEqual( + aggregation._instrument_temporality, + AggregationTemporality.CUMULATIVE, + ) + + def test_observable_up_down_counter(self): + + aggregation = self.default_aggregation._create_aggregation( + ObservableUpDownCounter(Mock(), Mock(), Mock(), Mock()) + ) + self.assertIsInstance(aggregation, _SumAggregation) + self.assertFalse(aggregation._instrument_is_monotonic) + self.assertEqual( + aggregation._instrument_temporality, + AggregationTemporality.CUMULATIVE, + ) + + def test_histogram(self): + + aggregation = self.default_aggregation._create_aggregation( + Histogram( + Mock(), + Mock(), + Mock(), + Mock(), + Mock(), + ) + ) + self.assertIsInstance(aggregation, _ExplicitBucketHistogramAggregation) + + def test_observable_gauge(self): + + aggregation = self.default_aggregation._create_aggregation( + ObservableGauge( + Mock(), + Mock(), + Mock(), + Mock(), + ) + ) + self.assertIsInstance(aggregation, _LastValueAggregation)