From eab78cd272c01632013be77313c5ac9e4686e2ec Mon Sep 17 00:00:00 2001 From: Diego Hurtado Date: Thu, 21 Apr 2022 20:48:46 -0600 Subject: [PATCH 1/3] Implement metric reader default aggregation controls Fixes #2635 --- .../sdk/_metrics/_view_instrument_match.py | 22 +++++-- .../sdk/_metrics/export/__init__.py | 16 ++++- .../opentelemetry/sdk/_metrics/instrument.py | 5 +- .../sdk/_metrics/measurement_consumer.py | 4 +- .../sdk/_metrics/metric_reader.py | 35 ++++++++++- .../sdk/_metrics/metric_reader_storage.py | 18 +++++- .../tests/metrics/test_metric_reader.py | 56 ++++++++++++++++- .../metrics/test_metric_reader_storage.py | 22 ++++--- .../metrics/test_view_instrument_match.py | 60 +++++++++++++++++++ 9 files changed, 214 insertions(+), 24 deletions(-) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_view_instrument_match.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_view_instrument_match.py index a75ce7d4c81..3754f117a3f 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_view_instrument_match.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_view_instrument_match.py @@ -18,7 +18,9 @@ from typing import TYPE_CHECKING, Dict, Iterable from opentelemetry.sdk._metrics.aggregation import ( + DefaultAggregation, _Aggregation, + _AggregationFactory, _convert_aggregation_temporality, _PointVarT, ) @@ -39,6 +41,7 @@ def __init__( view: View, instrument: "_Instrument", sdk_config: SdkConfiguration, + instrument_class_aggregation: Dict[type, _AggregationFactory], ): self._view = view self._instrument = instrument @@ -46,6 +49,7 @@ def __init__( self._attributes_aggregation: Dict[frozenset, _Aggregation] = {} self._attributes_previous_point: Dict[frozenset, _PointVarT] = {} self._lock = Lock() + self._instrument_class_aggregation = instrument_class_aggregation # pylint: disable=protected-access def consume_measurement(self, measurement: Measurement) -> None: @@ -67,11 +71,19 @@ def consume_measurement(self, measurement: Measurement) -> None: if attributes not in self._attributes_aggregation: with self._lock: if attributes not in self._attributes_aggregation: - self._attributes_aggregation[ - attributes - ] = self._view._aggregation._create_aggregation( - self._instrument - ) + if not isinstance( + self._view._aggregation, DefaultAggregation + ): + aggregation = ( + self._view._aggregation._create_aggregation( + self._instrument + ) + ) + else: + aggregation = self._instrument_class_aggregation[ + self._instrument.__class__ + ]._create_aggregation(self._instrument) + self._attributes_aggregation[attributes] = aggregation self._attributes_aggregation[attributes].aggregate(measurement) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/export/__init__.py index a76b6489590..62c04472522 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/export/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/export/__init__.py @@ -27,6 +27,7 @@ detach, set_value, ) +from opentelemetry.sdk._metrics.aggregation import _AggregationFactory from opentelemetry.sdk._metrics.metric_reader import MetricReader from opentelemetry.sdk._metrics.point import AggregationTemporality, Metric from opentelemetry.util._once import Once @@ -103,9 +104,14 @@ class InMemoryMetricReader(MetricReader): """ def __init__( - self, preferred_temporality: Dict[type, AggregationTemporality] = None + self, + preferred_temporality: Dict[type, AggregationTemporality] = None, + preferred_aggregation: Dict[type, _AggregationFactory] = None, ) -> None: - super().__init__(preferred_temporality=preferred_temporality) + super().__init__( + preferred_temporality=preferred_temporality, + preferred_aggregation=preferred_aggregation, + ) self._lock = RLock() self._metrics: List[Metric] = [] @@ -135,10 +141,14 @@ def __init__( self, exporter: MetricExporter, preferred_temporality: Dict[type, AggregationTemporality] = None, + preferred_aggregation: Dict[type, _AggregationFactory] = None, export_interval_millis: Optional[float] = None, export_timeout_millis: Optional[float] = None, ) -> None: - super().__init__(preferred_temporality=preferred_temporality) + super().__init__( + preferred_temporality=exporter.preferred_temporality, + preferred_aggregation=preferred_aggregation, + ) self._exporter = exporter if export_interval_millis is None: try: diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/instrument.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/instrument.py index 9b74c064ad4..92de8775506 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/instrument.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/instrument.py @@ -14,7 +14,7 @@ # pylint: disable=too-many-ancestors -import logging +from logging import getLogger from typing import TYPE_CHECKING, Dict, Generator, Iterable, Optional, Union from opentelemetry._metrics.instrument import CallbackT @@ -38,7 +38,8 @@ MeasurementConsumer, ) -_logger = logging.getLogger(__name__) + +_logger = getLogger(__name__) class _Synchronous: diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/measurement_consumer.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/measurement_consumer.py index 7ee0c4ea855..df77bf71e4d 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/measurement_consumer.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/measurement_consumer.py @@ -53,7 +53,9 @@ def __init__(self, sdk_config: SdkConfiguration) -> None: self._sdk_config = sdk_config # should never be mutated self._reader_storages: Mapping[MetricReader, MetricReaderStorage] = { - reader: MetricReaderStorage(sdk_config) + reader: MetricReaderStorage( + sdk_config, reader._instrument_class_aggregation + ) for reader in sdk_config.metric_readers } self._async_instruments: List["_Asynchronous"] = [] diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/metric_reader.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/metric_reader.py index 85ef1cd3510..95553ba4d98 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/metric_reader.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/metric_reader.py @@ -19,6 +19,10 @@ from typing_extensions import final +from opentelemetry.sdk._metrics.aggregation import ( + DefaultAggregation, + _AggregationFactory, +) from opentelemetry.sdk._metrics.instrument import ( Counter, Histogram, @@ -40,6 +44,7 @@ class MetricReader(ABC): Base class for all metric readers Args: +<<<<<<< HEAD preferred_temporality: A mapping between instrument classes and aggregation temporality. By default uses CUMULATIVE for all instrument classes. This mapping will be used to define the default aggregation @@ -52,6 +57,21 @@ class MetricReader(ABC): their association to their default aggregation temporalities. The value passed here will override the corresponding values set via the environment variable +======= + preferred_aggregation: A mapping between instrument classes and + aggregation instances. By default maps all instrument classes to an + instance of `DefaultAggregation`. This mapping will be used to + define the default aggregation of every instrument class. If the + user wants to make a change in the default aggregation of an + instrument class, it is enough to pass here a dictionary whose keys + are the instrument classes and the values are the corresponding + desired aggregation for the instrument classes that the user wants + to change, not necessarily all of them. The classes not included in + the passed dictionary will retain their association to their + default aggregations. The aggregation defined here will be + overriden by an aggregation defined by a view that is not + `DefaultAggregation`. +>>>>>>> 4b0317eb9 (Implement metric reader default aggregation controls) .. document protected _receive_metrics which is a intended to be overriden by subclass .. automethod:: _receive_metrics @@ -61,7 +81,9 @@ class MetricReader(ABC): # to the end of the documentation paragraph above. def __init__( - self, preferred_temporality: Dict[type, AggregationTemporality] = None + self, + preferred_temporality: Dict[type, AggregationTemporality] = None, + preferred_aggregation: Dict[type, _AggregationFactory] = None, ) -> None: self._collect: Callable[ ["MetricReader", AggregationTemporality], Iterable[Metric] @@ -106,6 +128,17 @@ def __init__( ) self._instrument_class_temporality.update(preferred_temporality or {}) + self._preferred_temporality = preferred_temporality + self._instrument_class_aggregation = { + Counter: DefaultAggregation(), + UpDownCounter: DefaultAggregation(), + Histogram: DefaultAggregation(), + ObservableCounter: DefaultAggregation(), + ObservableUpDownCounter: DefaultAggregation(), + ObservableGauge: DefaultAggregation(), + } + + self._instrument_class_aggregation.update(preferred_aggregation or {}) @final def collect(self) -> None: diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/metric_reader_storage.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/metric_reader_storage.py index 7835bf5858d..d76457f41b8 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/metric_reader_storage.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/metric_reader_storage.py @@ -19,7 +19,10 @@ from opentelemetry.sdk._metrics._view_instrument_match import ( _ViewInstrumentMatch, ) -from opentelemetry.sdk._metrics.aggregation import AggregationTemporality +from opentelemetry.sdk._metrics.aggregation import ( + AggregationTemporality, + _AggregationFactory, +) from opentelemetry.sdk._metrics.measurement import Measurement from opentelemetry.sdk._metrics.point import Metric from opentelemetry.sdk._metrics.sdk_configuration import SdkConfiguration @@ -31,12 +34,17 @@ class MetricReaderStorage: """The SDK's storage for a given reader""" - def __init__(self, sdk_config: SdkConfiguration) -> None: + def __init__( + self, + sdk_config: SdkConfiguration, + instrument_class_aggregation: Dict[type, _AggregationFactory], + ) -> None: self._lock = RLock() self._sdk_config = sdk_config self._view_instrument_match: Dict[ Instrument, List[_ViewInstrumentMatch] ] = {} + self._instrument_class_aggregation = instrument_class_aggregation def _get_or_init_view_instrument_match( self, instrument: Instrument @@ -62,6 +70,9 @@ def _get_or_init_view_instrument_match( view=view, instrument=instrument, sdk_config=self._sdk_config, + instrument_class_aggregation=( + self._instrument_class_aggregation + ), ) ) @@ -72,6 +83,9 @@ def _get_or_init_view_instrument_match( view=_DEFAULT_VIEW, instrument=instrument, sdk_config=self._sdk_config, + instrument_class_aggregation=( + self._instrument_class_aggregation + ), ) ) self._view_instrument_match[instrument] = view_instrument_matches diff --git a/opentelemetry-sdk/tests/metrics/test_metric_reader.py b/opentelemetry-sdk/tests/metrics/test_metric_reader.py index 0f9792313ae..e96beeebc27 100644 --- a/opentelemetry-sdk/tests/metrics/test_metric_reader.py +++ b/opentelemetry-sdk/tests/metrics/test_metric_reader.py @@ -13,11 +13,17 @@ # limitations under the License. from os import environ +from unittest.mock import patch + from typing import Dict from unittest import TestCase -from unittest.mock import patch -from opentelemetry.sdk._metrics.aggregation import AggregationTemporality +from opentelemetry.sdk._metrics.aggregation import ( + AggregationTemporality, + DefaultAggregation, + LastValueAggregation, + _AggregationFactory, +) from opentelemetry.sdk._metrics.instrument import ( Counter, Histogram, @@ -34,10 +40,13 @@ class DummyMetricReader(MetricReader): def __init__( - self, preferred_temporality: Dict[type, AggregationTemporality] = None + self, + preferred_temporality: Dict[type, AggregationTemporality] = None, + preferred_aggregation: Dict[type, _AggregationFactory] = None, ) -> None: super().__init__( preferred_temporality=preferred_temporality, + preferred_aggregation=preferred_aggregation, ) def _receive_metrics(self, metrics): @@ -173,3 +182,44 @@ def test_configure_temporality_parameter(self): dummy_metric_reader._instrument_class_temporality[ObservableGauge], AggregationTemporality.DELTA, ) + + def test_default_temporality(self): + dummy_metric_reader = DummyMetricReader() + self.assertEqual( + dummy_metric_reader._instrument_class_aggregation.keys(), + set( + [ + Counter, + UpDownCounter, + Histogram, + ObservableCounter, + ObservableUpDownCounter, + ObservableGauge, + ] + ), + ) + for ( + value + ) in dummy_metric_reader._instrument_class_aggregation.values(): + self.assertIsInstance(value, DefaultAggregation) + + dummy_metric_reader = DummyMetricReader( + preferred_aggregation={Counter: LastValueAggregation()} + ) + self.assertEqual( + dummy_metric_reader._instrument_class_aggregation.keys(), + set( + [ + Counter, + UpDownCounter, + Histogram, + ObservableCounter, + ObservableUpDownCounter, + ObservableGauge, + ] + ), + ) + self.assertIsInstance( + dummy_metric_reader._instrument_class_aggregation[Counter], + LastValueAggregation, + ) diff --git a/opentelemetry-sdk/tests/metrics/test_metric_reader_storage.py b/opentelemetry-sdk/tests/metrics/test_metric_reader_storage.py index f7ffc6993a2..d0d05854d8a 100644 --- a/opentelemetry-sdk/tests/metrics/test_metric_reader_storage.py +++ b/opentelemetry-sdk/tests/metrics/test_metric_reader_storage.py @@ -12,9 +12,12 @@ # See the License for the specific language governing permissions and # limitations under the License. -from unittest.mock import Mock, patch +from unittest.mock import MagicMock, Mock, patch -from opentelemetry.sdk._metrics.aggregation import DropAggregation +from opentelemetry.sdk._metrics.aggregation import ( + DefaultAggregation, + DropAggregation, +) from opentelemetry.sdk._metrics.instrument import Counter from opentelemetry.sdk._metrics.measurement import Measurement from opentelemetry.sdk._metrics.metric_reader_storage import ( @@ -56,7 +59,8 @@ def test_creates_view_instrument_matches( resource=Mock(), metric_readers=(), views=(view1, view2), - ) + ), + MagicMock(**{"__getitem__.return_value": DefaultAggregation()}), ) # instrument1 matches view1 and view2, so should create two ViewInstrumentMatch objects @@ -100,7 +104,8 @@ def test_forwards_calls_to_view_instrument_match( resource=Mock(), metric_readers=(), views=(view1, view2), - ) + ), + MagicMock(**{"__getitem__.return_value": DefaultAggregation()}), ) # Measurements from an instrument should be passed on to each ViewInstrumentMatch objects @@ -147,7 +152,8 @@ def test_race_concurrent_measurements(self, MockViewInstrumentMatch: Mock): resource=Mock(), metric_readers=(), views=(view1,), - ) + ), + MagicMock(**{"__getitem__.return_value": DefaultAggregation()}), ) def send_measurement(): @@ -172,7 +178,8 @@ def test_default_view_enabled(self, MockViewInstrumentMatch: Mock): resource=Mock(), metric_readers=(), views=(), - ) + ), + MagicMock(**{"__getitem__.return_value": DefaultAggregation()}), ) storage.consume_measurement(Measurement(1, instrument1)) @@ -200,7 +207,8 @@ def test_drop_aggregation(self): instrument_name="name", aggregation=DropAggregation() ), ), - ) + ), + MagicMock(**{"__getitem__.return_value": DefaultAggregation()}), ) metric_reader_storage.consume_measurement(Measurement(1, counter)) diff --git a/opentelemetry-sdk/tests/metrics/test_view_instrument_match.py b/opentelemetry-sdk/tests/metrics/test_view_instrument_match.py index 9ccdd90933c..9d91c3675d8 100644 --- a/opentelemetry-sdk/tests/metrics/test_view_instrument_match.py +++ b/opentelemetry-sdk/tests/metrics/test_view_instrument_match.py @@ -19,9 +19,13 @@ _ViewInstrumentMatch, ) from opentelemetry.sdk._metrics.aggregation import ( + DefaultAggregation, DropAggregation, + LastValueAggregation, _DropAggregation, + _LastValueAggregation, ) +from opentelemetry.sdk._metrics.instrument import Counter from opentelemetry.sdk._metrics.measurement import Measurement from opentelemetry.sdk._metrics.point import AggregationTemporality, Metric from opentelemetry.sdk._metrics.sdk_configuration import SdkConfiguration @@ -56,6 +60,9 @@ def test_consume_measurement(self): ), instrument=instrument1, sdk_config=sdk_config, + instrument_class_aggregation=MagicMock( + **{"__getitem__.return_value": DefaultAggregation()} + ), ) view_instrument_match.consume_measurement( @@ -95,6 +102,9 @@ def test_consume_measurement(self): ), instrument=instrument1, sdk_config=sdk_config, + instrument_class_aggregation=MagicMock( + **{"__getitem__.return_value": DefaultAggregation()} + ), ) view_instrument_match.consume_measurement( @@ -123,6 +133,9 @@ def test_consume_measurement(self): ), instrument=instrument1, sdk_config=sdk_config, + instrument_class_aggregation=MagicMock( + **{"__getitem__.return_value": DefaultAggregation()} + ), ) view_instrument_match.consume_measurement( Measurement(value=0, instrument=instrument1, attributes=None) @@ -145,6 +158,9 @@ def test_consume_measurement(self): ), instrument=instrument1, sdk_config=sdk_config, + instrument_class_aggregation=MagicMock( + **{"__getitem__.return_value": DefaultAggregation()} + ), ) view_instrument_match.consume_measurement( Measurement(value=0, instrument=instrument1, attributes=None) @@ -173,6 +189,9 @@ def test_collect(self): ), instrument=instrument1, sdk_config=sdk_config, + instrument_class_aggregation=MagicMock( + **{"__getitem__.return_value": DefaultAggregation()} + ), ) view_instrument_match.consume_measurement( @@ -202,3 +221,44 @@ def test_collect(self): point=None, ), ) + + def test_setting_aggregation(self): + instrument1 = Counter( + name="instrument1", + instrumentation_scope=Mock(), + measurement_consumer=Mock(), + description="description", + unit="unit", + ) + instrument1.instrumentation_scope = self.mock_instrumentation_scope + sdk_config = SdkConfiguration( + resource=self.mock_resource, + metric_readers=[], + views=[], + ) + view_instrument_match = _ViewInstrumentMatch( + view=View( + instrument_name="instrument1", + name="name", + aggregation=DefaultAggregation(), + attribute_keys={"a", "c"}, + ), + instrument=instrument1, + sdk_config=sdk_config, + instrument_class_aggregation={Counter: LastValueAggregation()}, + ) + + view_instrument_match.consume_measurement( + Measurement( + value=0, + instrument=Mock(name="instrument1"), + attributes={"c": "d", "f": "g"}, + ) + ) + + self.assertIsInstance( + view_instrument_match._attributes_aggregation[ + frozenset({("c", "d")}) + ], + _LastValueAggregation, + ) From 407e74db180b760fa5f7ef01cd8266eb7f9fcf1b Mon Sep 17 00:00:00 2001 From: Diego Hurtado Date: Mon, 25 Apr 2022 16:32:19 -0600 Subject: [PATCH 2/3] Add changelog --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 644cfcdefa4..36e49f5140a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [1.11.1-0.30b1](https://github.com/open-telemetry/opentelemetry-python/releases/tag/v1.11.1-0.30b1) - 2022-04-21 +- Add parameter to MetricReader constructor to select aggregation per instrument kind + ([#2638](https://github.com/open-telemetry/opentelemetry-python/pull/2638)) - Add parameter to MetricReader constructor to select temporality per instrument kind ([#2637](https://github.com/open-telemetry/opentelemetry-python/pull/2637)) - Fix unhandled callback exceptions on async instruments From f375ece73583d5c63ea8d08001355e9e8c53f4ac Mon Sep 17 00:00:00 2001 From: Diego Hurtado Date: Wed, 27 Apr 2022 12:43:09 -0600 Subject: [PATCH 3/3] Add some fixes --- .../src/opentelemetry/sdk/_metrics/export/__init__.py | 2 +- .../src/opentelemetry/sdk/_metrics/metric_reader.py | 3 --- opentelemetry-sdk/tests/metrics/test_metric_reader.py | 3 +-- 3 files changed, 2 insertions(+), 6 deletions(-) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/export/__init__.py index 62c04472522..00db5c1a915 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/export/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/export/__init__.py @@ -146,7 +146,7 @@ def __init__( export_timeout_millis: Optional[float] = None, ) -> None: super().__init__( - preferred_temporality=exporter.preferred_temporality, + preferred_temporality=preferred_temporality, preferred_aggregation=preferred_aggregation, ) self._exporter = exporter diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/metric_reader.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/metric_reader.py index 95553ba4d98..7bcdce465d4 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/metric_reader.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/metric_reader.py @@ -44,7 +44,6 @@ class MetricReader(ABC): Base class for all metric readers Args: -<<<<<<< HEAD preferred_temporality: A mapping between instrument classes and aggregation temporality. By default uses CUMULATIVE for all instrument classes. This mapping will be used to define the default aggregation @@ -57,7 +56,6 @@ class MetricReader(ABC): their association to their default aggregation temporalities. The value passed here will override the corresponding values set via the environment variable -======= preferred_aggregation: A mapping between instrument classes and aggregation instances. By default maps all instrument classes to an instance of `DefaultAggregation`. This mapping will be used to @@ -71,7 +69,6 @@ class MetricReader(ABC): default aggregations. The aggregation defined here will be overriden by an aggregation defined by a view that is not `DefaultAggregation`. ->>>>>>> 4b0317eb9 (Implement metric reader default aggregation controls) .. document protected _receive_metrics which is a intended to be overriden by subclass .. automethod:: _receive_metrics diff --git a/opentelemetry-sdk/tests/metrics/test_metric_reader.py b/opentelemetry-sdk/tests/metrics/test_metric_reader.py index e96beeebc27..200b8824b86 100644 --- a/opentelemetry-sdk/tests/metrics/test_metric_reader.py +++ b/opentelemetry-sdk/tests/metrics/test_metric_reader.py @@ -13,10 +13,9 @@ # limitations under the License. from os import environ -from unittest.mock import patch - from typing import Dict from unittest import TestCase +from unittest.mock import patch from opentelemetry.sdk._metrics.aggregation import ( AggregationTemporality,