From 60e4d167a92a05e26a82a500ea992cba0ab24340 Mon Sep 17 00:00:00 2001 From: Diego Hurtado Date: Thu, 21 Apr 2022 20:37:06 -0600 Subject: [PATCH 01/10] Implement MetricReader temporality controls Fixes #2627 Fixes #2636 --- .../sdk/_metrics/_view_instrument_match.py | 10 +- .../sdk/_metrics/export/__init__.py | 12 +- .../opentelemetry/sdk/_metrics/instrument.py | 12 +- .../sdk/_metrics/measurement_consumer.py | 14 +- .../sdk/_metrics/metric_reader.py | 70 ++++++- .../sdk/_metrics/metric_reader_storage.py | 27 ++- .../sdk/environment_variables.py | 15 ++ .../tests/metrics/test_metric_reader.py | 175 ++++++++++++++++++ .../tests/metrics/test_metrics.py | 4 +- .../test_periodic_exporting_metric_reader.py | 6 +- .../metrics/test_view_instrument_match.py | 8 +- 11 files changed, 312 insertions(+), 41 deletions(-) create mode 100644 opentelemetry-sdk/tests/metrics/test_metric_reader.py diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_view_instrument_match.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_view_instrument_match.py index af1d9eb3513..7d3f83d95af 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_view_instrument_match.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_view_instrument_match.py @@ -75,7 +75,7 @@ def consume_measurement(self, measurement: Measurement) -> None: self._attributes_aggregation[attributes].aggregate(measurement) - def collect(self, temporality: int) -> Iterable[Metric]: + def collect(self, instrument_class_temporality: int) -> Iterable[Metric]: with self._lock: for ( @@ -106,13 +106,17 @@ def collect(self, temporality: int) -> Iterable[Metric]: self._view._description or self._instrument.description ), - instrumentation_scope=self._instrument.instrumentation_scope, + instrumentation_scope=( + self._instrument.instrumentation_scope + ), name=self._view._name or self._instrument.name, resource=self._sdk_config.resource, unit=self._instrument.unit, point=_convert_aggregation_temporality( previous_point, current_point, - temporality, + instrument_class_temporality[ + self._instrument.__class__ + ], ), ) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/export/__init__.py index 88171f975f2..a76b6489590 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/export/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/export/__init__.py @@ -19,7 +19,7 @@ from os import environ, linesep from sys import stdout from threading import Event, RLock, Thread -from typing import IO, Callable, Iterable, List, Optional, Sequence +from typing import IO, Callable, Dict, Iterable, List, Optional, Sequence from opentelemetry.context import ( _SUPPRESS_INSTRUMENTATION_KEY, @@ -50,10 +50,6 @@ class MetricExporter(ABC): in their own format. """ - @property - def preferred_temporality(self) -> AggregationTemporality: - return AggregationTemporality.CUMULATIVE - @abstractmethod def export(self, metrics: Sequence[Metric]) -> "MetricExportResult": """Exports a batch of telemetry data. @@ -107,8 +103,7 @@ class InMemoryMetricReader(MetricReader): """ def __init__( - self, - preferred_temporality: AggregationTemporality = AggregationTemporality.CUMULATIVE, + self, preferred_temporality: Dict[type, AggregationTemporality] = None ) -> None: super().__init__(preferred_temporality=preferred_temporality) self._lock = RLock() @@ -139,10 +134,11 @@ class PeriodicExportingMetricReader(MetricReader): def __init__( self, exporter: MetricExporter, + preferred_temporality: Dict[type, AggregationTemporality] = None, export_interval_millis: Optional[float] = None, export_timeout_millis: Optional[float] = None, ) -> None: - super().__init__(preferred_temporality=exporter.preferred_temporality) + super().__init__(preferred_temporality=preferred_temporality) self._exporter = exporter if export_interval_millis is None: try: diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/instrument.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/instrument.py index b637bba9f08..9b74c064ad4 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/instrument.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/instrument.py @@ -15,7 +15,7 @@ # pylint: disable=too-many-ancestors import logging -from typing import Dict, Generator, Iterable, Optional, Union +from typing import TYPE_CHECKING, Dict, Generator, Iterable, Optional, Union from opentelemetry._metrics.instrument import CallbackT from opentelemetry._metrics.instrument import Counter as APICounter @@ -31,9 +31,13 @@ ) from opentelemetry._metrics.instrument import UpDownCounter as APIUpDownCounter from opentelemetry.sdk._metrics.measurement import Measurement -from opentelemetry.sdk._metrics.measurement_consumer import MeasurementConsumer from opentelemetry.sdk.util.instrumentation import InstrumentationScope +if TYPE_CHECKING: + from opentelemetry.sdk._metrics.measurement_consumer import ( + MeasurementConsumer, + ) + _logger = logging.getLogger(__name__) @@ -42,7 +46,7 @@ def __init__( self, name: str, instrumentation_scope: InstrumentationScope, - measurement_consumer: MeasurementConsumer, + measurement_consumer: "MeasurementConsumer", unit: str = "", description: str = "", ): @@ -59,7 +63,7 @@ def __init__( self, name: str, instrumentation_scope: InstrumentationScope, - measurement_consumer: MeasurementConsumer, + measurement_consumer: "MeasurementConsumer", callbacks: Optional[Iterable[CallbackT]] = None, unit: str = "", description: str = "", diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/measurement_consumer.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/measurement_consumer.py index c4b67702760..7ee0c4ea855 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/measurement_consumer.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/measurement_consumer.py @@ -14,7 +14,7 @@ from abc import ABC, abstractmethod from threading import Lock -from typing import TYPE_CHECKING, Iterable, List, Mapping +from typing import TYPE_CHECKING, Dict, Iterable, List, Mapping from opentelemetry.sdk._metrics.aggregation import AggregationTemporality from opentelemetry.sdk._metrics.measurement import Measurement @@ -40,7 +40,9 @@ def register_asynchronous_instrument(self, instrument: "_Asynchronous"): @abstractmethod def collect( - self, metric_reader: MetricReader, temporality: AggregationTemporality + self, + metric_reader: MetricReader, + instrument_type_temporality: Dict[type, AggregationTemporality], ) -> Iterable[Metric]: pass @@ -67,11 +69,15 @@ def register_asynchronous_instrument( self._async_instruments.append(instrument) def collect( - self, metric_reader: MetricReader, temporality: AggregationTemporality + self, + metric_reader: MetricReader, + instrument_type_temporality: Dict[type, AggregationTemporality], ) -> Iterable[Metric]: with self._lock: metric_reader_storage = self._reader_storages[metric_reader] for async_instrument in self._async_instruments: for measurement in async_instrument.callback(): metric_reader_storage.consume_measurement(measurement) - return self._reader_storages[metric_reader].collect(temporality) + return self._reader_storages[metric_reader].collect( + instrument_type_temporality + ) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/metric_reader.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/metric_reader.py index e14877d87da..c101f6b6fde 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/metric_reader.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/metric_reader.py @@ -12,31 +12,85 @@ # See the License for the specific language governing permissions and # limitations under the License. -import logging from abc import ABC, abstractmethod -from typing import Callable, Iterable +from logging import getLogger +from os import environ +from typing import Callable, Dict, Iterable from typing_extensions import final +from opentelemetry.sdk._metrics.instrument import ( + Counter, + Histogram, + ObservableCounter, + ObservableGauge, + ObservableUpDownCounter, + UpDownCounter, +) from opentelemetry.sdk._metrics.point import AggregationTemporality, Metric +from opentelemetry.sdk.environment_variables import ( + _OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE, +) -_logger = logging.getLogger(__name__) +_logger = getLogger(__name__) class MetricReader(ABC): """ + Base class for all metric readers + + Args: + preferred_temporality: A mapping between instrument classes and + aggregation temporality. By default uses CUMULATIVE for all instrument + classes. This mapping will be used to define the default aggregation + temporality of every instrument class. If the user wants to make a + change in the default aggregation temporality of an instrument class, + it is enough to pass here a dictionary whose keys are the instrument + classes and the values are the corresponding desired aggregation + temporalities of the classes that the user wants to change, not all of + them. The classes not included in the passed dictionary will retain + their association to their default aggregation temporalities. + .. document protected _receive_metrics which is a intended to be overriden by subclass .. automethod:: _receive_metrics """ def __init__( - self, - preferred_temporality: AggregationTemporality = AggregationTemporality.CUMULATIVE, + self, preferred_temporality: Dict[type, AggregationTemporality] = None ) -> None: self._collect: Callable[ ["MetricReader", AggregationTemporality], Iterable[Metric] ] = None - self._preferred_temporality = preferred_temporality + + if ( + environ.get( + _OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE, + "CUMULATIVE", + ) + .upper() + .strip() + == "DELTA" + ): + self._instrument_class_temporality = { + Counter: AggregationTemporality.DELTA, + UpDownCounter: AggregationTemporality.CUMULATIVE, + Histogram: AggregationTemporality.DELTA, + ObservableCounter: AggregationTemporality.DELTA, + ObservableUpDownCounter: AggregationTemporality.CUMULATIVE, + ObservableGauge: AggregationTemporality.CUMULATIVE, + } + + else: + self._instrument_class_temporality = { + Counter: AggregationTemporality.CUMULATIVE, + UpDownCounter: AggregationTemporality.CUMULATIVE, + Histogram: AggregationTemporality.CUMULATIVE, + ObservableCounter: AggregationTemporality.CUMULATIVE, + ObservableUpDownCounter: AggregationTemporality.CUMULATIVE, + ObservableGauge: AggregationTemporality.CUMULATIVE, + } + + self._instrument_class_temporality.update(preferred_temporality or {}) @final def collect(self) -> None: @@ -48,7 +102,9 @@ def collect(self) -> None: "Cannot call collect on a MetricReader until it is registered on a MeterProvider" ) return - self._receive_metrics(self._collect(self, self._preferred_temporality)) + self._receive_metrics( + self._collect(self, self._instrument_class_temporality) + ) @final def _set_collect_callback( diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/metric_reader_storage.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/metric_reader_storage.py index 45c4d4dd793..7835bf5858d 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/metric_reader_storage.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/metric_reader_storage.py @@ -83,21 +83,30 @@ def consume_measurement(self, measurement: Measurement) -> None: ): view_instrument_match.consume_measurement(measurement) - def collect(self, temporality: AggregationTemporality) -> Iterable[Metric]: - # use a list instead of yielding to prevent a slow reader from holding SDK locks + def collect( + self, instrument_type_temporality: Dict[type, AggregationTemporality] + ) -> Iterable[Metric]: + # Use a list instead of yielding to prevent a slow reader from holding + # SDK locks metrics: List[Metric] = [] - # While holding the lock, new _ViewInstrumentMatch can't be added from another thread (so we are - # sure we collect all existing view). However, instruments can still send measurements - # that will make it into the individual aggregations; collection will acquire those - # locks iteratively to keep locking as fine-grained as possible. One side effect is - # that end times can be slightly skewed among the metric streams produced by the SDK, - # but we still align the output timestamps for a single instrument. + # While holding the lock, new _ViewInstrumentMatch can't be added from + # another thread (so we are sure we collect all existing view). + # However, instruments can still send measurements that will make it + # into the individual aggregations; collection will acquire those locks + # iteratively to keep locking as fine-grained as possible. One side + # effect is that end times can be slightly skewed among the metric + # streams produced by the SDK, but we still align the output timestamps + # for a single instrument. with self._lock: for ( view_instrument_matches ) in self._view_instrument_match.values(): for view_instrument_match in view_instrument_matches: - metrics.extend(view_instrument_match.collect(temporality)) + metrics.extend( + view_instrument_match.collect( + instrument_type_temporality + ) + ) return metrics diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/environment_variables.py b/opentelemetry-sdk/src/opentelemetry/sdk/environment_variables.py index d9bab1bb169..3e632bc521f 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/environment_variables.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/environment_variables.py @@ -406,3 +406,18 @@ provide the entry point for loading the log emitter provider. If not specified, SDK LogEmitterProvider is used. """ + +_OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE = ( + "OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE" +) +""" +.. envvar:: OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE + +The :envvar:`OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE` environment +variable allows users to set the default aggregation temporality policy to use +on the basis of instrument kind. The valid (case-insensitive) values are: + +``CUMULATIVE``: Choose ``CUMULATIVE`` aggregation temporality for all instrument kinds. +``DELTA``: Choose ``DELTA`` aggregation temporality for ``Counter``, ``Asynchronous Counter`` and ``Histogram``. +Choose ``CUMULATIVE`` aggregation temporality for ``UpDownCounter`` and ``Asynchronous UpDownCounter``. +""" diff --git a/opentelemetry-sdk/tests/metrics/test_metric_reader.py b/opentelemetry-sdk/tests/metrics/test_metric_reader.py new file mode 100644 index 00000000000..0f9792313ae --- /dev/null +++ b/opentelemetry-sdk/tests/metrics/test_metric_reader.py @@ -0,0 +1,175 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from os import environ +from typing import Dict +from unittest import TestCase +from unittest.mock import patch + +from opentelemetry.sdk._metrics.aggregation import AggregationTemporality +from opentelemetry.sdk._metrics.instrument import ( + Counter, + Histogram, + ObservableCounter, + ObservableGauge, + ObservableUpDownCounter, + UpDownCounter, +) +from opentelemetry.sdk._metrics.metric_reader import MetricReader +from opentelemetry.sdk.environment_variables import ( + _OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE, +) + + +class DummyMetricReader(MetricReader): + def __init__( + self, preferred_temporality: Dict[type, AggregationTemporality] = None + ) -> None: + super().__init__( + preferred_temporality=preferred_temporality, + ) + + def _receive_metrics(self, metrics): + pass + + def shutdown(self): + return True + + +class TestMetricReader(TestCase): + @patch.dict( + environ, + {_OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE: "CUMULATIVE"}, + ) + def test_configure_temporality_cumulative(self): + + dummy_metric_reader = DummyMetricReader() + + self.assertEqual( + dummy_metric_reader._instrument_class_temporality.keys(), + set( + [ + Counter, + UpDownCounter, + Histogram, + ObservableCounter, + ObservableUpDownCounter, + ObservableGauge, + ] + ), + ) + for ( + value + ) in dummy_metric_reader._instrument_class_temporality.values(): + self.assertEqual(value, AggregationTemporality.CUMULATIVE) + + @patch.dict( + environ, {_OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE: "DELTA"} + ) + def test_configure_temporality_delta(self): + + dummy_metric_reader = DummyMetricReader() + + self.assertEqual( + dummy_metric_reader._instrument_class_temporality.keys(), + set( + [ + Counter, + UpDownCounter, + Histogram, + ObservableCounter, + ObservableUpDownCounter, + ObservableGauge, + ] + ), + ) + self.assertEqual( + dummy_metric_reader._instrument_class_temporality[Counter], + AggregationTemporality.DELTA, + ) + self.assertEqual( + dummy_metric_reader._instrument_class_temporality[UpDownCounter], + AggregationTemporality.CUMULATIVE, + ) + self.assertEqual( + dummy_metric_reader._instrument_class_temporality[Histogram], + AggregationTemporality.DELTA, + ) + self.assertEqual( + dummy_metric_reader._instrument_class_temporality[ + ObservableCounter + ], + AggregationTemporality.DELTA, + ) + self.assertEqual( + dummy_metric_reader._instrument_class_temporality[ + ObservableUpDownCounter + ], + AggregationTemporality.CUMULATIVE, + ) + self.assertEqual( + dummy_metric_reader._instrument_class_temporality[ObservableGauge], + AggregationTemporality.CUMULATIVE, + ) + + def test_configure_temporality_parameter(self): + + dummy_metric_reader = DummyMetricReader( + preferred_temporality={ + Histogram: AggregationTemporality.DELTA, + ObservableGauge: AggregationTemporality.DELTA, + } + ) + + self.assertEqual( + dummy_metric_reader._instrument_class_temporality.keys(), + set( + [ + Counter, + UpDownCounter, + Histogram, + ObservableCounter, + ObservableUpDownCounter, + ObservableGauge, + ] + ), + ) + self.assertEqual( + dummy_metric_reader._instrument_class_temporality[Counter], + AggregationTemporality.CUMULATIVE, + ) + self.assertEqual( + dummy_metric_reader._instrument_class_temporality[UpDownCounter], + AggregationTemporality.CUMULATIVE, + ) + self.assertEqual( + dummy_metric_reader._instrument_class_temporality[Histogram], + AggregationTemporality.DELTA, + ) + self.assertEqual( + dummy_metric_reader._instrument_class_temporality[ + ObservableCounter + ], + AggregationTemporality.CUMULATIVE, + ) + self.assertEqual( + dummy_metric_reader._instrument_class_temporality[ + ObservableUpDownCounter + ], + AggregationTemporality.CUMULATIVE, + ) + self.assertEqual( + dummy_metric_reader._instrument_class_temporality[ObservableGauge], + AggregationTemporality.DELTA, + ) diff --git a/opentelemetry-sdk/tests/metrics/test_metrics.py b/opentelemetry-sdk/tests/metrics/test_metrics.py index 0279dfbe618..81d1068fb80 100644 --- a/opentelemetry-sdk/tests/metrics/test_metrics.py +++ b/opentelemetry-sdk/tests/metrics/test_metrics.py @@ -36,7 +36,7 @@ UpDownCounter, ) from opentelemetry.sdk._metrics.metric_reader import MetricReader -from opentelemetry.sdk._metrics.point import AggregationTemporality, Metric +from opentelemetry.sdk._metrics.point import Metric from opentelemetry.sdk._metrics.view import View from opentelemetry.sdk.resources import Resource from opentelemetry.test.concurrency_test import ConcurrencyTestBase, MockFunc @@ -44,7 +44,7 @@ class DummyMetricReader(MetricReader): def __init__(self): - super().__init__(AggregationTemporality.CUMULATIVE) + super().__init__() def _receive_metrics(self, metrics): pass diff --git a/opentelemetry-sdk/tests/metrics/test_periodic_exporting_metric_reader.py b/opentelemetry-sdk/tests/metrics/test_periodic_exporting_metric_reader.py index 728e2911800..ff67e848afe 100644 --- a/opentelemetry-sdk/tests/metrics/test_periodic_exporting_metric_reader.py +++ b/opentelemetry-sdk/tests/metrics/test_periodic_exporting_metric_reader.py @@ -84,7 +84,9 @@ def _create_periodic_reader( self, metrics, exporter, collect_wait=0, interval=60000 ): - pmr = PeriodicExportingMetricReader(exporter, interval) + pmr = PeriodicExportingMetricReader( + exporter, export_interval_millis=interval + ) def _collect(reader, temp): time.sleep(collect_wait) @@ -95,7 +97,7 @@ def _collect(reader, temp): def test_ticker_called(self): collect_mock = Mock() - pmr = PeriodicExportingMetricReader(Mock(), 1) + pmr = PeriodicExportingMetricReader(Mock(), export_interval_millis=1) pmr._set_collect_callback(collect_mock) time.sleep(0.1) self.assertTrue(collect_mock.assert_called_once) diff --git a/opentelemetry-sdk/tests/metrics/test_view_instrument_match.py b/opentelemetry-sdk/tests/metrics/test_view_instrument_match.py index 20b1a041698..9ccdd90933c 100644 --- a/opentelemetry-sdk/tests/metrics/test_view_instrument_match.py +++ b/opentelemetry-sdk/tests/metrics/test_view_instrument_match.py @@ -13,7 +13,7 @@ # limitations under the License. from unittest import TestCase -from unittest.mock import Mock +from unittest.mock import MagicMock, Mock from opentelemetry.sdk._metrics._view_instrument_match import ( _ViewInstrumentMatch, @@ -185,7 +185,11 @@ def test_collect(self): self.assertEqual( next( view_instrument_match.collect( - AggregationTemporality.CUMULATIVE + MagicMock( + **{ + "__getitem__.return_value": AggregationTemporality.CUMULATIVE + } + ) ) ), Metric( From dd50a77d06140c17dd72668e5cf1b3cf1bddecb9 Mon Sep 17 00:00:00 2001 From: Diego Hurtado Date: Mon, 25 Apr 2022 10:05:08 -0600 Subject: [PATCH 02/10] Fix type --- .../src/opentelemetry/sdk/_metrics/_view_instrument_match.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_view_instrument_match.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_view_instrument_match.py index 7d3f83d95af..70664cb99d7 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_view_instrument_match.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_view_instrument_match.py @@ -75,7 +75,10 @@ def consume_measurement(self, measurement: Measurement) -> None: self._attributes_aggregation[attributes].aggregate(measurement) - def collect(self, instrument_class_temporality: int) -> Iterable[Metric]: + def collect( + self, + instrument_class_temporality: Dict[type, AggregationTemporality] + ) -> Iterable[Metric]: with self._lock: for ( From 4ef690bdcb3e8b4141bc0b96b3db4261d7102a7b Mon Sep 17 00:00:00 2001 From: Diego Hurtado Date: Mon, 25 Apr 2022 10:09:34 -0600 Subject: [PATCH 03/10] Add temporality check --- .../src/opentelemetry/sdk/_metrics/metric_reader.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/metric_reader.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/metric_reader.py index c101f6b6fde..b3a4299b0a2 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/metric_reader.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/metric_reader.py @@ -90,6 +90,15 @@ def __init__( ObservableGauge: AggregationTemporality.CUMULATIVE, } + for temporality in preferred_temporality.values(): + if ( + temporality != AggregationTemporality.CUMULATIVE + or temporality != AggregationTemporality.DELTA + ): + raise Exception( + f"Invalid temporality value found {temporality}" + ) + self._instrument_class_temporality.update(preferred_temporality or {}) @final From 0e8c7f3bc2ffeffbd47bc0b10e6e5382acb46797 Mon Sep 17 00:00:00 2001 From: Diego Hurtado Date: Mon, 25 Apr 2022 10:14:00 -0600 Subject: [PATCH 04/10] Fix check --- .../opentelemetry/sdk/_metrics/metric_reader.py | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/metric_reader.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/metric_reader.py index b3a4299b0a2..b273dcea1e7 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/metric_reader.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/metric_reader.py @@ -90,14 +90,15 @@ def __init__( ObservableGauge: AggregationTemporality.CUMULATIVE, } - for temporality in preferred_temporality.values(): - if ( - temporality != AggregationTemporality.CUMULATIVE - or temporality != AggregationTemporality.DELTA - ): - raise Exception( - f"Invalid temporality value found {temporality}" - ) + if preferred_temporality is not None: + for temporality in preferred_temporality.values(): + if ( + temporality != AggregationTemporality.CUMULATIVE + or temporality != AggregationTemporality.DELTA + ): + raise Exception( + f"Invalid temporality value found {temporality}" + ) self._instrument_class_temporality.update(preferred_temporality or {}) From 8df1a165f37e17332e56cc29d326b30c6633f1e1 Mon Sep 17 00:00:00 2001 From: Diego Hurtado Date: Mon, 25 Apr 2022 10:21:46 -0600 Subject: [PATCH 05/10] Fix check --- .../src/opentelemetry/sdk/_metrics/metric_reader.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/metric_reader.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/metric_reader.py index b273dcea1e7..e97257ed861 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/metric_reader.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/metric_reader.py @@ -94,7 +94,7 @@ def __init__( for temporality in preferred_temporality.values(): if ( temporality != AggregationTemporality.CUMULATIVE - or temporality != AggregationTemporality.DELTA + and temporality != AggregationTemporality.DELTA ): raise Exception( f"Invalid temporality value found {temporality}" From c405790bce7b75e7c1de97ccd5c5fe60ce2640b6 Mon Sep 17 00:00:00 2001 From: Diego Hurtado Date: Mon, 25 Apr 2022 10:24:02 -0600 Subject: [PATCH 06/10] Fix lint --- .../src/opentelemetry/sdk/_metrics/_view_instrument_match.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_view_instrument_match.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_view_instrument_match.py index 70664cb99d7..a75ce7d4c81 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_view_instrument_match.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_view_instrument_match.py @@ -76,8 +76,7 @@ def consume_measurement(self, measurement: Measurement) -> None: self._attributes_aggregation[attributes].aggregate(measurement) def collect( - self, - instrument_class_temporality: Dict[type, AggregationTemporality] + self, instrument_class_temporality: Dict[type, AggregationTemporality] ) -> Iterable[Metric]: with self._lock: From 8ac72de9c650b518546396a0e622af6063f6a232 Mon Sep 17 00:00:00 2001 From: Diego Hurtado Date: Mon, 25 Apr 2022 10:33:53 -0600 Subject: [PATCH 07/10] Fix check --- .../src/opentelemetry/sdk/_metrics/metric_reader.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/metric_reader.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/metric_reader.py index e97257ed861..8ecd4784450 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/metric_reader.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/metric_reader.py @@ -92,9 +92,9 @@ def __init__( if preferred_temporality is not None: for temporality in preferred_temporality.values(): - if ( - temporality != AggregationTemporality.CUMULATIVE - and temporality != AggregationTemporality.DELTA + if temporality not in ( + AggregationTemporality.CUMULATIVE, + AggregationTemporality.DELTA, ): raise Exception( f"Invalid temporality value found {temporality}" From d67b3a250b1938a9850bf7862e2bb56e38fc2497 Mon Sep 17 00:00:00 2001 From: Diego Hurtado Date: Mon, 25 Apr 2022 15:34:26 -0600 Subject: [PATCH 08/10] Fix docs --- .../src/opentelemetry/sdk/_metrics/metric_reader.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/metric_reader.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/metric_reader.py index 8ecd4784450..85ef1cd3510 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/metric_reader.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/metric_reader.py @@ -50,11 +50,16 @@ class MetricReader(ABC): temporalities of the classes that the user wants to change, not all of them. The classes not included in the passed dictionary will retain their association to their default aggregation temporalities. + The value passed here will override the corresponding values set + via the environment variable .. document protected _receive_metrics which is a intended to be overriden by subclass .. automethod:: _receive_metrics """ + # FIXME add :std:envvar:`OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE` + # to the end of the documentation paragraph above. + def __init__( self, preferred_temporality: Dict[type, AggregationTemporality] = None ) -> None: From f8a99243a4861eb4d36d05d1a0560a73946a083c Mon Sep 17 00:00:00 2001 From: Diego Hurtado Date: Mon, 25 Apr 2022 16:35:24 -0600 Subject: [PATCH 09/10] Add changelog --- CHANGELOG.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c070efc9ab8..8c28937d340 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,7 +10,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [1.11.1-0.30b1](https://github.com/open-telemetry/opentelemetry-python/releases/tag/v1.11.1-0.30b1) - 2022-04-21 - +- Implement MetricReader temporality controls + ([#2637](https://github.com/open-telemetry/opentelemetry-python/pull/2637)) - Fix unhandled callback exceptions on async instruments ([#2614](https://github.com/open-telemetry/opentelemetry-python/pull/2614)) - Rename `DefaultCounter`, `DefaultHistogram`, `DefaultObservableCounter`, From d90477fa7025c534d7a2bd887fa013018cd85265 Mon Sep 17 00:00:00 2001 From: Diego Hurtado Date: Tue, 26 Apr 2022 12:31:34 -0600 Subject: [PATCH 10/10] Update changelog message --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8c28937d340..644cfcdefa4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,7 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [1.11.1-0.30b1](https://github.com/open-telemetry/opentelemetry-python/releases/tag/v1.11.1-0.30b1) - 2022-04-21 -- Implement MetricReader temporality controls +- Add parameter to MetricReader constructor to select temporality per instrument kind ([#2637](https://github.com/open-telemetry/opentelemetry-python/pull/2637)) - Fix unhandled callback exceptions on async instruments ([#2614](https://github.com/open-telemetry/opentelemetry-python/pull/2614))