diff --git a/ext/opentelemetry-ext-otcollector/src/opentelemetry/ext/otcollector/metrics_exporter/__init__.py b/ext/opentelemetry-ext-otcollector/src/opentelemetry/ext/otcollector/metrics_exporter/__init__.py index 5f8424fd00b..d77dcd4324e 100644 --- a/ext/opentelemetry-ext-otcollector/src/opentelemetry/ext/otcollector/metrics_exporter/__init__.py +++ b/ext/opentelemetry-ext-otcollector/src/opentelemetry/ext/otcollector/metrics_exporter/__init__.py @@ -147,10 +147,9 @@ def get_collector_metric_type(metric: Metric) -> metrics_pb2.MetricDescriptor: def get_collector_point(metric_record: MetricRecord) -> metrics_pb2.Point: # TODO: horrible hack to get original list of keys to then get the bound # instrument - key = dict(metric_record.labels) point = metrics_pb2.Point( timestamp=utils.proto_timestamp_from_time_ns( - metric_record.metric.bind(key).last_update_timestamp + metric_record.aggregator.last_update_timestamp ) ) if metric_record.metric.value_type == int: diff --git a/ext/opentelemetry-ext-otcollector/tests/test_otcollector_metrics_exporter.py b/ext/opentelemetry-ext-otcollector/tests/test_otcollector_metrics_exporter.py index 6133b3bd88b..7dcbf452e1e 100644 --- a/ext/opentelemetry-ext-otcollector/tests/test_otcollector_metrics_exporter.py +++ b/ext/opentelemetry-ext-otcollector/tests/test_otcollector_metrics_exporter.py @@ -178,13 +178,11 @@ def test_translate_to_collector(self): self.assertEqual(len(output_metrics[0].timeseries[0].points), 1) self.assertEqual( output_metrics[0].timeseries[0].points[0].timestamp.seconds, - record.metric.bind(self._labels).last_update_timestamp - // 1000000000, + record.aggregator.last_update_timestamp // 1000000000, ) self.assertEqual( output_metrics[0].timeseries[0].points[0].timestamp.nanos, - record.metric.bind(self._labels).last_update_timestamp - % 1000000000, + record.aggregator.last_update_timestamp % 1000000000, ) self.assertEqual( output_metrics[0].timeseries[0].points[0].int64_value, 123 diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/__init__.py index e6f264494d9..1d35648fd35 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/__init__.py @@ -21,7 +21,6 @@ from opentelemetry.sdk.metrics.export.batcher import UngroupedBatcher from opentelemetry.sdk.resources import Resource from opentelemetry.sdk.util.instrumentation import InstrumentationInfo -from opentelemetry.util import time_ns logger = logging.getLogger(__name__) @@ -54,7 +53,6 @@ def __init__( self.value_type = value_type self.enabled = enabled self.aggregator = aggregator - self.last_update_timestamp = time_ns() self._ref_count = 0 self._ref_count_lock = threading.Lock() @@ -69,7 +67,6 @@ def _validate_update(self, value: metrics_api.ValueT) -> bool: return True def update(self, value: metrics_api.ValueT): - self.last_update_timestamp = time_ns() self.aggregator.update(value) def release(self): @@ -88,10 +85,8 @@ def ref_count(self): return self._ref_count def __repr__(self): - return '{}(data="{}", last_update_timestamp={})'.format( - type(self).__name__, - self.aggregator.current, - self.last_update_timestamp, + return '{}(data="{}")'.format( + type(self).__name__, self.aggregator.current ) @@ -331,7 +326,6 @@ def _collect_observers(self) -> None: if not observer.enabled: continue - # TODO: capture timestamp? if not observer.run(): continue @@ -404,9 +398,7 @@ def unregister_observer(self, observer: "Observer") -> None: class MeterProvider(metrics_api.MeterProvider): - def __init__( - self, resource: Resource = Resource.create_empty(), - ): + def __init__(self, resource: Resource = Resource.create_empty()): self.resource = resource def get_meter( diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py index 20e5e5c2097..ea8c40a7e72 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py @@ -16,6 +16,8 @@ import threading from collections import namedtuple +from opentelemetry.util import time_ns + class Aggregator(abc.ABC): """Base class for aggregators. @@ -49,10 +51,12 @@ def __init__(self): self.current = 0 self.checkpoint = 0 self._lock = threading.Lock() + self.last_update_timestamp = None def update(self, value): with self._lock: self.current += value + self.last_update_timestamp = time_ns() def take_checkpoint(self): with self._lock: @@ -62,6 +66,9 @@ def take_checkpoint(self): def merge(self, other): with self._lock: self.checkpoint += other.checkpoint + self.last_update_timestamp = get_latest_timestamp( + self.last_update_timestamp, other.last_update_timestamp + ) class MinMaxSumCountAggregator(Aggregator): @@ -88,6 +95,7 @@ def __init__(self): self.current = self._EMPTY self.checkpoint = self._EMPTY self._lock = threading.Lock() + self.last_update_timestamp = None def update(self, value): with self._lock: @@ -100,6 +108,7 @@ def update(self, value): self.current.sum + value, self.current.count + 1, ) + self.last_update_timestamp = time_ns() def take_checkpoint(self): with self._lock: @@ -111,6 +120,9 @@ def merge(self, other): self.checkpoint = self._merge_checkpoint( self.checkpoint, other.checkpoint ) + self.last_update_timestamp = get_latest_timestamp( + self.last_update_timestamp, other.last_update_timestamp + ) class ObserverAggregator(Aggregator): @@ -123,10 +135,12 @@ def __init__(self): self.mmsc = MinMaxSumCountAggregator() self.current = None self.checkpoint = self._TYPE(None, None, None, 0, None) + self.last_update_timestamp = None def update(self, value): self.mmsc.update(value) self.current = value + self.last_update_timestamp = time_ns() def take_checkpoint(self): self.mmsc.take_checkpoint() @@ -134,9 +148,19 @@ def take_checkpoint(self): def merge(self, other): self.mmsc.merge(other.mmsc) - self.checkpoint = self._TYPE( - *( - self.mmsc.checkpoint - + (other.checkpoint.last or self.checkpoint.last,) - ) + last = self.checkpoint.last + self.last_update_timestamp = get_latest_timestamp( + self.last_update_timestamp, other.last_update_timestamp ) + if self.last_update_timestamp == other.last_update_timestamp: + last = other.checkpoint.last + self.checkpoint = self._TYPE(*(self.mmsc.checkpoint + (last,))) + + +def get_latest_timestamp(time_stamp, other_timestamp): + if time_stamp is None: + return other_timestamp + if other_timestamp is not None: + if time_stamp < other_timestamp: + return other_timestamp + return time_stamp diff --git a/opentelemetry-sdk/tests/metrics/export/test_export.py b/opentelemetry-sdk/tests/metrics/export/test_export.py index 69cdcf6e160..3cf305eb7e3 100644 --- a/opentelemetry-sdk/tests/metrics/export/test_export.py +++ b/opentelemetry-sdk/tests/metrics/export/test_export.py @@ -228,11 +228,14 @@ def call_update(counter): update_total += val return update_total - def test_update(self): + @mock.patch("opentelemetry.sdk.metrics.export.aggregate.time_ns") + def test_update(self, time_mock): + time_mock.return_value = 123 counter = CounterAggregator() counter.update(1.0) counter.update(2.0) self.assertEqual(counter.current, 3.0) + self.assertEqual(counter.last_update_timestamp, 123) def test_checkpoint(self): counter = CounterAggregator() @@ -246,8 +249,10 @@ def test_merge(self): counter2 = CounterAggregator() counter.checkpoint = 1.0 counter2.checkpoint = 3.0 + counter2.last_update_timestamp = 123 counter.merge(counter2) self.assertEqual(counter.checkpoint, 4.0) + self.assertEqual(counter.last_update_timestamp, 123) def test_concurrent_update(self): counter = CounterAggregator() @@ -296,7 +301,9 @@ def call_update(mmsc): count_ += 1 return MinMaxSumCountAggregator._TYPE(min_, max_, sum_, count_) - def test_update(self): + @mock.patch("opentelemetry.sdk.metrics.export.aggregate.time_ns") + def test_update(self, time_mock): + time_mock.return_value = 123 mmsc = MinMaxSumCountAggregator() # test current values without any update self.assertEqual(mmsc.current, MinMaxSumCountAggregator._EMPTY) @@ -309,6 +316,7 @@ def test_update(self): self.assertEqual( mmsc.current, (min(values), max(values), sum(values), len(values)) ) + self.assertEqual(mmsc.last_update_timestamp, 123) def test_checkpoint(self): mmsc = MinMaxSumCountAggregator() @@ -340,6 +348,9 @@ def test_merge(self): mmsc1.checkpoint = checkpoint1 mmsc2.checkpoint = checkpoint2 + mmsc1.last_update_timestamp = 100 + mmsc2.last_update_timestamp = 123 + mmsc1.merge(mmsc2) self.assertEqual( @@ -348,6 +359,7 @@ def test_merge(self): checkpoint1, checkpoint2 ), ) + self.assertEqual(mmsc1.last_update_timestamp, 123) def test_merge_checkpoint(self): func = MinMaxSumCountAggregator._merge_checkpoint @@ -421,7 +433,9 @@ def test_concurrent_update_and_checkpoint(self): class TestObserverAggregator(unittest.TestCase): - def test_update(self): + @mock.patch("opentelemetry.sdk.metrics.export.aggregate.time_ns") + def test_update(self, time_mock): + time_mock.return_value = 123 observer = ObserverAggregator() # test current values without any update self.assertEqual(observer.mmsc.current, (None, None, None, 0)) @@ -436,6 +450,7 @@ def test_update(self): observer.mmsc.current, (min(values), max(values), sum(values), len(values)), ) + self.assertEqual(observer.last_update_timestamp, 123) self.assertEqual(observer.current, values[-1]) @@ -471,6 +486,77 @@ def test_merge(self): observer1.mmsc.checkpoint = mmsc_checkpoint1 observer2.mmsc.checkpoint = mmsc_checkpoint2 + observer1.last_update_timestamp = 100 + observer2.last_update_timestamp = 123 + + observer1.checkpoint = checkpoint1 + observer2.checkpoint = checkpoint2 + + observer1.merge(observer2) + + self.assertEqual( + observer1.checkpoint, + ( + min(checkpoint1.min, checkpoint2.min), + max(checkpoint1.max, checkpoint2.max), + checkpoint1.sum + checkpoint2.sum, + checkpoint1.count + checkpoint2.count, + checkpoint2.last, + ), + ) + self.assertEqual(observer1.last_update_timestamp, 123) + + def test_merge_last_updated(self): + observer1 = ObserverAggregator() + observer2 = ObserverAggregator() + + mmsc_checkpoint1 = MinMaxSumCountAggregator._TYPE(3, 150, 101, 3) + mmsc_checkpoint2 = MinMaxSumCountAggregator._TYPE(1, 33, 44, 2) + + checkpoint1 = ObserverAggregator._TYPE(*(mmsc_checkpoint1 + (23,))) + + checkpoint2 = ObserverAggregator._TYPE(*(mmsc_checkpoint2 + (27,))) + + observer1.mmsc.checkpoint = mmsc_checkpoint1 + observer2.mmsc.checkpoint = mmsc_checkpoint2 + + observer1.last_update_timestamp = 123 + observer2.last_update_timestamp = 100 + + observer1.checkpoint = checkpoint1 + observer2.checkpoint = checkpoint2 + + observer1.merge(observer2) + + self.assertEqual( + observer1.checkpoint, + ( + min(checkpoint1.min, checkpoint2.min), + max(checkpoint1.max, checkpoint2.max), + checkpoint1.sum + checkpoint2.sum, + checkpoint1.count + checkpoint2.count, + checkpoint1.last, + ), + ) + self.assertEqual(observer1.last_update_timestamp, 123) + + def test_merge_last_updated_none(self): + observer1 = ObserverAggregator() + observer2 = ObserverAggregator() + + mmsc_checkpoint1 = MinMaxSumCountAggregator._TYPE(3, 150, 101, 3) + mmsc_checkpoint2 = MinMaxSumCountAggregator._TYPE(1, 33, 44, 2) + + checkpoint1 = ObserverAggregator._TYPE(*(mmsc_checkpoint1 + (23,))) + + checkpoint2 = ObserverAggregator._TYPE(*(mmsc_checkpoint2 + (27,))) + + observer1.mmsc.checkpoint = mmsc_checkpoint1 + observer2.mmsc.checkpoint = mmsc_checkpoint2 + + observer1.last_update_timestamp = None + observer2.last_update_timestamp = 100 + observer1.checkpoint = checkpoint1 observer2.checkpoint = checkpoint2 @@ -486,6 +572,7 @@ def test_merge(self): checkpoint2.last, ), ) + self.assertEqual(observer1.last_update_timestamp, 100) def test_merge_with_empty(self): observer1 = ObserverAggregator() @@ -496,6 +583,7 @@ def test_merge_with_empty(self): observer1.mmsc.checkpoint = mmsc_checkpoint1 observer1.checkpoint = checkpoint1 + observer1.last_update_timestamp = 100 observer1.merge(observer2) diff --git a/opentelemetry-sdk/tests/metrics/test_metrics.py b/opentelemetry-sdk/tests/metrics/test_metrics.py index 1bbe18b178b..32980647055 100644 --- a/opentelemetry-sdk/tests/metrics/test_metrics.py +++ b/opentelemetry-sdk/tests/metrics/test_metrics.py @@ -368,13 +368,10 @@ def test_add_incorrect_type(self, logger_mock): self.assertEqual(bound_counter.aggregator.current, 0) self.assertTrue(logger_mock.warning.called) - @mock.patch("opentelemetry.sdk.metrics.time_ns") - def test_update(self, time_mock): + def test_update(self): aggregator = export.aggregate.CounterAggregator() bound_counter = metrics.BoundCounter(int, True, aggregator) - time_mock.return_value = 123 bound_counter.update(4.0) - self.assertEqual(bound_counter.last_update_timestamp, 123) self.assertEqual(bound_counter.aggregator.current, 4.0) @@ -403,11 +400,8 @@ def test_record_incorrect_type(self, logger_mock): ) self.assertTrue(logger_mock.warning.called) - @mock.patch("opentelemetry.sdk.metrics.time_ns") - def test_update(self, time_mock): + def test_update(self): aggregator = export.aggregate.MinMaxSumCountAggregator() bound_measure = metrics.BoundMeasure(int, True, aggregator) - time_mock.return_value = 123 bound_measure.update(4.0) - self.assertEqual(bound_measure.last_update_timestamp, 123) self.assertEqual(bound_measure.aggregator.current, (4.0, 4.0, 4.0, 1))