From 1f12c96efe5472d0fdf9ff2bde7b0e39ddb39baf Mon Sep 17 00:00:00 2001 From: Leighton Date: Mon, 23 Mar 2020 09:59:53 -0700 Subject: [PATCH 01/14] timestamp --- .../src/opentelemetry/sdk/metrics/__init__.py | 4 --- .../sdk/metrics/export/aggregate.py | 15 ++++++++++- .../tests/metrics/export/test_export.py | 25 ++++++++++++++++--- .../tests/metrics/test_metrics.py | 10 ++------ 4 files changed, 38 insertions(+), 16 deletions(-) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/__init__.py index 55552ace7b1..94c638f01b8 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/__init__.py @@ -21,7 +21,6 @@ from opentelemetry.sdk.metrics.export.batcher import UngroupedBatcher from opentelemetry.sdk.resources import Resource from opentelemetry.sdk.util.instrumentation import InstrumentationInfo -from opentelemetry.util import time_ns logger = logging.getLogger(__name__) @@ -71,7 +70,6 @@ def __init__( self.value_type = value_type self.enabled = enabled self.aggregator = aggregator - self.last_update_timestamp = time_ns() self._ref_count = 0 self._ref_count_lock = threading.Lock() @@ -86,7 +84,6 @@ def _validate_update(self, value: metrics_api.ValueT) -> bool: return True def update(self, value: metrics_api.ValueT): - self.last_update_timestamp = time_ns() self.aggregator.update(value) def release(self): @@ -346,7 +343,6 @@ def _collect_observers(self) -> None: if not observer.enabled: continue - # TODO: capture timestamp? if not observer.run(): continue diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py index 5b730cc8040..1db3db1e1fb 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py @@ -15,6 +15,7 @@ import abc import threading from collections import namedtuple +from opentelemetry.util import time_ns class Aggregator(abc.ABC): @@ -49,10 +50,12 @@ def __init__(self): self.current = 0 self.checkpoint = 0 self._lock = threading.Lock() + self.last_update_timestamp = None def update(self, value): with self._lock: self.current += value + self.last_update_timestamp = time_ns() def take_checkpoint(self): with self._lock: @@ -62,6 +65,8 @@ def take_checkpoint(self): def merge(self, other): with self._lock: self.checkpoint += other.checkpoint + self.last_update_timestamp = other.last_update_timestamp \ + or self.last_update_timestamp class MinMaxSumCountAggregator(Aggregator): @@ -88,6 +93,7 @@ def __init__(self): self.current = self._EMPTY self.checkpoint = self._EMPTY self._lock = threading.Lock() + self.last_update_timestamp = None def update(self, value): with self._lock: @@ -100,6 +106,7 @@ def update(self, value): self.current.sum + value, self.current.count + 1, ) + self.last_update_timestamp = time_ns() def take_checkpoint(self): with self._lock: @@ -111,6 +118,8 @@ def merge(self, other): self.checkpoint = self._merge_checkpoint( self.checkpoint, other.checkpoint ) + self.last_update_timestamp = other.last_update_timestamp \ + or self.last_update_timestamp class ObserverAggregator(Aggregator): @@ -123,10 +132,12 @@ def __init__(self): self.mmsc = MinMaxSumCountAggregator() self.current = None self.checkpoint = self._TYPE(None, None, None, 0, None) + self.last_update_timestamp = None def update(self, value): self.mmsc.update(value) self.current = value + self.last_update_timestamp = time_ns() def take_checkpoint(self): self.mmsc.take_checkpoint() @@ -137,6 +148,8 @@ def merge(self, other): self.checkpoint = self._TYPE( *( self.mmsc.checkpoint - + (other.checkpoint.last or self.checkpoint.last,) + + (other.checkpoint.last or self.checkpoint.last or 0,) ) ) + self.last_update_timestamp = other.last_update_timestamp \ + or self.last_update_timestamp diff --git a/opentelemetry-sdk/tests/metrics/export/test_export.py b/opentelemetry-sdk/tests/metrics/export/test_export.py index 33d8d07c299..1aab2296ef1 100644 --- a/opentelemetry-sdk/tests/metrics/export/test_export.py +++ b/opentelemetry-sdk/tests/metrics/export/test_export.py @@ -17,6 +17,7 @@ import unittest from unittest import mock +from opentelemetry.util import time_ns from opentelemetry.sdk import metrics from opentelemetry.sdk.metrics.export import ( ConsoleMetricsExporter, @@ -234,11 +235,14 @@ def call_update(counter): update_total += val return update_total - def test_update(self): + @mock.patch("opentelemetry.sdk.metrics.export.aggregate.time_ns") + def test_update(self, time_mock): + time_mock.return_value = 123 counter = CounterAggregator() counter.update(1.0) counter.update(2.0) self.assertEqual(counter.current, 3.0) + self.assertEqual(counter.last_update_timestamp, 123) def test_checkpoint(self): counter = CounterAggregator() @@ -252,8 +256,10 @@ def test_merge(self): counter2 = CounterAggregator() counter.checkpoint = 1.0 counter2.checkpoint = 3.0 + counter2.last_update_timestamp = 123 counter.merge(counter2) self.assertEqual(counter.checkpoint, 4.0) + self.assertEqual(counter.last_update_timestamp, 123) def test_concurrent_update(self): counter = CounterAggregator() @@ -302,7 +308,9 @@ def call_update(mmsc): count_ += 1 return MinMaxSumCountAggregator._TYPE(min_, max_, sum_, count_) - def test_update(self): + @mock.patch("opentelemetry.sdk.metrics.export.aggregate.time_ns") + def test_update(self, time_mock): + time_mock.return_value = 123 mmsc = MinMaxSumCountAggregator() # test current values without any update self.assertEqual(mmsc.current, MinMaxSumCountAggregator._EMPTY) @@ -315,6 +323,7 @@ def test_update(self): self.assertEqual( mmsc.current, (min(values), max(values), sum(values), len(values)) ) + self.assertEqual(mmsc.last_update_timestamp, 123) def test_checkpoint(self): mmsc = MinMaxSumCountAggregator() @@ -346,6 +355,8 @@ def test_merge(self): mmsc1.checkpoint = checkpoint1 mmsc2.checkpoint = checkpoint2 + mmsc2.last_update_timestamp = 123 + mmsc1.merge(mmsc2) self.assertEqual( @@ -354,6 +365,7 @@ def test_merge(self): checkpoint1, checkpoint2 ), ) + self.assertEqual(mmsc1.last_update_timestamp, 123) def test_merge_checkpoint(self): func = MinMaxSumCountAggregator._merge_checkpoint @@ -427,7 +439,10 @@ def test_concurrent_update_and_checkpoint(self): class TestObserverAggregator(unittest.TestCase): - def test_update(self): + + @mock.patch("opentelemetry.sdk.metrics.export.aggregate.time_ns") + def test_update(self, time_mock): + time_mock.return_value = 123 observer = ObserverAggregator() # test current values without any update self.assertEqual(observer.mmsc.current, (None, None, None, 0)) @@ -442,6 +457,7 @@ def test_update(self): observer.mmsc.current, (min(values), max(values), sum(values), len(values)), ) + self.assertEqual(observer.last_update_timestamp, 123) self.assertEqual(observer.current, values[-1]) @@ -477,6 +493,8 @@ def test_merge(self): observer1.mmsc.checkpoint = mmsc_checkpoint1 observer2.mmsc.checkpoint = mmsc_checkpoint2 + observer2.last_update_timestamp = 123 + observer1.checkpoint = checkpoint1 observer2.checkpoint = checkpoint2 @@ -492,6 +510,7 @@ def test_merge(self): checkpoint2.last, ), ) + self.assertEqual(observer1.last_update_timestamp, 123) def test_merge_with_empty(self): observer1 = ObserverAggregator() diff --git a/opentelemetry-sdk/tests/metrics/test_metrics.py b/opentelemetry-sdk/tests/metrics/test_metrics.py index dc09091c356..4a880bef8a0 100644 --- a/opentelemetry-sdk/tests/metrics/test_metrics.py +++ b/opentelemetry-sdk/tests/metrics/test_metrics.py @@ -393,13 +393,10 @@ def test_add_incorrect_type(self, logger_mock): self.assertEqual(bound_counter.aggregator.current, 0) self.assertTrue(logger_mock.warning.called) - @mock.patch("opentelemetry.sdk.metrics.time_ns") - def test_update(self, time_mock): + def test_update(self): aggregator = export.aggregate.CounterAggregator() bound_counter = metrics.BoundCounter(int, True, aggregator) - time_mock.return_value = 123 bound_counter.update(4.0) - self.assertEqual(bound_counter.last_update_timestamp, 123) self.assertEqual(bound_counter.aggregator.current, 4.0) @@ -428,11 +425,8 @@ def test_record_incorrect_type(self, logger_mock): ) self.assertTrue(logger_mock.warning.called) - @mock.patch("opentelemetry.sdk.metrics.time_ns") - def test_update(self, time_mock): + def test_update(self): aggregator = export.aggregate.MinMaxSumCountAggregator() bound_measure = metrics.BoundMeasure(int, True, aggregator) - time_mock.return_value = 123 bound_measure.update(4.0) - self.assertEqual(bound_measure.last_update_timestamp, 123) self.assertEqual(bound_measure.aggregator.current, (4.0, 4.0, 4.0, 1)) From 22d9095dcf33d3e4e141cd42d9c01c19ea18c071 Mon Sep 17 00:00:00 2001 From: Leighton Date: Mon, 23 Mar 2020 10:44:21 -0700 Subject: [PATCH 02/14] fix tests --- .../ext/otcollector/metrics_exporter/__init__.py | 4 +--- .../tests/test_otcollector_metrics_exporter.py | 4 ++-- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/ext/opentelemetry-ext-otcollector/src/opentelemetry/ext/otcollector/metrics_exporter/__init__.py b/ext/opentelemetry-ext-otcollector/src/opentelemetry/ext/otcollector/metrics_exporter/__init__.py index 12715035c25..4be205352a3 100644 --- a/ext/opentelemetry-ext-otcollector/src/opentelemetry/ext/otcollector/metrics_exporter/__init__.py +++ b/ext/opentelemetry-ext-otcollector/src/opentelemetry/ext/otcollector/metrics_exporter/__init__.py @@ -147,9 +147,7 @@ def get_collector_metric_type(metric: Metric) -> metrics_pb2.MetricDescriptor: def get_collector_point(metric_record: MetricRecord) -> metrics_pb2.Point: point = metrics_pb2.Point( timestamp=utils.proto_timestamp_from_time_ns( - metric_record.metric.bind( - metric_record.label_set - ).last_update_timestamp + metric_record.aggregator.last_update_timestamp ) ) if metric_record.metric.value_type == int: diff --git a/ext/opentelemetry-ext-otcollector/tests/test_otcollector_metrics_exporter.py b/ext/opentelemetry-ext-otcollector/tests/test_otcollector_metrics_exporter.py index ab6f4c8ccd5..929ae21bc66 100644 --- a/ext/opentelemetry-ext-otcollector/tests/test_otcollector_metrics_exporter.py +++ b/ext/opentelemetry-ext-otcollector/tests/test_otcollector_metrics_exporter.py @@ -175,12 +175,12 @@ def test_translate_to_collector(self): self.assertEqual(len(output_metrics[0].timeseries[0].points), 1) self.assertEqual( output_metrics[0].timeseries[0].points[0].timestamp.seconds, - record.metric.bind(record.label_set).last_update_timestamp + record.aggregator.last_update_timestamp // 1000000000, ) self.assertEqual( output_metrics[0].timeseries[0].points[0].timestamp.nanos, - record.metric.bind(record.label_set).last_update_timestamp + record.aggregator.last_update_timestamp % 1000000000, ) self.assertEqual( From 32eb662b9a6b6934857ad7a7672d804df249e3e3 Mon Sep 17 00:00:00 2001 From: Leighton Date: Mon, 23 Mar 2020 11:01:11 -0700 Subject: [PATCH 03/14] lint --- .../tests/test_otcollector_metrics_exporter.py | 6 ++---- .../src/opentelemetry/sdk/metrics/__init__.py | 4 +--- .../sdk/metrics/export/aggregate.py | 16 ++++++++++------ .../tests/metrics/export/test_export.py | 3 +-- 4 files changed, 14 insertions(+), 15 deletions(-) diff --git a/ext/opentelemetry-ext-otcollector/tests/test_otcollector_metrics_exporter.py b/ext/opentelemetry-ext-otcollector/tests/test_otcollector_metrics_exporter.py index 929ae21bc66..491f1364ae0 100644 --- a/ext/opentelemetry-ext-otcollector/tests/test_otcollector_metrics_exporter.py +++ b/ext/opentelemetry-ext-otcollector/tests/test_otcollector_metrics_exporter.py @@ -175,13 +175,11 @@ def test_translate_to_collector(self): self.assertEqual(len(output_metrics[0].timeseries[0].points), 1) self.assertEqual( output_metrics[0].timeseries[0].points[0].timestamp.seconds, - record.aggregator.last_update_timestamp - // 1000000000, + record.aggregator.last_update_timestamp // 1000000000, ) self.assertEqual( output_metrics[0].timeseries[0].points[0].timestamp.nanos, - record.aggregator.last_update_timestamp - % 1000000000, + record.aggregator.last_update_timestamp % 1000000000, ) self.assertEqual( output_metrics[0].timeseries[0].points[0].int64_value, 123 diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/__init__.py index 94c638f01b8..83e6b79ba21 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/__init__.py @@ -425,9 +425,7 @@ def get_label_set(self, labels: Dict[str, str]): class MeterProvider(metrics_api.MeterProvider): - def __init__( - self, resource: Resource = Resource.create_empty(), - ): + def __init__(self, resource: Resource = Resource.create_empty()): self.resource = resource def get_meter( diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py index 1db3db1e1fb..fff64d6813a 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py @@ -15,6 +15,7 @@ import abc import threading from collections import namedtuple + from opentelemetry.util import time_ns @@ -65,8 +66,9 @@ def take_checkpoint(self): def merge(self, other): with self._lock: self.checkpoint += other.checkpoint - self.last_update_timestamp = other.last_update_timestamp \ - or self.last_update_timestamp + self.last_update_timestamp = ( + other.last_update_timestamp or self.last_update_timestamp + ) class MinMaxSumCountAggregator(Aggregator): @@ -118,8 +120,9 @@ def merge(self, other): self.checkpoint = self._merge_checkpoint( self.checkpoint, other.checkpoint ) - self.last_update_timestamp = other.last_update_timestamp \ - or self.last_update_timestamp + self.last_update_timestamp = ( + other.last_update_timestamp or self.last_update_timestamp + ) class ObserverAggregator(Aggregator): @@ -151,5 +154,6 @@ def merge(self, other): + (other.checkpoint.last or self.checkpoint.last or 0,) ) ) - self.last_update_timestamp = other.last_update_timestamp \ - or self.last_update_timestamp + self.last_update_timestamp = ( + other.last_update_timestamp or self.last_update_timestamp + ) diff --git a/opentelemetry-sdk/tests/metrics/export/test_export.py b/opentelemetry-sdk/tests/metrics/export/test_export.py index 1aab2296ef1..93fcd824e80 100644 --- a/opentelemetry-sdk/tests/metrics/export/test_export.py +++ b/opentelemetry-sdk/tests/metrics/export/test_export.py @@ -17,7 +17,6 @@ import unittest from unittest import mock -from opentelemetry.util import time_ns from opentelemetry.sdk import metrics from opentelemetry.sdk.metrics.export import ( ConsoleMetricsExporter, @@ -30,6 +29,7 @@ ) from opentelemetry.sdk.metrics.export.batcher import UngroupedBatcher from opentelemetry.sdk.metrics.export.controller import PushController +from opentelemetry.util import time_ns # pylint: disable=protected-access @@ -439,7 +439,6 @@ def test_concurrent_update_and_checkpoint(self): class TestObserverAggregator(unittest.TestCase): - @mock.patch("opentelemetry.sdk.metrics.export.aggregate.time_ns") def test_update(self, time_mock): time_mock.return_value = 123 From 2aae43fb2637046f73e2201669506daa70a85d7a Mon Sep 17 00:00:00 2001 From: Leighton Date: Mon, 23 Mar 2020 11:22:43 -0700 Subject: [PATCH 04/14] fix lint --- opentelemetry-sdk/src/opentelemetry/sdk/metrics/__init__.py | 3 +-- opentelemetry-sdk/tests/metrics/export/test_export.py | 1 - 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/__init__.py index 83e6b79ba21..fdf43ef4dfa 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/__init__.py @@ -102,10 +102,9 @@ def ref_count(self): return self._ref_count def __repr__(self): - return '{}(data="{}", last_update_timestamp={})'.format( + return '{}(data="{}")'.format( type(self).__name__, self.aggregator.current, - self.last_update_timestamp, ) diff --git a/opentelemetry-sdk/tests/metrics/export/test_export.py b/opentelemetry-sdk/tests/metrics/export/test_export.py index 93fcd824e80..43b3fa7e084 100644 --- a/opentelemetry-sdk/tests/metrics/export/test_export.py +++ b/opentelemetry-sdk/tests/metrics/export/test_export.py @@ -29,7 +29,6 @@ ) from opentelemetry.sdk.metrics.export.batcher import UngroupedBatcher from opentelemetry.sdk.metrics.export.controller import PushController -from opentelemetry.util import time_ns # pylint: disable=protected-access From e23fbde7e38c4dad479cf09984d2275bb86174f8 Mon Sep 17 00:00:00 2001 From: Leighton Date: Mon, 23 Mar 2020 11:37:38 -0700 Subject: [PATCH 05/14] lint --- opentelemetry-sdk/src/opentelemetry/sdk/metrics/__init__.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/__init__.py index fdf43ef4dfa..9fde587cd9b 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/__init__.py @@ -103,8 +103,7 @@ def ref_count(self): def __repr__(self): return '{}(data="{}")'.format( - type(self).__name__, - self.aggregator.current, + type(self).__name__, self.aggregator.current ) From 1d2156262420cbaa306ecb0f17a210430e4bb401 Mon Sep 17 00:00:00 2001 From: Leighton Date: Tue, 24 Mar 2020 10:30:20 -0700 Subject: [PATCH 06/14] address coemmnts --- .../sdk/metrics/export/aggregate.py | 32 +++++++++++++------ .../tests/metrics/export/test_export.py | 3 ++ 2 files changed, 25 insertions(+), 10 deletions(-) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py index fff64d6813a..26376fb5c76 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py @@ -66,9 +66,12 @@ def take_checkpoint(self): def merge(self, other): with self._lock: self.checkpoint += other.checkpoint - self.last_update_timestamp = ( - other.last_update_timestamp or self.last_update_timestamp - ) + if (other.last_update_timestamp is not None and + self.last_update_timestamp is not None): + if self.last_update_timestamp < other.last_update_timestamp: + self.last_update_timestamp = other.last_update_timestamp + elif self.last_update_timestamp is None: + self.last_update_timestamp = other.last_update_timestamp class MinMaxSumCountAggregator(Aggregator): @@ -120,9 +123,12 @@ def merge(self, other): self.checkpoint = self._merge_checkpoint( self.checkpoint, other.checkpoint ) - self.last_update_timestamp = ( - other.last_update_timestamp or self.last_update_timestamp - ) + if (other.last_update_timestamp is not None and + self.last_update_timestamp is not None): + if self.last_update_timestamp < other.last_update_timestamp: + self.last_update_timestamp = other.last_update_timestamp + elif self.last_update_timestamp is None: + self.last_update_timestamp = other.last_update_timestamp class ObserverAggregator(Aggregator): @@ -148,12 +154,18 @@ def take_checkpoint(self): def merge(self, other): self.mmsc.merge(other.mmsc) + last = self.checkpoint.last + if (other.last_update_timestamp is not None and + self.last_update_timestamp is not None): + if self.last_update_timestamp < other.last_update_timestamp: + self.last_update_timestamp = other.last_update_timestamp + last = other.checkpoint.last + elif self.last_update_timestamp is None: + self.last_update_timestamp = other.last_update_timestamp + last = other.checkpoint.last self.checkpoint = self._TYPE( *( self.mmsc.checkpoint - + (other.checkpoint.last or self.checkpoint.last or 0,) + + (last,) ) ) - self.last_update_timestamp = ( - other.last_update_timestamp or self.last_update_timestamp - ) diff --git a/opentelemetry-sdk/tests/metrics/export/test_export.py b/opentelemetry-sdk/tests/metrics/export/test_export.py index 43b3fa7e084..c48d2831377 100644 --- a/opentelemetry-sdk/tests/metrics/export/test_export.py +++ b/opentelemetry-sdk/tests/metrics/export/test_export.py @@ -354,6 +354,7 @@ def test_merge(self): mmsc1.checkpoint = checkpoint1 mmsc2.checkpoint = checkpoint2 + mmsc1.last_update_timestamp = 100 mmsc2.last_update_timestamp = 123 mmsc1.merge(mmsc2) @@ -491,6 +492,7 @@ def test_merge(self): observer1.mmsc.checkpoint = mmsc_checkpoint1 observer2.mmsc.checkpoint = mmsc_checkpoint2 + observer1.last_update_timestamp = 100 observer2.last_update_timestamp = 123 observer1.checkpoint = checkpoint1 @@ -519,6 +521,7 @@ def test_merge_with_empty(self): observer1.mmsc.checkpoint = mmsc_checkpoint1 observer1.checkpoint = checkpoint1 + observer1.last_update_timestamp = 100 observer1.merge(observer2) From e84d2124971dd7b3a82234b2d0f41095f5fdddd2 Mon Sep 17 00:00:00 2001 From: Leighton Date: Tue, 24 Mar 2020 10:49:43 -0700 Subject: [PATCH 07/14] lint --- .../sdk/metrics/export/aggregate.py | 25 ++++++++++--------- 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py index 26376fb5c76..b90f1dc8fcf 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py @@ -66,8 +66,10 @@ def take_checkpoint(self): def merge(self, other): with self._lock: self.checkpoint += other.checkpoint - if (other.last_update_timestamp is not None and - self.last_update_timestamp is not None): + if ( + other.last_update_timestamp is not None + and self.last_update_timestamp is not None + ): if self.last_update_timestamp < other.last_update_timestamp: self.last_update_timestamp = other.last_update_timestamp elif self.last_update_timestamp is None: @@ -123,8 +125,10 @@ def merge(self, other): self.checkpoint = self._merge_checkpoint( self.checkpoint, other.checkpoint ) - if (other.last_update_timestamp is not None and - self.last_update_timestamp is not None): + if ( + other.last_update_timestamp is not None + and self.last_update_timestamp is not None + ): if self.last_update_timestamp < other.last_update_timestamp: self.last_update_timestamp = other.last_update_timestamp elif self.last_update_timestamp is None: @@ -155,17 +159,14 @@ def take_checkpoint(self): def merge(self, other): self.mmsc.merge(other.mmsc) last = self.checkpoint.last - if (other.last_update_timestamp is not None and - self.last_update_timestamp is not None): + if ( + other.last_update_timestamp is not None + and self.last_update_timestamp is not None + ): if self.last_update_timestamp < other.last_update_timestamp: self.last_update_timestamp = other.last_update_timestamp last = other.checkpoint.last elif self.last_update_timestamp is None: self.last_update_timestamp = other.last_update_timestamp last = other.checkpoint.last - self.checkpoint = self._TYPE( - *( - self.mmsc.checkpoint - + (last,) - ) - ) + self.checkpoint = self._TYPE(*(self.mmsc.checkpoint + (last,))) From d7a3deb62ed7582ecc814efa4f528f81d9824eed Mon Sep 17 00:00:00 2001 From: Leighton Date: Wed, 25 Mar 2020 10:55:15 -0700 Subject: [PATCH 08/14] Addres comments --- .../sdk/metrics/export/aggregate.py | 29 +++----- .../tests/metrics/export/test_export.py | 68 +++++++++++++++++++ 2 files changed, 78 insertions(+), 19 deletions(-) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py index b90f1dc8fcf..7d68a240202 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py @@ -66,14 +66,11 @@ def take_checkpoint(self): def merge(self, other): with self._lock: self.checkpoint += other.checkpoint - if ( - other.last_update_timestamp is not None - and self.last_update_timestamp is not None - ): + if self.last_update_timestamp is None: + self.last_update_timestamp = other.last_update_timestamp + elif other.last_update_timestamp is not None: if self.last_update_timestamp < other.last_update_timestamp: self.last_update_timestamp = other.last_update_timestamp - elif self.last_update_timestamp is None: - self.last_update_timestamp = other.last_update_timestamp class MinMaxSumCountAggregator(Aggregator): @@ -125,14 +122,11 @@ def merge(self, other): self.checkpoint = self._merge_checkpoint( self.checkpoint, other.checkpoint ) - if ( - other.last_update_timestamp is not None - and self.last_update_timestamp is not None - ): + if self.last_update_timestamp is None: + self.last_update_timestamp = other.last_update_timestamp + elif other.last_update_timestamp is not None: if self.last_update_timestamp < other.last_update_timestamp: self.last_update_timestamp = other.last_update_timestamp - elif self.last_update_timestamp is None: - self.last_update_timestamp = other.last_update_timestamp class ObserverAggregator(Aggregator): @@ -159,14 +153,11 @@ def take_checkpoint(self): def merge(self, other): self.mmsc.merge(other.mmsc) last = self.checkpoint.last - if ( - other.last_update_timestamp is not None - and self.last_update_timestamp is not None - ): + if self.last_update_timestamp is None: + self.last_update_timestamp = other.last_update_timestamp + last = other.checkpoint.last + elif other.last_update_timestamp is not None: if self.last_update_timestamp < other.last_update_timestamp: self.last_update_timestamp = other.last_update_timestamp last = other.checkpoint.last - elif self.last_update_timestamp is None: - self.last_update_timestamp = other.last_update_timestamp - last = other.checkpoint.last self.checkpoint = self._TYPE(*(self.mmsc.checkpoint + (last,))) diff --git a/opentelemetry-sdk/tests/metrics/export/test_export.py b/opentelemetry-sdk/tests/metrics/export/test_export.py index c48d2831377..3f9bf9ac5ed 100644 --- a/opentelemetry-sdk/tests/metrics/export/test_export.py +++ b/opentelemetry-sdk/tests/metrics/export/test_export.py @@ -512,6 +512,74 @@ def test_merge(self): ) self.assertEqual(observer1.last_update_timestamp, 123) + def test_merge_last_updated(self): + observer1 = ObserverAggregator() + observer2 = ObserverAggregator() + + mmsc_checkpoint1 = MinMaxSumCountAggregator._TYPE(3, 150, 101, 3) + mmsc_checkpoint2 = MinMaxSumCountAggregator._TYPE(1, 33, 44, 2) + + checkpoint1 = ObserverAggregator._TYPE(*(mmsc_checkpoint1 + (23,))) + + checkpoint2 = ObserverAggregator._TYPE(*(mmsc_checkpoint2 + (27,))) + + observer1.mmsc.checkpoint = mmsc_checkpoint1 + observer2.mmsc.checkpoint = mmsc_checkpoint2 + + observer1.last_update_timestamp = 123 + observer2.last_update_timestamp = 100 + + observer1.checkpoint = checkpoint1 + observer2.checkpoint = checkpoint2 + + observer1.merge(observer2) + + self.assertEqual( + observer1.checkpoint, + ( + min(checkpoint1.min, checkpoint2.min), + max(checkpoint1.max, checkpoint2.max), + checkpoint1.sum + checkpoint2.sum, + checkpoint1.count + checkpoint2.count, + checkpoint1.last, + ), + ) + self.assertEqual(observer1.last_update_timestamp, 123) + + def test_merge_last_updated_None(self): + observer1 = ObserverAggregator() + observer2 = ObserverAggregator() + + mmsc_checkpoint1 = MinMaxSumCountAggregator._TYPE(3, 150, 101, 3) + mmsc_checkpoint2 = MinMaxSumCountAggregator._TYPE(1, 33, 44, 2) + + checkpoint1 = ObserverAggregator._TYPE(*(mmsc_checkpoint1 + (23,))) + + checkpoint2 = ObserverAggregator._TYPE(*(mmsc_checkpoint2 + (27,))) + + observer1.mmsc.checkpoint = mmsc_checkpoint1 + observer2.mmsc.checkpoint = mmsc_checkpoint2 + + observer1.last_update_timestamp = None + observer2.last_update_timestamp = 100 + + observer1.checkpoint = checkpoint1 + observer2.checkpoint = checkpoint2 + + observer1.merge(observer2) + + self.assertEqual( + observer1.checkpoint, + ( + min(checkpoint1.min, checkpoint2.min), + max(checkpoint1.max, checkpoint2.max), + checkpoint1.sum + checkpoint2.sum, + checkpoint1.count + checkpoint2.count, + checkpoint2.last, + ), + ) + self.assertEqual(observer1.last_update_timestamp, 100) + def test_merge_with_empty(self): observer1 = ObserverAggregator() observer2 = ObserverAggregator() From 610354ae139322ddd842a9a5cc25d14a07b4d5c1 Mon Sep 17 00:00:00 2001 From: Leighton Date: Wed, 25 Mar 2020 13:59:10 -0700 Subject: [PATCH 09/14] fix lint --- opentelemetry-sdk/tests/metrics/export/test_export.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/opentelemetry-sdk/tests/metrics/export/test_export.py b/opentelemetry-sdk/tests/metrics/export/test_export.py index 3f9bf9ac5ed..c2f4105f4c4 100644 --- a/opentelemetry-sdk/tests/metrics/export/test_export.py +++ b/opentelemetry-sdk/tests/metrics/export/test_export.py @@ -546,7 +546,7 @@ def test_merge_last_updated(self): ) self.assertEqual(observer1.last_update_timestamp, 123) - def test_merge_last_updated_None(self): + def test_merge_last_updated_none(self): observer1 = ObserverAggregator() observer2 = ObserverAggregator() From 375d883ea4144802db843d8e1fd05e9612ec7b62 Mon Sep 17 00:00:00 2001 From: Leighton Date: Wed, 1 Apr 2020 18:09:43 -0700 Subject: [PATCH 10/14] factor out check timestamp --- .../otcollector/metrics_exporter/__init__.py | 1 - .../sdk/metrics/export/aggregate.py | 36 ++++++++++--------- 2 files changed, 20 insertions(+), 17 deletions(-) diff --git a/ext/opentelemetry-ext-otcollector/src/opentelemetry/ext/otcollector/metrics_exporter/__init__.py b/ext/opentelemetry-ext-otcollector/src/opentelemetry/ext/otcollector/metrics_exporter/__init__.py index c437025f40a..d77dcd4324e 100644 --- a/ext/opentelemetry-ext-otcollector/src/opentelemetry/ext/otcollector/metrics_exporter/__init__.py +++ b/ext/opentelemetry-ext-otcollector/src/opentelemetry/ext/otcollector/metrics_exporter/__init__.py @@ -147,7 +147,6 @@ def get_collector_metric_type(metric: Metric) -> metrics_pb2.MetricDescriptor: def get_collector_point(metric_record: MetricRecord) -> metrics_pb2.Point: # TODO: horrible hack to get original list of keys to then get the bound # instrument - key = dict(metric_record.labels) point = metrics_pb2.Point( timestamp=utils.proto_timestamp_from_time_ns( metric_record.aggregator.last_update_timestamp diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py index d88b27102b8..0f3acb969fd 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py @@ -66,11 +66,9 @@ def take_checkpoint(self): def merge(self, other): with self._lock: self.checkpoint += other.checkpoint - if self.last_update_timestamp is None: - self.last_update_timestamp = other.last_update_timestamp - elif other.last_update_timestamp is not None: - if self.last_update_timestamp < other.last_update_timestamp: - self.last_update_timestamp = other.last_update_timestamp + self.last_update_timestamp = get_latest_timestamp( + self.last_update_timestamp, other.last_update_timestamp + ) class MinMaxSumCountAggregator(Aggregator): @@ -122,11 +120,9 @@ def merge(self, other): self.checkpoint = self._merge_checkpoint( self.checkpoint, other.checkpoint ) - if self.last_update_timestamp is None: - self.last_update_timestamp = other.last_update_timestamp - elif other.last_update_timestamp is not None: - if self.last_update_timestamp < other.last_update_timestamp: - self.last_update_timestamp = other.last_update_timestamp + self.last_update_timestamp = get_latest_timestamp( + self.last_update_timestamp, other.last_update_timestamp + ) class ObserverAggregator(Aggregator): @@ -153,11 +149,19 @@ def take_checkpoint(self): def merge(self, other): self.mmsc.merge(other.mmsc) last = self.checkpoint.last - if self.last_update_timestamp is None: - self.last_update_timestamp = other.last_update_timestamp + self.last_update_timestamp = get_latest_timestamp( + self.last_update_timestamp, other.last_update_timestamp + ) + if self.last_update_timestamp == other.last_update_timestamp: last = other.checkpoint.last - elif other.last_update_timestamp is not None: - if self.last_update_timestamp < other.last_update_timestamp: - self.last_update_timestamp = other.last_update_timestamp - last = other.checkpoint.last self.checkpoint = self._TYPE(*(self.mmsc.checkpoint + (last,))) + + +def get_latest_timestamp(time_stamp, other_timestamp): + if time_stamp is None: + return other_timestamp + elif other_timestamp is not None: + if time_stamp < other_timestamp: + return other_timestamp + return time_stamp + \ No newline at end of file From ff12046e0eb022be091a26224ceabb47e6107c19 Mon Sep 17 00:00:00 2001 From: Leighton Date: Wed, 1 Apr 2020 18:25:45 -0700 Subject: [PATCH 11/14] black --- .../src/opentelemetry/sdk/metrics/export/aggregate.py | 1 - 1 file changed, 1 deletion(-) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py index 0f3acb969fd..79dd24e0959 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py @@ -164,4 +164,3 @@ def get_latest_timestamp(time_stamp, other_timestamp): if time_stamp < other_timestamp: return other_timestamp return time_stamp - \ No newline at end of file From 96113a4e584dc6601ba39262cc2480090a7c6aa1 Mon Sep 17 00:00:00 2001 From: Leighton Date: Wed, 1 Apr 2020 20:31:51 -0700 Subject: [PATCH 12/14] lint --- .../src/opentelemetry/sdk/metrics/export/aggregate.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py index 79dd24e0959..ea8c40a7e72 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py @@ -160,7 +160,7 @@ def merge(self, other): def get_latest_timestamp(time_stamp, other_timestamp): if time_stamp is None: return other_timestamp - elif other_timestamp is not None: + if other_timestamp is not None: if time_stamp < other_timestamp: return other_timestamp return time_stamp From 3fa90f47bc400a08d9a52ad1d4b8f49d578086ef Mon Sep 17 00:00:00 2001 From: Leighton Date: Wed, 1 Apr 2020 21:14:26 -0700 Subject: [PATCH 13/14] build --- tox.ini | 1 + 1 file changed, 1 insertion(+) diff --git a/tox.ini b/tox.ini index 06a54bc83d7..417201888b2 100644 --- a/tox.ini +++ b/tox.ini @@ -7,6 +7,7 @@ envlist = ; opentelemetry-api py3{4,5,6,7,8}-test-api pypy3-test-api + ; opentelemetry-sdk py3{4,5,6,7,8}-test-sdk From 01d733daa39197320f8e3efb6c94d6b868e1406a Mon Sep 17 00:00:00 2001 From: Leighton Date: Thu, 2 Apr 2020 08:32:56 -0700 Subject: [PATCH 14/14] build --- tox.ini | 1 - 1 file changed, 1 deletion(-) diff --git a/tox.ini b/tox.ini index 417201888b2..06a54bc83d7 100644 --- a/tox.ini +++ b/tox.ini @@ -7,7 +7,6 @@ envlist = ; opentelemetry-api py3{4,5,6,7,8}-test-api pypy3-test-api - ; opentelemetry-sdk py3{4,5,6,7,8}-test-sdk