diff --git a/CHANGELOG.md b/CHANGELOG.md index c98bb873ce5..1614019b288 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased](https://github.com/open-telemetry/opentelemetry-python/compare/v1.11.1-0.30b1...HEAD) +- Add timeouts to metric SDK + ([#2653](https://github.com/open-telemetry/opentelemetry-python/pull/2653)) - Add variadic arguments to metric exporter/reader interfaces ([#2654](https://github.com/open-telemetry/opentelemetry-python/pull/2654)) - Move Metrics API behind internal package diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/_metric_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/_metric_exporter/__init__.py index c40ff1ddd21..693eb261dba 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/_metric_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/_metric_exporter/__init__.py @@ -169,9 +169,13 @@ def _translate_data( ) def export( - self, metrics: Sequence[Metric], *args, **kwargs + self, + metrics: Sequence[Metric], + timeout_millis: float = 10_000, + **kwargs, ) -> MetricExportResult: + # TODO(#2663): OTLPExporterMixin should pass timeout to gRPC return self._export(metrics) - def shutdown(self, *args, **kwargs): + def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None: pass diff --git a/exporter/opentelemetry-exporter-prometheus/src/opentelemetry/exporter/prometheus/__init__.py b/exporter/opentelemetry-exporter-prometheus/src/opentelemetry/exporter/prometheus/__init__.py index 1b81f062c02..448a04bf293 100644 --- a/exporter/opentelemetry-exporter-prometheus/src/opentelemetry/exporter/prometheus/__init__.py +++ b/exporter/opentelemetry-exporter-prometheus/src/opentelemetry/exporter/prometheus/__init__.py @@ -111,15 +111,17 @@ def __init__(self, prefix: str = "") -> None: self._collector._callback = self.collect def _receive_metrics( - self, metrics: Iterable[Metric], *args, **kwargs + self, + metrics: Iterable[Metric], + timeout_millis: float = 10_000, + **kwargs, ) -> None: if metrics is None: return self._collector.add_metrics_data(metrics) - def shutdown(self, *args, **kwargs) -> bool: + def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None: REGISTRY.unregister(self._collector) - return True class _CustomCollector: diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/__init__.py index 39256b73271..eebcb0b13cf 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/__init__.py @@ -48,6 +48,7 @@ from opentelemetry.sdk.resources import Resource from opentelemetry.sdk.util.instrumentation import InstrumentationScope from opentelemetry.util._once import Once +from opentelemetry.util._time import _time_ns _logger = getLogger(__name__) @@ -369,16 +370,20 @@ def __init__( self._shutdown_once = Once() self._shutdown = False - def force_flush(self) -> bool: - - # FIXME implement a timeout + def force_flush(self, timeout_millis: float = 10_000) -> bool: + deadline_ns = _time_ns() + timeout_millis * 10**6 for metric_reader in self._sdk_config.metric_readers: - metric_reader.collect() + current_ts = _time_ns() + if current_ts >= deadline_ns: + raise Exception("Timed out while flushing metric readers") + metric_reader.collect( + timeout_millis=(deadline_ns - current_ts) / 10**6 + ) return True - def shutdown(self): - # FIXME implement a timeout + def shutdown(self, timeout_millis: float = 30_000): + deadline_ns = _time_ns() + timeout_millis * 10**6 def _shutdown(): self._shutdown = True @@ -392,8 +397,15 @@ def _shutdown(): metric_reader_error = {} for metric_reader in self._sdk_config.metric_readers: + current_ts = _time_ns() try: - metric_reader.shutdown() + if current_ts >= deadline_ns: + raise Exception( + "Didn't get to execute, deadline already exceeded" + ) + metric_reader.shutdown( + timeout_millis=(deadline_ns - current_ts) / 10**6 + ) # pylint: disable=broad-except except Exception as error: diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/export/__init__.py index 959006e2347..c8dcb43327e 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/export/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/export/__init__.py @@ -31,6 +31,7 @@ from opentelemetry.sdk._metrics.metric_reader import MetricReader from opentelemetry.sdk._metrics.point import AggregationTemporality, Metric from opentelemetry.util._once import Once +from opentelemetry.util._time import _time_ns _logger = logging.getLogger(__name__) @@ -53,8 +54,11 @@ class MetricExporter(ABC): @abstractmethod def export( - self, metrics: Sequence[Metric], *args, **kwargs - ) -> "MetricExportResult": + self, + metrics: Sequence[Metric], + timeout_millis: float = 10_000, + **kwargs, + ) -> MetricExportResult: """Exports a batch of telemetry data. Args: @@ -65,7 +69,7 @@ def export( """ @abstractmethod - def shutdown(self, *args, **kwargs) -> None: + def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None: """Shuts down the exporter. Called when the SDK is shut down. @@ -90,14 +94,17 @@ def __init__( self.formatter = formatter def export( - self, metrics: Sequence[Metric], *args, **kwargs + self, + metrics: Sequence[Metric], + timeout_millis: float = 10_000, + **kwargs, ) -> MetricExportResult: for metric in metrics: self.out.write(self.formatter(metric)) self.out.flush() return MetricExportResult.SUCCESS - def shutdown(self, *args, **kwargs) -> None: + def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None: pass @@ -127,11 +134,16 @@ def get_metrics(self) -> List[Metric]: self._metrics = [] return metrics - def _receive_metrics(self, metrics: Iterable[Metric], *args, **kwargs): + def _receive_metrics( + self, + metrics: Iterable[Metric], + timeout_millis: float = 10_000, + **kwargs, + ) -> None: with self._lock: self._metrics = list(metrics) - def shutdown(self, *args, **kwargs): + def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None: pass @@ -193,23 +205,28 @@ def _at_fork_reinit(self): def _ticker(self) -> None: interval_secs = self._export_interval_millis / 1e3 while not self._shutdown_event.wait(interval_secs): - self.collect() + self.collect(timeout_millis=self._export_timeout_millis) # one last collection below before shutting down completely - self.collect() + self.collect(timeout_millis=self._export_interval_millis) def _receive_metrics( - self, metrics: Iterable[Metric], *args, **kwargs + self, + metrics: Iterable[Metric], + timeout_millis: float = 10_000, + **kwargs, ) -> None: if metrics is None: return token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True)) try: - self._exporter.export(metrics) + self._exporter.export(metrics, timeout_millis=timeout_millis) except Exception as e: # pylint: disable=broad-except,invalid-name _logger.exception("Exception while exporting metrics %s", str(e)) detach(token) - def shutdown(self, *args, **kwargs): + def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None: + deadline_ns = _time_ns() + timeout_millis * 10**6 + def _shutdown(): self._shutdown = True @@ -219,5 +236,5 @@ def _shutdown(): return self._shutdown_event.set() - self._daemon_thread.join() - self._exporter.shutdown() + self._daemon_thread.join(timeout=(deadline_ns - _time_ns()) / 10**9) + self._exporter.shutdown(timeout=(deadline_ns - _time_ns()) / 10**6) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/metric_reader.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/metric_reader.py index afdc7083162..d949a340478 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/metric_reader.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/metric_reader.py @@ -138,7 +138,7 @@ def __init__( self._instrument_class_aggregation.update(preferred_aggregation or {}) @final - def collect(self) -> None: + def collect(self, timeout_millis: float = 10_000) -> None: """Collects the metrics from the internal SDK state and invokes the `_receive_metrics` with the collection. """ @@ -148,7 +148,8 @@ def collect(self) -> None: ) return self._receive_metrics( - self._collect(self, self._instrument_class_temporality) + self._collect(self, self._instrument_class_temporality), + timeout_millis=timeout_millis, ) @final @@ -162,11 +163,16 @@ def _set_collect_callback( self._collect = func @abstractmethod - def _receive_metrics(self, metrics: Iterable[Metric], *args, **kwargs): + def _receive_metrics( + self, + metrics: Iterable[Metric], + timeout_millis: float = 10_000, + **kwargs, + ) -> None: """Called by `MetricReader.collect` when it receives a batch of metrics""" @abstractmethod - def shutdown(self, *args, **kwargs): + def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None: """Shuts down the MetricReader. This method provides a way for the MetricReader to do any cleanup required. A metric reader can only be shutdown once, any subsequent calls are ignored and return diff --git a/opentelemetry-sdk/tests/metrics/test_backward_compat.py b/opentelemetry-sdk/tests/metrics/test_backward_compat.py new file mode 100644 index 00000000000..4b3b504d65d --- /dev/null +++ b/opentelemetry-sdk/tests/metrics/test_backward_compat.py @@ -0,0 +1,89 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +The purpose of this test is to test for backward compatibility with any user-implementable +interfaces as they were originally defined. For example, changes to the MetricExporter ABC must +be made in such a way that existing implementations (outside of this repo) continue to work +when *called* by the SDK. + +This does not apply to classes which are not intended to be overriden by the user e.g. Meter +and PeriodicExportingMetricReader concrete class. Those may freely be modified in a +backward-compatible way for *callers*. + +Ideally, we could use mypy for this as well, but SDK is not type checked atm. +""" + +from typing import Iterable, Sequence +from unittest import TestCase + +from opentelemetry.sdk._metrics import MeterProvider +from opentelemetry.sdk._metrics.export import ( + MetricExporter, + MetricExportResult, + PeriodicExportingMetricReader, +) +from opentelemetry.sdk._metrics.metric_reader import MetricReader +from opentelemetry.sdk._metrics.point import Metric + + +# Do not change these classes until after major version 1 +class OrigMetricExporter(MetricExporter): + def export( + self, + metrics: Sequence[Metric], + timeout_millis: float = 10_000, + **kwargs, + ) -> MetricExportResult: + pass + + def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None: + pass + + +class OrigMetricReader(MetricReader): + def _receive_metrics( + self, + metrics: Iterable[Metric], + timeout_millis: float = 10_000, + **kwargs, + ) -> None: + pass + + def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None: + self.collect() + + +class TestBackwardCompat(TestCase): + def test_metric_exporter(self): + exporter = OrigMetricExporter() + meter_provider = MeterProvider( + metric_readers=[PeriodicExportingMetricReader(exporter)] + ) + # produce some data + meter_provider.get_meter("foo").create_counter("mycounter").add(12) + try: + meter_provider.shutdown() + except Exception: + self.fail() + + def test_metric_reader(self): + reader = OrigMetricReader() + meter_provider = MeterProvider(metric_readers=[reader]) + # produce some data + meter_provider.get_meter("foo").create_counter("mycounter").add(12) + try: + meter_provider.shutdown() + except Exception: + self.fail() diff --git a/opentelemetry-sdk/tests/metrics/test_metric_reader.py b/opentelemetry-sdk/tests/metrics/test_metric_reader.py index 48e69ea8006..1b95953cf80 100644 --- a/opentelemetry-sdk/tests/metrics/test_metric_reader.py +++ b/opentelemetry-sdk/tests/metrics/test_metric_reader.py @@ -13,12 +13,12 @@ # limitations under the License. from os import environ -from typing import Dict +from typing import Dict, Iterable from unittest import TestCase from unittest.mock import patch -from opentelemetry.sdk._metrics._internal.aggregation import Aggregation from opentelemetry.sdk._metrics.aggregation import ( + Aggregation, DefaultAggregation, LastValueAggregation, ) @@ -31,7 +31,7 @@ UpDownCounter, ) from opentelemetry.sdk._metrics.metric_reader import MetricReader -from opentelemetry.sdk._metrics.point import AggregationTemporality +from opentelemetry.sdk._metrics.point import AggregationTemporality, Metric from opentelemetry.sdk.environment_variables import ( _OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE, ) @@ -48,10 +48,15 @@ def __init__( preferred_aggregation=preferred_aggregation, ) - def _receive_metrics(self, metrics): + def _receive_metrics( + self, + metrics: Iterable[Metric], + timeout_millis: float = 10_000, + **kwargs, + ) -> None: pass - def shutdown(self): + def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None: return True diff --git a/opentelemetry-sdk/tests/metrics/test_metrics.py b/opentelemetry-sdk/tests/metrics/test_metrics.py index ca256319354..3b571638016 100644 --- a/opentelemetry-sdk/tests/metrics/test_metrics.py +++ b/opentelemetry-sdk/tests/metrics/test_metrics.py @@ -15,7 +15,7 @@ from logging import WARNING from time import sleep -from typing import Sequence +from typing import Iterable, Sequence from unittest import TestCase from unittest.mock import MagicMock, Mock, patch @@ -46,10 +46,15 @@ class DummyMetricReader(MetricReader): def __init__(self): super().__init__() - def _receive_metrics(self, metrics): + def _receive_metrics( + self, + metrics: Iterable[Metric], + timeout_millis: float = 10_000, + **kwargs, + ) -> None: pass - def shutdown(self): + def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None: return True @@ -433,12 +438,17 @@ def __init__(self): self.metrics = {} self._counter = 0 - def export(self, metrics: Sequence[Metric]) -> MetricExportResult: + def export( + self, + metrics: Sequence[Metric], + timeout_millis: float = 10_000, + **kwargs, + ) -> MetricExportResult: self.metrics[self._counter] = metrics self._counter += 1 return MetricExportResult.SUCCESS - def shutdown(self) -> None: + def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None: pass diff --git a/opentelemetry-sdk/tests/metrics/test_periodic_exporting_metric_reader.py b/opentelemetry-sdk/tests/metrics/test_periodic_exporting_metric_reader.py index ff67e848afe..d2ad8cdccc3 100644 --- a/opentelemetry-sdk/tests/metrics/test_periodic_exporting_metric_reader.py +++ b/opentelemetry-sdk/tests/metrics/test_periodic_exporting_metric_reader.py @@ -13,12 +13,14 @@ # limitations under the License. import time +from typing import Sequence from unittest.mock import Mock from flaky import flaky from opentelemetry.sdk._metrics.export import ( MetricExporter, + MetricExportResult, PeriodicExportingMetricReader, ) from opentelemetry.sdk._metrics.point import Gauge, Metric, Sum @@ -33,12 +35,17 @@ def __init__(self, wait=0): self.metrics = [] self._shutdown = False - def export(self, metrics): + def export( + self, + metrics: Sequence[Metric], + timeout_millis: float = 10_000, + **kwargs, + ) -> MetricExportResult: time.sleep(self.wait) self.metrics.extend(metrics) return True - def shutdown(self): + def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None: self._shutdown = True