diff --git a/exporter/opentelemetry-exporter-otlp/CHANGELOG.md b/exporter/opentelemetry-exporter-otlp/CHANGELOG.md index 0e9fbba8a83..b1deb74e27f 100644 --- a/exporter/opentelemetry-exporter-otlp/CHANGELOG.md +++ b/exporter/opentelemetry-exporter-otlp/CHANGELOG.md @@ -4,6 +4,8 @@ - Add Env variables in OTLP exporter ([#1101](https://github.com/open-telemetry/opentelemetry-python/pull/1101)) +- Do not use bound instruments in OTLP exporter + ([#1237](https://github.com/open-telemetry/opentelemetry-python/pull/1237)) ## Version 0.14b0 diff --git a/exporter/opentelemetry-exporter-otlp/src/opentelemetry/exporter/otlp/metrics_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp/src/opentelemetry/exporter/otlp/metrics_exporter/__init__.py index 18ce772ea41..3a7ad586c6f 100644 --- a/exporter/opentelemetry-exporter-otlp/src/opentelemetry/exporter/otlp/metrics_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp/src/opentelemetry/exporter/otlp/metrics_exporter/__init__.py @@ -58,48 +58,54 @@ MetricsExporter, MetricsExportResult, ) +from opentelemetry.sdk.metrics.export.aggregate import ( + HistogramAggregator, + LastValueAggregator, + MinMaxSumCountAggregator, + SumAggregator, + ValueObserverAggregator, +) logger = logging.getLogger(__name__) DataPointT = TypeVar("DataPointT", IntDataPoint, DoubleDataPoint) def _get_data_points( - sdk_metric: MetricRecord, data_point_class: Type[DataPointT] + sdk_metric_record: MetricRecord, data_point_class: Type[DataPointT] ) -> List[DataPointT]: - data_points = [] - - for ( - label, - bound_counter, - ) in sdk_metric.instrument.bound_instruments.items(): - - string_key_values = [] - - for label_key, label_value in label: - string_key_values.append( - StringKeyValue(key=label_key, value=label_value) - ) - - for view_data in bound_counter.view_datas: - - if view_data.labels == label: - - data_points.append( - data_point_class( - labels=string_key_values, - value=view_data.aggregator.current, - start_time_unix_nano=( - view_data.aggregator.last_checkpoint_timestamp - ), - time_unix_nano=( - view_data.aggregator.last_update_timestamp - ), - ) - ) - break - - return data_points + if isinstance(sdk_metric_record.aggregator, SumAggregator): + value = sdk_metric_record.aggregator.checkpoint + + elif isinstance(sdk_metric_record.aggregator, MinMaxSumCountAggregator): + # FIXME: How are values to be interpreted from this aggregator? + raise Exception("MinMaxSumCount aggregator data not supported") + + elif isinstance(sdk_metric_record.aggregator, HistogramAggregator): + # FIXME: How are values to be interpreted from this aggregator? + raise Exception("Histogram aggregator data not supported") + + elif isinstance(sdk_metric_record.aggregator, LastValueAggregator): + value = sdk_metric_record.aggregator.checkpoint + + elif isinstance(sdk_metric_record.aggregator, ValueObserverAggregator): + value = sdk_metric_record.aggregator.checkpoint.last + + return [ + data_point_class( + labels=[ + StringKeyValue(key=str(label_key), value=str(label_value)) + for label_key, label_value in sdk_metric_record.labels + ], + value=value, + start_time_unix_nano=( + sdk_metric_record.aggregator.initial_checkpoint_timestamp + ), + time_unix_nano=( + sdk_metric_record.aggregator.last_update_timestamp + ), + ) + ] class OTLPMetricsExporter( @@ -179,13 +185,13 @@ def _translate_data( # SumObserver Sum(aggregation_temporality=cumulative;is_monotonic=true) # UpDownSumObserver Sum(aggregation_temporality=cumulative;is_monotonic=false) # ValueObserver Gauge() - for sdk_metric in data: + for sdk_metric_record in data: - if sdk_metric.resource not in ( + if sdk_metric_record.resource not in ( sdk_resource_instrumentation_library_metrics.keys() ): sdk_resource_instrumentation_library_metrics[ - sdk_metric.resource + sdk_metric_record.resource ] = InstrumentationLibraryMetrics() type_class = { @@ -204,15 +210,17 @@ def _translate_data( }, } - value_type = sdk_metric.instrument.value_type + value_type = sdk_metric_record.instrument.value_type sum_class = type_class[value_type]["sum"]["class"] gauge_class = type_class[value_type]["gauge"]["class"] data_point_class = type_class[value_type]["data_point_class"] - if isinstance(sdk_metric.instrument, Counter): + if isinstance(sdk_metric_record.instrument, Counter): otlp_metric_data = sum_class( - data_points=_get_data_points(sdk_metric, data_point_class), + data_points=_get_data_points( + sdk_metric_record, data_point_class + ), aggregation_temporality=( AggregationTemporality.AGGREGATION_TEMPORALITY_DELTA ), @@ -220,9 +228,11 @@ def _translate_data( ) argument = type_class[value_type]["sum"]["argument"] - elif isinstance(sdk_metric.instrument, UpDownCounter): + elif isinstance(sdk_metric_record.instrument, UpDownCounter): otlp_metric_data = sum_class( - data_points=_get_data_points(sdk_metric, data_point_class), + data_points=_get_data_points( + sdk_metric_record, data_point_class + ), aggregation_temporality=( AggregationTemporality.AGGREGATION_TEMPORALITY_DELTA ), @@ -230,13 +240,15 @@ def _translate_data( ) argument = type_class[value_type]["sum"]["argument"] - elif isinstance(sdk_metric.instrument, (ValueRecorder)): + elif isinstance(sdk_metric_record.instrument, (ValueRecorder)): logger.warning("Skipping exporting of ValueRecorder metric") continue - elif isinstance(sdk_metric.instrument, SumObserver): + elif isinstance(sdk_metric_record.instrument, SumObserver): otlp_metric_data = sum_class( - data_points=_get_data_points(sdk_metric, data_point_class), + data_points=_get_data_points( + sdk_metric_record, data_point_class + ), aggregation_temporality=( AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE ), @@ -244,9 +256,11 @@ def _translate_data( ) argument = type_class[value_type]["sum"]["argument"] - elif isinstance(sdk_metric.instrument, UpDownSumObserver): + elif isinstance(sdk_metric_record.instrument, UpDownSumObserver): otlp_metric_data = sum_class( - data_points=_get_data_points(sdk_metric, data_point_class), + data_points=_get_data_points( + sdk_metric_record, data_point_class + ), aggregation_temporality=( AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE ), @@ -254,20 +268,24 @@ def _translate_data( ) argument = type_class[value_type]["sum"]["argument"] - elif isinstance(sdk_metric.instrument, (ValueObserver)): + elif isinstance(sdk_metric_record.instrument, (ValueObserver)): otlp_metric_data = gauge_class( - data_points=_get_data_points(sdk_metric, data_point_class) + data_points=_get_data_points( + sdk_metric_record, data_point_class + ) ) argument = type_class[value_type]["gauge"]["argument"] sdk_resource_instrumentation_library_metrics[ - sdk_metric.resource + sdk_metric_record.resource ].metrics.append( OTLPMetric( **{ - "name": sdk_metric.instrument.name, - "description": sdk_metric.instrument.description, - "unit": sdk_metric.instrument.unit, + "name": sdk_metric_record.instrument.name, + "description": ( + sdk_metric_record.instrument.description + ), + "unit": sdk_metric_record.instrument.unit, argument: otlp_metric_data, } ) diff --git a/exporter/opentelemetry-exporter-otlp/tests/test_otlp_metric_exporter.py b/exporter/opentelemetry-exporter-otlp/tests/test_otlp_metric_exporter.py index 530f9a430ae..3034fcdf651 100644 --- a/exporter/opentelemetry-exporter-otlp/tests/test_otlp_metric_exporter.py +++ b/exporter/opentelemetry-exporter-otlp/tests/test_otlp_metric_exporter.py @@ -52,14 +52,14 @@ def setUp(self): self.counter_metric_record = MetricRecord( Counter( - "a", - "b", "c", + "d", + "e", int, MeterProvider(resource=resource,).get_meter(__name__), - ("d",), + ("f",), ), - OrderedDict([("e", "f")]), + [("g", "h")], SumAggregator(), resource, ) @@ -97,7 +97,9 @@ def test_translate_metrics(self, mock_time_ns): mock_time_ns.configure_mock(**{"return_value": 1}) - self.counter_metric_record.instrument.add(1, OrderedDict([("a", "b")])) + self.counter_metric_record.aggregator.checkpoint = 1 + self.counter_metric_record.aggregator.initial_checkpoint_timestamp = 1 + self.counter_metric_record.aggregator.last_update_timestamp = 1 expected = ExportMetricsServiceRequest( resource_metrics=[ @@ -114,19 +116,20 @@ def test_translate_metrics(self, mock_time_ns): InstrumentationLibraryMetrics( metrics=[ OTLPMetric( - name="a", - description="b", - unit="c", + name="c", + description="d", + unit="e", int_sum=IntSum( data_points=[ IntDataPoint( labels=[ StringKeyValue( - key="a", value="b" + key="g", value="h" ) ], value=1, time_unix_nano=1, + start_time_unix_nano=1, ) ], aggregation_temporality=( diff --git a/opentelemetry-sdk/CHANGELOG.md b/opentelemetry-sdk/CHANGELOG.md index 34a622bb995..2c1c3d971d0 100644 --- a/opentelemetry-sdk/CHANGELOG.md +++ b/opentelemetry-sdk/CHANGELOG.md @@ -14,6 +14,8 @@ ([#1282](https://github.com/open-telemetry/opentelemetry-python/pull/1282)) - Span.is_recording() returns false after span has ended ([#1289](https://github.com/open-telemetry/opentelemetry-python/pull/1289)) +- Set initial checkpoint timestamp in aggregators + ([#1237](https://github.com/open-telemetry/opentelemetry-python/pull/1237)) ## Version 0.14b0 diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py index 7d3ad52df0e..84ab518a47f 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py @@ -33,7 +33,8 @@ class Aggregator(abc.ABC): def __init__(self, config=None): self._lock = threading.Lock() self.last_update_timestamp = 0 - self.last_checkpoint_timestamp = 0 + self.initial_checkpoint_timestamp = 0 + self.checkpointed = True if config is not None: self.config = config else: @@ -42,12 +43,15 @@ def __init__(self, config=None): @abc.abstractmethod def update(self, value): """Updates the current with the new value.""" + if self.checkpointed: + self.initial_checkpoint_timestamp = time_ns() + self.checkpointed = False self.last_update_timestamp = time_ns() @abc.abstractmethod def take_checkpoint(self): """Stores a snapshot of the current value.""" - self.last_checkpoint_timestamp = time_ns() + self.checkpointed = True @abc.abstractmethod def merge(self, other): @@ -55,8 +59,9 @@ def merge(self, other): self.last_update_timestamp = max( self.last_update_timestamp, other.last_update_timestamp ) - self.last_checkpoint_timestamp = max( - self.last_checkpoint_timestamp, other.last_checkpoint_timestamp + self.initial_checkpoint_timestamp = max( + self.initial_checkpoint_timestamp, + other.initial_checkpoint_timestamp, ) def _verify_type(self, other): diff --git a/tests/util/src/opentelemetry/test/controller.py b/tests/util/src/opentelemetry/test/controller.py new file mode 100644 index 00000000000..754d7bf9792 --- /dev/null +++ b/tests/util/src/opentelemetry/test/controller.py @@ -0,0 +1,62 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from time import sleep + +from opentelemetry.context import attach, detach, set_value +from opentelemetry.metrics import Meter +from opentelemetry.sdk.metrics.export import MetricsExporter + + +class DebugController: + """A debug controller, used to replace Push controller when debugging + + Push controller uses a thread which makes it hard to use the IPython + debugger. This controller does not use a thread, but relies on the user + manually calling its ``run`` method to start the controller. + + Args: + meter: The meter used to collect metrics. + exporter: The exporter used to export metrics. + interval: The collect/export interval in seconds. + """ + + daemon = True + + def __init__( + self, meter: Meter, exporter: MetricsExporter, interval: float + ): + super().__init__() + self.meter = meter + self.exporter = exporter + self.interval = interval + + def run(self): + while True: + self.tick() + sleep(self.interval) + + def shutdown(self): + # Run one more collection pass to flush metrics batched in the meter + self.tick() + + def tick(self): + # Collect all of the meter's metrics to be exported + self.meter.collect() + # Export the collected metrics + token = attach(set_value("suppress_instrumentation", True)) + self.exporter.export(self.meter.processor.checkpoint_set()) + detach(token) + # Perform post-exporting logic + self.meter.processor.finished_collection()