From 43190d369f1e63e42dd475bf85dc65fa09336a34 Mon Sep 17 00:00:00 2001 From: Anuraag Agrawal Date: Fri, 13 Mar 2026 17:25:47 +0900 Subject: [PATCH 01/11] feat(sdk): implement exporter metrics --- .../otlp/proto/common/_exporter_metrics.py | 121 ++++++++++++++ .../otlp/proto/grpc/_log_exporter/__init__.py | 12 ++ .../exporter/otlp/proto/grpc/exporter.py | 42 +++++ .../proto/grpc/metric_exporter/__init__.py | 24 ++- .../proto/grpc/trace_exporter/__init__.py | 12 ++ .../tests/logs/test_otlp_logs_exporter.py | 11 ++ .../tests/test_otlp_exporter_mixin.py | 147 +++++++++++++++++- .../tests/test_otlp_metrics_exporter.py | 63 ++++++++ .../tests/test_otlp_trace_exporter.py | 8 + .../otlp/proto/http/_log_exporter/__init__.py | 37 +++++ .../proto/http/metric_exporter/__init__.py | 52 +++++++ .../proto/http/trace_exporter/__init__.py | 37 +++++ .../metrics/test_otlp_metrics_exporter.py | 142 ++++++++++++++++- .../tests/test_proto_log_exporter.py | 114 +++++++++++++- .../tests/test_proto_span_exporter.py | 114 +++++++++++++- 15 files changed, 927 insertions(+), 9 deletions(-) create mode 100644 exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/_exporter_metrics.py diff --git a/exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/_exporter_metrics.py b/exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/_exporter_metrics.py new file mode 100644 index 00000000000..c3c605f31c8 --- /dev/null +++ b/exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/_exporter_metrics.py @@ -0,0 +1,121 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +from collections import Counter +from time import perf_counter +from typing import TYPE_CHECKING, Callable + +from opentelemetry.metrics import MeterProvider, get_meter_provider +from opentelemetry.semconv._incubating.attributes.otel_attributes import ( + OTEL_COMPONENT_NAME, + OTEL_COMPONENT_TYPE, +) +from opentelemetry.semconv._incubating.metrics.otel_metrics import ( + create_otel_sdk_exporter_log_exported, + create_otel_sdk_exporter_log_inflight, + create_otel_sdk_exporter_metric_data_point_exported, + create_otel_sdk_exporter_metric_data_point_inflight, + create_otel_sdk_exporter_operation_duration, + create_otel_sdk_exporter_span_exported, + create_otel_sdk_exporter_span_inflight, +) +from opentelemetry.semconv.attributes.error_attributes import ERROR_TYPE +from opentelemetry.semconv.attributes.server_attributes import ( + SERVER_ADDRESS, + SERVER_PORT, +) + +if TYPE_CHECKING: + from typing import Literal + from urllib.parse import ParseResult as UrlParseResult + + from opentelemetry.util.types import Attributes, AttributeValue + +_component_counter = Counter() + + +class ExporterMetrics: + def __init__( + self, + component_type: str, + signal: Literal["span", "log", "metric_data_point"], + endpoint: UrlParseResult, + meter_provider: MeterProvider | None, + ) -> None: + if signal == "span": + create_exported = create_otel_sdk_exporter_span_exported + create_inflight = create_otel_sdk_exporter_span_inflight + elif signal == "log": + create_exported = create_otel_sdk_exporter_log_exported + create_inflight = create_otel_sdk_exporter_log_inflight + else: + create_exported = ( + create_otel_sdk_exporter_metric_data_point_exported + ) + create_inflight = ( + create_otel_sdk_exporter_metric_data_point_inflight + ) + + port = endpoint.port + if port is None: + if endpoint.scheme == "https": + port = 443 + elif endpoint.scheme == "http": + port = 80 + + count = _component_counter[component_type] + _component_counter[component_type] = count + 1 + self._standard_attrs: dict[str, AttributeValue] = { + OTEL_COMPONENT_TYPE: component_type, + OTEL_COMPONENT_NAME: f"{component_type}/{count}", + } + if endpoint.hostname: + self._standard_attrs[SERVER_ADDRESS] = endpoint.hostname + if port is not None: + self._standard_attrs[SERVER_PORT] = port + + meter_provider = meter_provider or get_meter_provider() + meter = meter_provider.get_meter("opentelemetry-sdk") + self._inflight = create_inflight(meter) + self._exported = create_exported(meter) + self._duration = create_otel_sdk_exporter_operation_duration(meter) + + def start_export( + self, num_items: int + ) -> Callable[[Exception | None, Attributes], None]: + start_time = perf_counter() + self._inflight.add(num_items, self._standard_attrs) + + def finish_export( + error: Exception | None, + error_attrs: Attributes, + ): + end_time = perf_counter() + self._inflight.add(-num_items, self._standard_attrs) + exported_attrs = ( + {**self._standard_attrs, ERROR_TYPE: type(error).__qualname__} + if error + else self._standard_attrs + ) + self._exported.add(num_items, exported_attrs) + duration_attrs = ( + {**exported_attrs, **error_attrs} + if error_attrs + else exported_attrs + ) + self._duration.record(end_time - start_time, duration_attrs) + + return finish_export diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/_log_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/_log_exporter/__init__.py index 63d8ac9cfb0..a86af8dfbca 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/_log_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/_log_exporter/__init__.py @@ -22,6 +22,7 @@ _get_credentials, environ_to_compression, ) +from opentelemetry.metrics import MeterProvider from opentelemetry.proto.collector.logs.v1.logs_service_pb2 import ( ExportLogsServiceRequest, ) @@ -44,6 +45,9 @@ OTEL_EXPORTER_OTLP_LOGS_INSECURE, OTEL_EXPORTER_OTLP_LOGS_TIMEOUT, ) +from opentelemetry.semconv._incubating.attributes.otel_attributes import ( + OtelComponentTypeValues, +) class OTLPLogExporter( @@ -66,6 +70,8 @@ def __init__( timeout: Optional[float] = None, compression: Optional[Compression] = None, channel_options: Optional[Tuple[Tuple[str, str]]] = None, + *, + meter_provider: Optional[MeterProvider] = None, ): insecure_logs = environ.get(OTEL_EXPORTER_OTLP_LOGS_INSECURE) if insecure is None and insecure_logs is not None: @@ -105,6 +111,9 @@ def __init__( stub=LogsServiceStub, result=LogRecordExportResult, channel_options=channel_options, + component_type=OtelComponentTypeValues.OTLP_GRPC_LOG_EXPORTER.value, + signal="log", + meter_provider=meter_provider, ) def _translate_data( @@ -112,6 +121,9 @@ def _translate_data( ) -> ExportLogsServiceRequest: return encode_logs(data) + def _count_data(self, data: Sequence[ReadableLogRecord]): + return len(data) + def export( # type: ignore [reportIncompatibleMethodOverride] self, batch: Sequence[ReadableLogRecord], diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py index 89c2608c30a..6cc8098db64 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py @@ -56,12 +56,16 @@ secure_channel, ssl_channel_credentials, ) +from opentelemetry.exporter.otlp.proto.common._exporter_metrics import ( + ExporterMetrics, +) from opentelemetry.exporter.otlp.proto.common._internal import ( _get_resource_data, ) from opentelemetry.exporter.otlp.proto.grpc import ( _OTLP_GRPC_CHANNEL_OPTIONS, ) +from opentelemetry.metrics import MeterProvider from opentelemetry.proto.collector.logs.v1.logs_service_pb2 import ( ExportLogsServiceRequest, ) @@ -104,6 +108,9 @@ from opentelemetry.sdk.resources import Resource as SDKResource from opentelemetry.sdk.trace import ReadableSpan from opentelemetry.sdk.trace.export import SpanExportResult +from opentelemetry.semconv._incubating.attributes.rpc_attributes import ( + RPC_RESPONSE_STATUS_CODE, +) from opentelemetry.util._importlib_metadata import entry_points from opentelemetry.util.re import parse_env_headers @@ -299,6 +306,10 @@ def __init__( timeout: Optional[float] = None, compression: Optional[Compression] = None, channel_options: Optional[Tuple[Tuple[str, str]]] = None, + *, + component_type: str, + signal: Literal["span", "log", "metric_data_point"], + meter_provider: Optional[MeterProvider], ): super().__init__() self._result = result @@ -372,6 +383,16 @@ def __init__( OTEL_EXPORTER_OTLP_CLIENT_CERTIFICATE, ) + self._component_type = component_type + self._signal: Literal["span", "log", "metric_data_point"] = signal + self._parsed_url = parsed_url + self._metrics = ExporterMetrics( + component_type, + signal, + parsed_url, + meter_provider, + ) + self._initialize_channel_and_stub() def _initialize_channel_and_stub(self): @@ -404,6 +425,13 @@ def _translate_data( ) -> ExportServiceRequestT: pass + @abstractmethod + def _count_data( + self, + data: SDKDataT, + ) -> int: + pass + def _export( self, data: SDKDataT, @@ -412,6 +440,8 @@ def _export( logger.warning("Exporter already shutdown, ignoring batch") return self._result.FAILURE # type: ignore [reportReturnType] + finish_export = self._metrics.start_export(self._count_data(data)) + # FIXME remove this check if the export type for traces # gets updated to a class that represents the proto # TracesData and use the code below instead. @@ -425,6 +455,7 @@ def _export( metadata=self._headers, timeout=deadline_sec - time(), ) + finish_export(None, None) return self._result.SUCCESS # type: ignore [reportReturnType] except RpcError as error: retry_info_bin = dict(error.trailing_metadata()).get( # type: ignore [reportAttributeAccessIssue] @@ -472,6 +503,9 @@ def _export( error.code(), # type: ignore [reportAttributeAccessIssue] exc_info=error.code() == StatusCode.UNKNOWN, # type: ignore [reportAttributeAccessIssue] ) + finish_export( + error, {RPC_RESPONSE_STATUS_CODE: error.code().name} + ) return self._result.FAILURE # type: ignore [reportReturnType] logger.warning( "Transient error %s encountered while exporting %s to %s, retrying in %.2fs.", @@ -510,3 +544,11 @@ def _exporting(self) -> str: warning messages. """ pass + + def _set_meter_provider(self, meter_provider: MeterProvider) -> None: + self._metrics = ExporterMetrics( + self._component_type, + self._signal, + self._parsed_url, + meter_provider, + ) diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/metric_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/metric_exporter/__init__.py index af77f6d1239..b8d000c6b17 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/metric_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/metric_exporter/__init__.py @@ -16,7 +16,7 @@ from dataclasses import replace from logging import getLogger from os import environ -from typing import Iterable, List, Tuple, Union +from typing import Iterable, List, Optional, Tuple, Union from typing import Sequence as TypingSequence from grpc import ChannelCredentials, Compression @@ -32,6 +32,7 @@ environ_to_compression, get_resource_data, ) +from opentelemetry.metrics import MeterProvider from opentelemetry.proto.collector.metrics.v1.metrics_service_pb2 import ( ExportMetricsServiceRequest, ) @@ -72,6 +73,9 @@ from opentelemetry.sdk.metrics.export import ( # noqa: F401 Histogram as HistogramType, ) +from opentelemetry.semconv._incubating.attributes.otel_attributes import ( + OtelComponentTypeValues, +) _logger = getLogger(__name__) @@ -109,6 +113,8 @@ def __init__( preferred_aggregation: dict[type, Aggregation] | None = None, max_export_batch_size: int | None = None, channel_options: Tuple[Tuple[str, str]] | None = None, + *, + meter_provider: Optional[MeterProvider] = None, ): insecure_metrics = environ.get(OTEL_EXPORTER_OTLP_METRICS_INSECURE) if insecure is None and insecure_metrics is not None: @@ -153,6 +159,9 @@ def __init__( timeout=timeout or environ_timeout, compression=compression, channel_options=channel_options, + component_type=OtelComponentTypeValues.OTLP_GRPC_METRIC_EXPORTER.value, + signal="metric_data_point", + meter_provider=meter_provider, ) self._max_export_batch_size: int | None = max_export_batch_size @@ -162,6 +171,16 @@ def _translate_data( # type: ignore [reportIncompatibleMethodOverride] ) -> ExportMetricsServiceRequest: return encode_metrics(data) + def _count_data(self, data: MetricsData): + num_items = 0 + + for resource_metrics in data.resource_metrics: + for scope_metrics in resource_metrics.scope_metrics: + for metric in scope_metrics.metrics: + num_items += len(metric.data.data_points) + + return num_items + def export( self, metrics_data: MetricsData, @@ -268,6 +287,9 @@ def _split_metrics_data( def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None: OTLPExporterMixin.shutdown(self, timeout_millis=timeout_millis) + def set_meter_provider(self, meter_provider: MeterProvider): + return self._set_meter_provider(meter_provider) + @property def _exporting(self) -> str: return "metrics" diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/trace_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/trace_exporter/__init__.py index 19b189e5b9c..fdd5c9309f6 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/trace_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/trace_exporter/__init__.py @@ -28,6 +28,7 @@ environ_to_compression, get_resource_data, ) +from opentelemetry.metrics import MeterProvider from opentelemetry.proto.collector.trace.v1.trace_service_pb2 import ( ExportTraceServiceRequest, ) @@ -58,6 +59,9 @@ ) from opentelemetry.sdk.trace import ReadableSpan from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult +from opentelemetry.semconv._incubating.attributes.otel_attributes import ( + OtelComponentTypeValues, +) logger = logging.getLogger(__name__) @@ -95,6 +99,8 @@ def __init__( timeout: Optional[float] = None, compression: Optional[Compression] = None, channel_options: Optional[Tuple[Tuple[str, str]]] = None, + *, + meter_provider: Optional[MeterProvider] = None, ): insecure_spans = environ.get(OTEL_EXPORTER_OTLP_TRACES_INSECURE) if insecure is None and insecure_spans is not None: @@ -135,6 +141,9 @@ def __init__( timeout=timeout or environ_timeout, compression=compression, channel_options=channel_options, + component_type=OtelComponentTypeValues.OTLP_GRPC_SPAN_EXPORTER.value, + signal="span", + meter_provider=meter_provider, ) def _translate_data( @@ -142,6 +151,9 @@ def _translate_data( ) -> ExportTraceServiceRequest: return encode_spans(data) + def _count_data(self, data: Sequence[ReadableSpan]): + return len(data) + def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult: return self._export(spans) diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/logs/test_otlp_logs_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/logs/test_otlp_logs_exporter.py index 94e8cc944c3..a0ee318b67e 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/logs/test_otlp_logs_exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/logs/test_otlp_logs_exporter.py @@ -402,6 +402,9 @@ def test_translate_log_data(self): expected, self.exporter._translate_data([self.log_data_1]) ) + def test_count_log_data(self): + self.assertEqual(1, self.exporter._count_data([self.log_data_1])) + def test_translate_multiple_logs(self): expected = ExportLogsServiceRequest( resource_logs=[ @@ -539,3 +542,11 @@ def test_translate_multiple_logs(self): [self.log_data_1, self.log_data_2, self.log_data_3] ), ) + + def test_count_multiple_logs(self): + self.assertEqual( + 3, + self.exporter._count_data( + [self.log_data_1, self.log_data_2, self.log_data_3] + ), + ) diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_exporter_mixin.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_exporter_mixin.py index de27d0fe792..62ce2e37a7e 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_exporter_mixin.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_exporter_mixin.py @@ -55,6 +55,8 @@ _OTEL_PYTHON_EXPORTER_OTLP_GRPC_CREDENTIAL_PROVIDER, OTEL_EXPORTER_OTLP_COMPRESSION, ) +from opentelemetry.sdk.metrics import MeterProvider +from opentelemetry.sdk.metrics.export import InMemoryMetricReader from opentelemetry.sdk.trace import ReadableSpan, _Span from opentelemetry.sdk.trace.export import ( SpanExporter, @@ -78,13 +80,23 @@ class OTLPSpanExporterForTesting( ], ): def __init__(self, **kwargs): - super().__init__(TraceServiceStub, SpanExportResult, **kwargs) + super().__init__( + TraceServiceStub, + SpanExportResult, + component_type="test_span_exporter", + signal="span", + meter_provider=kwargs.pop("meter_provider", None), + **kwargs, + ) def _translate_data( self, data: Sequence[ReadableSpan] ) -> ExportTraceServiceRequest: return encode_spans(data) + def _count_data(self, data: Sequence[ReadableSpan]) -> int: + return len(data) + def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult: return self._export(spans) @@ -161,7 +173,14 @@ def setUp(self): self.server.add_insecure_port("127.0.0.1:4317") self.server.start() - self.exporter = OTLPSpanExporterForTesting(insecure=True) + + self.metric_reader = InMemoryMetricReader() + self.meter_provider = MeterProvider( + metric_readers=[self.metric_reader] + ) + self.exporter = OTLPSpanExporterForTesting( + insecure=True, meter_provider=self.meter_provider + ) self.span = _Span( "a", context=Mock( @@ -372,6 +391,26 @@ def test_shutdown(self): self.assertEqual( self.exporter.export([self.span]), SpanExportResult.SUCCESS ) + metrics_data = self.metric_reader.get_metrics_data() + scope_metrics = metrics_data.resource_metrics[0].scope_metrics[0] + self.assertEqual(scope_metrics.scope.name, "opentelemetry-sdk") + metrics = sorted(scope_metrics.metrics, key=lambda m: m.name) + self.assertEqual(len(metrics), 3) + self.assertEqual( + metrics[0].name, "otel.sdk.exporter.operation.duration" + ) + self.assert_standard_metric_attrs( + metrics[0].data.data_points[0].attributes + ) + self.assertEqual(metrics[1].name, "otel.sdk.exporter.span.exported") + self.assert_standard_metric_attrs( + metrics[1].data.data_points[0].attributes + ) + self.assertEqual(metrics[2].name, "otel.sdk.exporter.span.inflight") + self.assert_standard_metric_attrs( + metrics[2].data.data_points[0].attributes + ) + self.exporter.shutdown() with self.assertLogs(level=WARNING) as warning: self.assertEqual( @@ -446,7 +485,9 @@ def test_retry_info_is_respected(self): mock_trace_service, self.server, ) - exporter = OTLPSpanExporterForTesting(insecure=True, timeout=10) + exporter = OTLPSpanExporterForTesting( + insecure=True, timeout=10, meter_provider=self.meter_provider + ) before = time.time() self.assertEqual( exporter.export([self.span]), @@ -457,6 +498,51 @@ def test_retry_info_is_respected(self): # 1 second plus wiggle room so the test passes consistently. self.assertAlmostEqual(after - before, 1, 1) + metrics_data = self.metric_reader.get_metrics_data() + scope_metrics = metrics_data.resource_metrics[0].scope_metrics[0] + self.assertEqual(scope_metrics.scope.name, "opentelemetry-sdk") + metrics = sorted(scope_metrics.metrics, key=lambda m: m.name) + self.assertEqual(len(metrics), 3) + self.assertEqual( + metrics[0].name, "otel.sdk.exporter.operation.duration" + ) + self.assert_standard_metric_attrs( + metrics[0].data.data_points[0].attributes + ) + self.assertEqual( + metrics[0].data.data_points[0].attributes["error.type"], + "_InactiveRpcError", + ) + self.assertEqual( + metrics[0] + .data.data_points[0] + .attributes["rpc.response.status_code"], + "UNAVAILABLE", + ) + self.assertEqual(metrics[1].name, "otel.sdk.exporter.span.exported") + self.assert_standard_metric_attrs( + metrics[1].data.data_points[0].attributes + ) + self.assertEqual( + metrics[1].data.data_points[0].attributes["error.type"], + "_InactiveRpcError", + ) + self.assertNotIn( + "rpc.response.status_code", + metrics[1].data.data_points[0].attributes, + ) + self.assertEqual(metrics[2].name, "otel.sdk.exporter.span.inflight") + self.assert_standard_metric_attrs( + metrics[2].data.data_points[0].attributes + ) + self.assertNotIn( + "error.type", metrics[2].data.data_points[0].attributes + ) + self.assertNotIn( + "rpc.response.status_code", + metrics[2].data.data_points[0].attributes, + ) + @unittest.skipIf( system() == "Windows", "For gRPC + windows there's some added delay in the RPCs which breaks the assertion over amount of time passed.", @@ -548,6 +634,51 @@ def test_permanent_failure(self): "Failed to export traces to localhost:4317, error code: StatusCode.ALREADY_EXISTS", ) + metrics_data = self.metric_reader.get_metrics_data() + scope_metrics = metrics_data.resource_metrics[0].scope_metrics[0] + self.assertEqual(scope_metrics.scope.name, "opentelemetry-sdk") + metrics = sorted(scope_metrics.metrics, key=lambda m: m.name) + self.assertEqual(len(metrics), 3) + self.assertEqual( + metrics[0].name, "otel.sdk.exporter.operation.duration" + ) + self.assert_standard_metric_attrs( + metrics[0].data.data_points[0].attributes + ) + self.assertEqual( + metrics[0].data.data_points[0].attributes["error.type"], + "_InactiveRpcError", + ) + self.assertEqual( + metrics[0] + .data.data_points[0] + .attributes["rpc.response.status_code"], + "ALREADY_EXISTS", + ) + self.assertEqual(metrics[1].name, "otel.sdk.exporter.span.exported") + self.assert_standard_metric_attrs( + metrics[1].data.data_points[0].attributes + ) + self.assertEqual( + metrics[1].data.data_points[0].attributes["error.type"], + "_InactiveRpcError", + ) + self.assertNotIn( + "rpc.response.status_code", + metrics[1].data.data_points[0].attributes, + ) + self.assertEqual(metrics[2].name, "otel.sdk.exporter.span.inflight") + self.assert_standard_metric_attrs( + metrics[2].data.data_points[0].attributes + ) + self.assertNotIn( + "error.type", metrics[2].data.data_points[0].attributes + ) + self.assertNotIn( + "rpc.response.status_code", + metrics[2].data.data_points[0].attributes, + ) + def test_unavailable_reconnects(self): """Test that the exporter reconnects on UNAVAILABLE error""" add_TraceServiceServicer_to_server( @@ -571,3 +702,13 @@ def test_unavailable_reconnects(self): # must be from the reconnection logic. self.assertTrue(mock_insecure_channel.called) # Verify that reconnection enabled flag is set + + def assert_standard_metric_attrs(self, attributes): + self.assertEqual( + attributes["otel.component.type"], "test_span_exporter" + ) + self.assertTrue( + attributes["otel.component.name"].startswith("test_span_exporter/") + ) + self.assertEqual(attributes["server.address"], "localhost") + self.assertEqual(attributes["server.port"], 4317) diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_metrics_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_metrics_exporter.py index 16fd666dd1a..6799178b658 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_metrics_exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_metrics_exporter.py @@ -590,6 +590,69 @@ def test_split_metrics_data_many_resources_scopes_metrics(self): split_metrics_data, ) + def test_count_metrics_data(self): + # GIVEN + metrics_data = MetricsData( + resource_metrics=[ + _resource_metrics( + index=1, + scope_metrics=[ + _scope_metrics( + index=1, + metrics=[ + _gauge( + index=1, + data_points=[ + _number_data_point(11), + ], + ), + _gauge( + index=2, + data_points=[ + _number_data_point(12), + ], + ), + ], + ), + _scope_metrics( + index=2, + metrics=[ + _gauge( + index=3, + data_points=[ + _number_data_point(13), + ], + ), + ], + ), + ], + ), + _resource_metrics( + index=2, + scope_metrics=[ + _scope_metrics( + index=3, + metrics=[ + _gauge( + index=4, + data_points=[ + _number_data_point(14), + ], + ), + ], + ), + ], + ), + ] + ) + # WHEN + # pylint: disable=protected-access + count = OTLPMetricExporter(max_export_batch_size=2)._count_data( + metrics_data, + ) + # THEN + self.assertEqual(count, 4) + @patch("opentelemetry.exporter.otlp.proto.grpc.exporter.secure_channel") def test_insecure_https_endpoint(self, mock_secure_channel): OTLPMetricExporter(endpoint="https://ab.c:123", insecure=True) diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_trace_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_trace_exporter.py index cbe6298df77..1f44a03470d 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_trace_exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_trace_exporter.py @@ -481,6 +481,9 @@ def test_translate_spans(self): # pylint: disable=protected-access self.assertEqual(expected, self.exporter._translate_data([self.span])) + def test_count_spans(self): + self.assertEqual(1, self.exporter._count_data([self.span])) + def test_translate_spans_multi(self): expected = ExportTraceServiceRequest( resource_spans=[ @@ -660,6 +663,11 @@ def test_translate_spans_multi(self): self.exporter._translate_data([self.span, self.span2, self.span3]), ) + def test_count_spans_multi(self): + self.assertEqual( + 3, self.exporter._count_data([self.span, self.span2, self.span3]) + ) + def _check_translated_status( self, translated: ExportTraceServiceRequest, diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py index 7aea76be8d2..770eeeba037 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py @@ -21,10 +21,14 @@ from os import environ from time import time from typing import Dict, Optional, Sequence +from urllib.parse import urlparse import requests from requests.exceptions import ConnectionError +from opentelemetry.exporter.otlp.proto.common._exporter_metrics import ( + ExporterMetrics, +) from opentelemetry.exporter.otlp.proto.common._log_encoder import encode_logs from opentelemetry.exporter.otlp.proto.http import ( _OTLP_HTTP_HEADERS, @@ -34,6 +38,7 @@ _is_retryable, _load_session_from_envvar, ) +from opentelemetry.metrics import MeterProvider from opentelemetry.sdk._logs import ReadableLogRecord from opentelemetry.sdk._logs.export import ( LogRecordExporter, @@ -57,6 +62,12 @@ OTEL_EXPORTER_OTLP_LOGS_TIMEOUT, OTEL_EXPORTER_OTLP_TIMEOUT, ) +from opentelemetry.semconv._incubating.attributes.otel_attributes import ( + OtelComponentTypeValues, +) +from opentelemetry.semconv.attributes.http_attributes import ( + HTTP_RESPONSE_STATUS_CODE, +) from opentelemetry.util.re import parse_env_headers _logger = logging.getLogger(__name__) @@ -82,6 +93,8 @@ def __init__( timeout: Optional[float] = None, compression: Optional[Compression] = None, session: Optional[requests.Session] = None, + *, + meter_provider: Optional[MeterProvider] = None, ): self._shutdown_is_occuring = threading.Event() self._endpoint = endpoint or environ.get( @@ -139,6 +152,13 @@ def __init__( ) self._shutdown = False + self._metrics = ExporterMetrics( + OtelComponentTypeValues.OTLP_HTTP_LOG_EXPORTER.value, + "log", + urlparse(self._endpoint), + meter_provider, + ) + def _export( self, serialized_data: bytes, timeout_sec: Optional[float] = None ): @@ -183,17 +203,22 @@ def export( _logger.warning("Exporter already shutdown, ignoring batch") return LogRecordExportResult.FAILURE + finish_export = self._metrics.start_export(len(batch)) + serialized_data = encode_logs(batch).SerializeToString() deadline_sec = time() + self._timeout for retry_num in range(_MAX_RETRYS): # multiplying by a random number between .8 and 1.2 introduces a +/20% jitter to each backoff. backoff_seconds = 2**retry_num * random.uniform(0.8, 1.2) + export_error: Optional[Exception] = None try: resp = self._export(serialized_data, deadline_sec - time()) if resp.ok: + finish_export(None, None) return LogRecordExportResult.SUCCESS except requests.exceptions.RequestException as error: reason = error + export_error = error retryable = isinstance(error, ConnectionError) status_code = None else: @@ -207,6 +232,12 @@ def export( status_code, reason, ) + error_attrs = ( + {HTTP_RESPONSE_STATUS_CODE: status_code} + if status_code is not None + else None + ) + finish_export(export_error, error_attrs) return LogRecordExportResult.FAILURE if ( @@ -218,6 +249,12 @@ def export( "Failed to export logs batch due to timeout, " "max retries or shutdown." ) + error_attrs = ( + {HTTP_RESPONSE_STATUS_CODE: status_code} + if status_code is not None + else None + ) + finish_export(export_error, error_attrs) return LogRecordExportResult.FAILURE _logger.warning( "Transient error %s encountered while exporting logs batch, retrying in %.2fs.", diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py index 7e08f624375..3d8250f0485 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py @@ -29,11 +29,15 @@ Optional, Sequence, ) +from urllib.parse import urlparse import requests from requests.exceptions import ConnectionError from typing_extensions import deprecated +from opentelemetry.exporter.otlp.proto.common._exporter_metrics import ( + ExporterMetrics, +) from opentelemetry.exporter.otlp.proto.common._internal import ( _get_resource_data, ) @@ -51,6 +55,7 @@ _is_retryable, _load_session_from_envvar, ) +from opentelemetry.metrics import MeterProvider from opentelemetry.proto.collector.metrics.v1.metrics_service_pb2 import ( # noqa: F401 ExportMetricsServiceRequest, ) @@ -96,6 +101,12 @@ Histogram as HistogramType, ) from opentelemetry.sdk.resources import Resource as SDKResource +from opentelemetry.semconv._incubating.attributes.otel_attributes import ( + OtelComponentTypeValues, +) +from opentelemetry.semconv.attributes.http_attributes import ( + HTTP_RESPONSE_STATUS_CODE, +) from opentelemetry.util.re import parse_env_headers _logger = logging.getLogger(__name__) @@ -122,6 +133,8 @@ def __init__( preferred_temporality: dict[type, AggregationTemporality] | None = None, preferred_aggregation: dict[type, Aggregation] | None = None, + *, + meter_provider: Optional[MeterProvider] = None, ): self._shutdown_in_progress = threading.Event() self._endpoint = endpoint or environ.get( @@ -182,6 +195,13 @@ def __init__( ) self._shutdown = False + self._metrics = ExporterMetrics( + OtelComponentTypeValues.OTLP_HTTP_METRIC_EXPORTER.value, + "metric_data_point", + urlparse(self._endpoint), + meter_provider, + ) + def _export( self, serialized_data: bytes, timeout_sec: Optional[float] = None ): @@ -228,17 +248,29 @@ def export( if self._shutdown: _logger.warning("Exporter already shutdown, ignoring batch") return MetricExportResult.FAILURE + + num_items = 0 + for resource_metrics in metrics_data.resource_metrics: + for scope_metrics in resource_metrics.scope_metrics: + for metric in scope_metrics.metrics: + num_items += len(metric.data.data_points) + + finish_export = self._metrics.start_export(num_items) + serialized_data = encode_metrics(metrics_data).SerializeToString() deadline_sec = time() + self._timeout for retry_num in range(_MAX_RETRYS): # multiplying by a random number between .8 and 1.2 introduces a +/20% jitter to each backoff. backoff_seconds = 2**retry_num * random.uniform(0.8, 1.2) + export_error: Optional[Exception] = None try: resp = self._export(serialized_data, deadline_sec - time()) if resp.ok: + finish_export(None, None) return MetricExportResult.SUCCESS except requests.exceptions.RequestException as error: reason = error + export_error = error retryable = isinstance(error, ConnectionError) status_code = None else: @@ -252,6 +284,12 @@ def export( status_code, reason, ) + error_attrs = ( + {HTTP_RESPONSE_STATUS_CODE: status_code} + if status_code is not None + else None + ) + finish_export(export_error, error_attrs) return MetricExportResult.FAILURE if ( retry_num + 1 == _MAX_RETRYS @@ -262,6 +300,12 @@ def export( "Failed to export metrics batch due to timeout, " "max retries or shutdown." ) + error_attrs = ( + {HTTP_RESPONSE_STATUS_CODE: status_code} + if status_code is not None + else None + ) + finish_export(export_error, error_attrs) return MetricExportResult.FAILURE _logger.warning( "Transient error %s encountered while exporting metrics batch, retrying in %.2fs.", @@ -290,6 +334,14 @@ def force_flush(self, timeout_millis: float = 10_000) -> bool: """Nothing is buffered in this exporter, so this method does nothing.""" return True + def set_meter_provider(self, meter_provider: MeterProvider) -> None: + self._metrics = ExporterMetrics( + OtelComponentTypeValues.OTLP_HTTP_METRIC_EXPORTER.value, + "metric_data_point", + urlparse(self._endpoint), + meter_provider, + ) + @deprecated( "Use one of the encoders from opentelemetry-exporter-otlp-proto-common instead. Deprecated since version 1.18.0.", diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/trace_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/trace_exporter/__init__.py index d02f94adf05..4f9f960b47f 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/trace_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/trace_exporter/__init__.py @@ -21,10 +21,14 @@ from os import environ from time import time from typing import Dict, Optional, Sequence +from urllib.parse import urlparse import requests from requests.exceptions import ConnectionError +from opentelemetry.exporter.otlp.proto.common._exporter_metrics import ( + ExporterMetrics, +) from opentelemetry.exporter.otlp.proto.common.trace_encoder import ( encode_spans, ) @@ -36,6 +40,7 @@ _is_retryable, _load_session_from_envvar, ) +from opentelemetry.metrics import MeterProvider from opentelemetry.sdk.environment_variables import ( _OTEL_PYTHON_EXPORTER_OTLP_HTTP_TRACES_CREDENTIAL_PROVIDER, OTEL_EXPORTER_OTLP_CERTIFICATE, @@ -55,6 +60,12 @@ ) from opentelemetry.sdk.trace import ReadableSpan from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult +from opentelemetry.semconv._incubating.attributes.otel_attributes import ( + OtelComponentTypeValues, +) +from opentelemetry.semconv.attributes.http_attributes import ( + HTTP_RESPONSE_STATUS_CODE, +) from opentelemetry.util.re import parse_env_headers _logger = logging.getLogger(__name__) @@ -78,6 +89,8 @@ def __init__( timeout: Optional[float] = None, compression: Optional[Compression] = None, session: Optional[requests.Session] = None, + *, + meter_provider: Optional[MeterProvider] = None, ): self._shutdown_in_progress = threading.Event() self._endpoint = endpoint or environ.get( @@ -134,6 +147,13 @@ def __init__( ) self._shutdown = False + self._metrics = ExporterMetrics( + OtelComponentTypeValues.OTLP_HTTP_SPAN_EXPORTER.value, + "span", + urlparse(self._endpoint), + meter_provider, + ) + def _export( self, serialized_data: bytes, timeout_sec: Optional[float] = None ): @@ -176,17 +196,22 @@ def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult: _logger.warning("Exporter already shutdown, ignoring batch") return SpanExportResult.FAILURE + finish_export = self._metrics.start_export(len(spans)) + serialized_data = encode_spans(spans).SerializePartialToString() deadline_sec = time() + self._timeout for retry_num in range(_MAX_RETRYS): # multiplying by a random number between .8 and 1.2 introduces a +/20% jitter to each backoff. backoff_seconds = 2**retry_num * random.uniform(0.8, 1.2) + export_error: Optional[Exception] = None try: resp = self._export(serialized_data, deadline_sec - time()) if resp.ok: + finish_export(None, None) return SpanExportResult.SUCCESS except requests.exceptions.RequestException as error: reason = error + export_error = error retryable = isinstance(error, ConnectionError) status_code = None else: @@ -200,6 +225,12 @@ def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult: status_code, reason, ) + error_attrs = ( + {HTTP_RESPONSE_STATUS_CODE: status_code} + if status_code is not None + else None + ) + finish_export(export_error, error_attrs) return SpanExportResult.FAILURE if ( @@ -211,6 +242,12 @@ def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult: "Failed to export span batch due to timeout, " "max retries or shutdown." ) + error_attrs = ( + {HTTP_RESPONSE_STATUS_CODE: status_code} + if status_code is not None + else None + ) + finish_export(export_error, error_attrs) return SpanExportResult.FAILURE _logger.warning( "Transient error %s encountered while exporting span batch, retrying in %.2fs.", diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py index 2dbbadccb9e..727bfa34e31 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py @@ -58,6 +58,7 @@ from opentelemetry.sdk.metrics import ( Counter, Histogram, + MeterProvider, ObservableCounter, ObservableGauge, ObservableUpDownCounter, @@ -65,6 +66,7 @@ ) from opentelemetry.sdk.metrics.export import ( AggregationTemporality, + InMemoryMetricReader, MetricExportResult, MetricsData, ResourceMetrics, @@ -92,6 +94,10 @@ # pylint: disable=protected-access class TestOTLPMetricExporter(TestCase): def setUp(self): + self.metric_reader = InMemoryMetricReader() + self.meter_provider = MeterProvider( + metric_readers=[self.metric_reader] + ) self.metrics = { "sum_int": MetricsData( resource_metrics=[ @@ -319,12 +325,37 @@ def test_success(self, mock_post): mock_post.return_value = resp exporter = OTLPMetricExporter() + exporter.set_meter_provider(self.meter_provider) self.assertEqual( exporter.export(self.metrics["sum_int"]), MetricExportResult.SUCCESS, ) + metrics_data = self.metric_reader.get_metrics_data() + scope_metrics = metrics_data.resource_metrics[0].scope_metrics[0] + self.assertEqual(scope_metrics.scope.name, "opentelemetry-sdk") + metrics = sorted(scope_metrics.metrics, key=lambda m: m.name) + self.assertEqual(len(metrics), 3) + self.assertEqual( + metrics[0].name, "otel.sdk.exporter.metric_data_point.exported" + ) + self.assert_standard_metric_attrs( + metrics[0].data.data_points[0].attributes + ) + self.assertEqual( + metrics[1].name, "otel.sdk.exporter.metric_data_point.inflight" + ) + self.assert_standard_metric_attrs( + metrics[1].data.data_points[0].attributes + ) + self.assertEqual( + metrics[2].name, "otel.sdk.exporter.operation.duration" + ) + self.assert_standard_metric_attrs( + metrics[2].data.data_points[0].attributes + ) + @patch.object(Session, "post") def test_failure(self, mock_post): resp = Response() @@ -332,12 +363,60 @@ def test_failure(self, mock_post): mock_post.return_value = resp exporter = OTLPMetricExporter() + exporter.set_meter_provider(self.meter_provider) self.assertEqual( exporter.export(self.metrics["sum_int"]), MetricExportResult.FAILURE, ) + metrics_data = self.metric_reader.get_metrics_data() + scope_metrics = metrics_data.resource_metrics[0].scope_metrics[0] + self.assertEqual(scope_metrics.scope.name, "opentelemetry-sdk") + metrics = sorted(scope_metrics.metrics, key=lambda m: m.name) + self.assertEqual(len(metrics), 3) + self.assertEqual( + metrics[0].name, "otel.sdk.exporter.metric_data_point.exported" + ) + self.assert_standard_metric_attrs( + metrics[0].data.data_points[0].attributes + ) + self.assertNotIn( + "error.type", metrics[0].data.data_points[0].attributes + ) + self.assertNotIn( + "http.response.status_code", + metrics[0].data.data_points[0].attributes, + ) + self.assertEqual( + metrics[1].name, "otel.sdk.exporter.metric_data_point.inflight" + ) + self.assert_standard_metric_attrs( + metrics[1].data.data_points[0].attributes + ) + self.assertNotIn( + "error.type", metrics[1].data.data_points[0].attributes + ) + self.assertNotIn( + "http.response.status_code", + metrics[1].data.data_points[0].attributes, + ) + self.assertEqual( + metrics[2].name, "otel.sdk.exporter.operation.duration" + ) + self.assert_standard_metric_attrs( + metrics[2].data.data_points[0].attributes + ) + self.assertNotIn( + "error.type", metrics[2].data.data_points[0].attributes + ) + self.assertEqual( + metrics[2] + .data.data_points[0] + .attributes["http.response.status_code"], + 401, + ) + @patch.object(Session, "post") def test_serialization(self, mock_post): resp = Response() @@ -534,7 +613,9 @@ def test_preferred_aggregation_override(self): @patch.object(Session, "post") def test_retry_timeout(self, mock_post): - exporter = OTLPMetricExporter(timeout=1.5) + exporter = OTLPMetricExporter( + timeout=1.5, meter_provider=self.meter_provider + ) resp = Response() resp.status_code = 503 @@ -557,6 +638,53 @@ def test_retry_timeout(self, mock_post): warning.records[0].message, ) + metrics_data = self.metric_reader.get_metrics_data() + scope_metrics = metrics_data.resource_metrics[0].scope_metrics[0] + self.assertEqual(scope_metrics.scope.name, "opentelemetry-sdk") + metrics = sorted(scope_metrics.metrics, key=lambda m: m.name) + self.assertEqual(len(metrics), 3) + self.assertEqual( + metrics[0].name, "otel.sdk.exporter.metric_data_point.exported" + ) + self.assert_standard_metric_attrs( + metrics[0].data.data_points[0].attributes + ) + self.assertNotIn( + "error.type", metrics[0].data.data_points[0].attributes + ) + self.assertNotIn( + "http.response.status_code", + metrics[0].data.data_points[0].attributes, + ) + self.assertEqual( + metrics[1].name, "otel.sdk.exporter.metric_data_point.inflight" + ) + self.assert_standard_metric_attrs( + metrics[1].data.data_points[0].attributes + ) + self.assertNotIn( + "error.type", metrics[1].data.data_points[0].attributes + ) + self.assertNotIn( + "http.response.status_code", + metrics[1].data.data_points[0].attributes, + ) + self.assertEqual( + metrics[2].name, "otel.sdk.exporter.operation.duration" + ) + self.assert_standard_metric_attrs( + metrics[2].data.data_points[0].attributes + ) + self.assertNotIn( + "error.type", metrics[2].data.data_points[0].attributes + ) + self.assertEqual( + metrics[2] + .data.data_points[0] + .attributes["http.response.status_code"], + 503, + ) + @patch.object(Session, "post") def test_export_no_collector_available_retryable(self, mock_post): exporter = OTLPMetricExporter(timeout=1.5) @@ -635,3 +763,15 @@ def test_shutdown_interrupts_retry_backoff(self, mock_post): ) assert after - before < 0.2 + + def assert_standard_metric_attrs(self, attributes): + self.assertEqual( + attributes["otel.component.type"], "otlp_http_metric_exporter" + ) + self.assertTrue( + attributes["otel.component.name"].startswith( + "otlp_http_metric_exporter/" + ) + ) + self.assertEqual(attributes["server.address"], "localhost") + self.assertEqual(attributes["server.port"], 4318) diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_log_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_log_exporter.py index c86ac1f6ba1..7981b0bc821 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_log_exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_log_exporter.py @@ -59,6 +59,8 @@ OTEL_EXPORTER_OTLP_LOGS_TIMEOUT, OTEL_EXPORTER_OTLP_TIMEOUT, ) +from opentelemetry.sdk.metrics import MeterProvider +from opentelemetry.sdk.metrics.export import InMemoryMetricReader from opentelemetry.sdk.resources import Resource as SDKResource from opentelemetry.sdk.util.instrumentation import InstrumentationScope from opentelemetry.test.mock_test_classes import IterEntryPoint @@ -78,6 +80,12 @@ class TestOTLPHTTPLogExporter(unittest.TestCase): + def setUp(self): + self.metric_reader = InMemoryMetricReader() + self.meter_provider = MeterProvider( + metric_readers=[self.metric_reader] + ) + def test_constructor_default(self): exporter = OTLPLogExporter() @@ -461,7 +469,9 @@ def test_2xx_status_code(self, mock_otlp_metric_exporter): @patch.object(Session, "post") def test_retry_timeout(self, mock_post): - exporter = OTLPLogExporter(timeout=1.5) + exporter = OTLPLogExporter( + timeout=1.5, meter_provider=self.meter_provider + ) resp = Response() resp.status_code = 503 @@ -484,6 +494,49 @@ def test_retry_timeout(self, mock_post): warning.records[0].message, ) + metrics_data = self.metric_reader.get_metrics_data() + scope_metrics = metrics_data.resource_metrics[0].scope_metrics[0] + self.assertEqual(scope_metrics.scope.name, "opentelemetry-sdk") + metrics = sorted(scope_metrics.metrics, key=lambda m: m.name) + self.assertEqual(len(metrics), 3) + self.assertEqual(metrics[0].name, "otel.sdk.exporter.log.exported") + self.assert_standard_metric_attrs( + metrics[0].data.data_points[0].attributes + ) + self.assertNotIn( + "error.type", metrics[0].data.data_points[0].attributes + ) + self.assertNotIn( + "http.response.status_code", + metrics[0].data.data_points[0].attributes, + ) + self.assertEqual(metrics[1].name, "otel.sdk.exporter.log.inflight") + self.assert_standard_metric_attrs( + metrics[1].data.data_points[0].attributes + ) + self.assertNotIn( + "error.type", metrics[1].data.data_points[0].attributes + ) + self.assertNotIn( + "http.response.status_code", + metrics[1].data.data_points[0].attributes, + ) + self.assertEqual( + metrics[2].name, "otel.sdk.exporter.operation.duration" + ) + self.assert_standard_metric_attrs( + metrics[2].data.data_points[0].attributes + ) + self.assertNotIn( + "error.type", metrics[2].data.data_points[0].attributes + ) + self.assertEqual( + metrics[2] + .data.data_points[0] + .attributes["http.response.status_code"], + 503, + ) + @patch.object(Session, "post") def test_export_no_collector_available_retryable(self, mock_post): exporter = OTLPLogExporter(timeout=1.5) @@ -504,7 +557,9 @@ def test_export_no_collector_available_retryable(self, mock_post): @patch.object(Session, "post") def test_export_no_collector_available(self, mock_post): - exporter = OTLPLogExporter(timeout=1.5) + exporter = OTLPLogExporter( + timeout=1.5, meter_provider=self.meter_provider + ) mock_post.side_effect = requests.exceptions.RequestException() with self.assertLogs(level=WARNING) as warning: @@ -518,6 +573,49 @@ def test_export_no_collector_available(self, mock_post): warning.records[0].message, ) + metrics_data = self.metric_reader.get_metrics_data() + scope_metrics = metrics_data.resource_metrics[0].scope_metrics[0] + self.assertEqual(scope_metrics.scope.name, "opentelemetry-sdk") + metrics = sorted(scope_metrics.metrics, key=lambda m: m.name) + self.assertEqual(len(metrics), 3) + self.assertEqual(metrics[0].name, "otel.sdk.exporter.log.exported") + self.assert_standard_metric_attrs( + metrics[0].data.data_points[0].attributes + ) + self.assertEqual( + metrics[0].data.data_points[0].attributes["error.type"], + "RequestException", + ) + self.assertNotIn( + "http.response.status_code", + metrics[0].data.data_points[0].attributes, + ) + self.assertEqual(metrics[1].name, "otel.sdk.exporter.log.inflight") + self.assert_standard_metric_attrs( + metrics[1].data.data_points[0].attributes + ) + self.assertNotIn( + "error.type", metrics[1].data.data_points[0].attributes + ) + self.assertNotIn( + "http.response.status_code", + metrics[1].data.data_points[0].attributes, + ) + self.assertEqual( + metrics[2].name, "otel.sdk.exporter.operation.duration" + ) + self.assert_standard_metric_attrs( + metrics[2].data.data_points[0].attributes + ) + self.assertEqual( + metrics[2].data.data_points[0].attributes["error.type"], + "RequestException", + ) + self.assertNotIn( + "http.response.status_code", + metrics[2].data.data_points[0].attributes, + ) + @patch.object(Session, "post") def test_timeout_set_correctly(self, mock_post): resp = Response() @@ -562,3 +660,15 @@ def test_shutdown_interrupts_retry_backoff(self, mock_post): ) assert after - before < 0.2 + + def assert_standard_metric_attrs(self, attributes): + self.assertEqual( + attributes["otel.component.type"], "otlp_http_log_exporter" + ) + self.assertTrue( + attributes["otel.component.name"].startswith( + "otlp_http_log_exporter/" + ) + ) + self.assertEqual(attributes["server.address"], "localhost") + self.assertEqual(attributes["server.port"], 4318) diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_span_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_span_exporter.py index 5f61344bbf1..0df471aa693 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_span_exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_span_exporter.py @@ -49,6 +49,8 @@ OTEL_EXPORTER_OTLP_TRACES_HEADERS, OTEL_EXPORTER_OTLP_TRACES_TIMEOUT, ) +from opentelemetry.sdk.metrics import MeterProvider +from opentelemetry.sdk.metrics.export import InMemoryMetricReader from opentelemetry.sdk.trace import _Span from opentelemetry.sdk.trace.export import SpanExportResult from opentelemetry.test.mock_test_classes import IterEntryPoint @@ -73,6 +75,12 @@ # pylint: disable=protected-access class TestOTLPSpanExporter(unittest.TestCase): + def setUp(self): + self.metric_reader = InMemoryMetricReader() + self.meter_provider = MeterProvider( + metric_readers=[self.metric_reader] + ) + def test_constructor_default(self): exporter = OTLPSpanExporter() @@ -281,7 +289,9 @@ def test_2xx_status_code(self, mock_otlp_metric_exporter): @patch.object(Session, "post") def test_retry_timeout(self, mock_post): - exporter = OTLPSpanExporter(timeout=1.5) + exporter = OTLPSpanExporter( + timeout=1.5, meter_provider=self.meter_provider + ) resp = Response() resp.status_code = 503 @@ -304,6 +314,49 @@ def test_retry_timeout(self, mock_post): warning.records[0].message, ) + metrics_data = self.metric_reader.get_metrics_data() + scope_metrics = metrics_data.resource_metrics[0].scope_metrics[0] + self.assertEqual(scope_metrics.scope.name, "opentelemetry-sdk") + metrics = sorted(scope_metrics.metrics, key=lambda m: m.name) + self.assertEqual(len(metrics), 3) + self.assertEqual( + metrics[0].name, "otel.sdk.exporter.operation.duration" + ) + self.assert_standard_metric_attrs( + metrics[0].data.data_points[0].attributes + ) + self.assertNotIn( + "error.type", metrics[0].data.data_points[0].attributes + ) + self.assertEqual( + metrics[0] + .data.data_points[0] + .attributes["http.response.status_code"], + 503, + ) + self.assertEqual(metrics[1].name, "otel.sdk.exporter.span.exported") + self.assert_standard_metric_attrs( + metrics[1].data.data_points[0].attributes + ) + self.assertNotIn( + "error.type", metrics[1].data.data_points[0].attributes + ) + self.assertNotIn( + "http.response.status_code", + metrics[1].data.data_points[0].attributes, + ) + self.assertEqual(metrics[2].name, "otel.sdk.exporter.span.inflight") + self.assert_standard_metric_attrs( + metrics[2].data.data_points[0].attributes + ) + self.assertNotIn( + "error.type", metrics[2].data.data_points[0].attributes + ) + self.assertNotIn( + "http.response.status_code", + metrics[2].data.data_points[0].attributes, + ) + @patch.object(Session, "post") def test_export_no_collector_available_retryable(self, mock_post): exporter = OTLPSpanExporter(timeout=1.5) @@ -324,7 +377,9 @@ def test_export_no_collector_available_retryable(self, mock_post): @patch.object(Session, "post") def test_export_no_collector_available(self, mock_post): - exporter = OTLPSpanExporter(timeout=1.5) + exporter = OTLPSpanExporter( + timeout=1.5, meter_provider=self.meter_provider + ) mock_post.side_effect = requests.exceptions.RequestException() with self.assertLogs(level=WARNING) as warning: @@ -338,6 +393,49 @@ def test_export_no_collector_available(self, mock_post): warning.records[0].message, ) + metrics_data = self.metric_reader.get_metrics_data() + scope_metrics = metrics_data.resource_metrics[0].scope_metrics[0] + self.assertEqual(scope_metrics.scope.name, "opentelemetry-sdk") + metrics = sorted(scope_metrics.metrics, key=lambda m: m.name) + self.assertEqual(len(metrics), 3) + self.assertEqual( + metrics[0].name, "otel.sdk.exporter.operation.duration" + ) + self.assert_standard_metric_attrs( + metrics[0].data.data_points[0].attributes + ) + self.assertEqual( + metrics[0].data.data_points[0].attributes["error.type"], + "RequestException", + ) + self.assertNotIn( + "http.response.status_code", + metrics[0].data.data_points[0].attributes, + ) + self.assertEqual(metrics[1].name, "otel.sdk.exporter.span.exported") + self.assert_standard_metric_attrs( + metrics[1].data.data_points[0].attributes + ) + self.assertEqual( + metrics[1].data.data_points[0].attributes["error.type"], + "RequestException", + ) + self.assertNotIn( + "http.response.status_code", + metrics[1].data.data_points[0].attributes, + ) + self.assertEqual(metrics[2].name, "otel.sdk.exporter.span.inflight") + self.assert_standard_metric_attrs( + metrics[2].data.data_points[0].attributes + ) + self.assertNotIn( + "error.type", metrics[2].data.data_points[0].attributes + ) + self.assertNotIn( + "http.response.status_code", + metrics[2].data.data_points[0].attributes, + ) + @patch.object(Session, "post") def test_timeout_set_correctly(self, mock_post): resp = Response() @@ -380,3 +478,15 @@ def test_shutdown_interrupts_retry_backoff(self, mock_post): ) assert after - before < 0.2 + + def assert_standard_metric_attrs(self, attributes): + self.assertEqual( + attributes["otel.component.type"], "otlp_http_span_exporter" + ) + self.assertTrue( + attributes["otel.component.name"].startswith( + "otlp_http_span_exporter/" + ) + ) + self.assertEqual(attributes["server.address"], "localhost") + self.assertEqual(attributes["server.port"], 4318) From 83b8af1306ebd53384959b86429d176e2ad27fc2 Mon Sep 17 00:00:00 2001 From: Anuraag Agrawal Date: Fri, 13 Mar 2026 17:27:24 +0900 Subject: [PATCH 02/11] changelog --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index bfed95eb4aa..7852e054d00 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#4935](https://github.com/open-telemetry/opentelemetry-python/pull/4935)) - `opentelemetry-sdk`: upgrade vendored OTel configuration schema from v1.0.0-rc.3 to v1.0.0 ([#4965](https://github.com/open-telemetry/opentelemetry-python/pull/4965)) +- `opentelemetry-sdk`: implement exporter metrics + ([#4976](https://github.com/open-telemetry/opentelemetry-python/pull/4976)) - `opentelemetry-exporter-prometheus`: Fix metric name prefix ([#4895](https://github.com/open-telemetry/opentelemetry-python/pull/4895)) From d867d4806df70628f1ee9be5d7edfc0f09af2450 Mon Sep 17 00:00:00 2001 From: Anuraag Agrawal Date: Fri, 13 Mar 2026 17:31:35 +0900 Subject: [PATCH 03/11] Lint --- .../tests/logs/test_otlp_logs_exporter.py | 2 ++ .../tests/test_otlp_exporter_mixin.py | 1 + .../tests/test_otlp_trace_exporter.py | 5 ++++- .../tests/metrics/test_otlp_metrics_exporter.py | 2 +- 4 files changed, 8 insertions(+), 2 deletions(-) diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/logs/test_otlp_logs_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/logs/test_otlp_logs_exporter.py index a0ee318b67e..64c6eb57714 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/logs/test_otlp_logs_exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/logs/test_otlp_logs_exporter.py @@ -403,6 +403,7 @@ def test_translate_log_data(self): ) def test_count_log_data(self): + # pylint: disable=protected-access self.assertEqual(1, self.exporter._count_data([self.log_data_1])) def test_translate_multiple_logs(self): @@ -546,6 +547,7 @@ def test_translate_multiple_logs(self): def test_count_multiple_logs(self): self.assertEqual( 3, + # pylint: disable=protected-access self.exporter._count_data( [self.log_data_1, self.log_data_2, self.log_data_3] ), diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_exporter_mixin.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_exporter_mixin.py index 62ce2e37a7e..39a85b6ea69 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_exporter_mixin.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_exporter_mixin.py @@ -166,6 +166,7 @@ def join(self, timeout: Optional[float] = None) -> Any: return self._return +# pylint: disable=too-many-public-methods class TestOTLPExporterMixin(TestCase): def setUp(self): self.server = server(ThreadPoolExecutor(max_workers=10)) diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_trace_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_trace_exporter.py index 1f44a03470d..c05fc0565d3 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_trace_exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_trace_exporter.py @@ -482,6 +482,7 @@ def test_translate_spans(self): self.assertEqual(expected, self.exporter._translate_data([self.span])) def test_count_spans(self): + # pylint: disable=protected-access self.assertEqual(1, self.exporter._count_data([self.span])) def test_translate_spans_multi(self): @@ -665,7 +666,9 @@ def test_translate_spans_multi(self): def test_count_spans_multi(self): self.assertEqual( - 3, self.exporter._count_data([self.span, self.span2, self.span3]) + # pylint: disable=protected-access + 3, + self.exporter._count_data([self.span, self.span2, self.span3]), ) def _check_translated_status( diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py index 727bfa34e31..25839305bbc 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py @@ -91,7 +91,7 @@ OS_ENV_TIMEOUT = "30" -# pylint: disable=protected-access +# pylint: disable=protected-access,too-many-public-methods class TestOTLPMetricExporter(TestCase): def setUp(self): self.metric_reader = InMemoryMetricReader() From 0fd15afc47f555f737fb367b5891804c262f2dea Mon Sep 17 00:00:00 2001 From: Anuraag Agrawal Date: Fri, 13 Mar 2026 17:34:46 +0900 Subject: [PATCH 04/11] All optional --- .../opentelemetry/exporter/otlp/proto/grpc/exporter.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py index 6cc8098db64..1010a636218 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py @@ -307,8 +307,8 @@ def __init__( compression: Optional[Compression] = None, channel_options: Optional[Tuple[Tuple[str, str]]] = None, *, - component_type: str, - signal: Literal["span", "log", "metric_data_point"], + component_type: Optional[str] = None, + signal: Literal["span", "log", "metric_data_point"] = "span", meter_provider: Optional[MeterProvider], ): super().__init__() @@ -383,11 +383,11 @@ def __init__( OTEL_EXPORTER_OTLP_CLIENT_CERTIFICATE, ) - self._component_type = component_type + self._component_type = component_type or type(self).__qualname__ self._signal: Literal["span", "log", "metric_data_point"] = signal self._parsed_url = parsed_url self._metrics = ExporterMetrics( - component_type, + self._component_type, signal, parsed_url, meter_provider, From 223b516a48b83559dfeb67803e6811ce5713e067 Mon Sep 17 00:00:00 2001 From: Anuraag Agrawal Date: Fri, 13 Mar 2026 17:36:20 +0900 Subject: [PATCH 05/11] Fix --- .../src/opentelemetry/exporter/otlp/proto/grpc/exporter.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py index 1010a636218..5a3f690141b 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py @@ -309,7 +309,7 @@ def __init__( *, component_type: Optional[str] = None, signal: Literal["span", "log", "metric_data_point"] = "span", - meter_provider: Optional[MeterProvider], + meter_provider: Optional[MeterProvider] = None, ): super().__init__() self._result = result From 9e80c6a436640c7613472ba64140a3bf096612a7 Mon Sep 17 00:00:00 2001 From: Anuraag Agrawal Date: Fri, 27 Mar 2026 12:13:19 +0900 Subject: [PATCH 06/11] Cleanup --- .../exporter/otlp/proto/common/_exporter_metrics.py | 10 ++++++---- .../otlp/proto/grpc/_log_exporter/__init__.py | 4 ++-- .../exporter/otlp/proto/grpc/exporter.py | 11 +++++++---- .../otlp/proto/grpc/metric_exporter/__init__.py | 4 ++-- .../otlp/proto/grpc/trace_exporter/__init__.py | 4 ++-- 5 files changed, 19 insertions(+), 14 deletions(-) diff --git a/exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/_exporter_metrics.py b/exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/_exporter_metrics.py index c3c605f31c8..7c3426cb25f 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/_exporter_metrics.py +++ b/exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/_exporter_metrics.py @@ -22,6 +22,7 @@ from opentelemetry.semconv._incubating.attributes.otel_attributes import ( OTEL_COMPONENT_NAME, OTEL_COMPONENT_TYPE, + OtelComponentTypeValues, ) from opentelemetry.semconv._incubating.metrics.otel_metrics import ( create_otel_sdk_exporter_log_exported, @@ -50,15 +51,15 @@ class ExporterMetrics: def __init__( self, - component_type: str, - signal: Literal["span", "log", "metric_data_point"], + component_type: OtelComponentTypeValues | None, + signal: Literal["traces", "metrics", "logs"], endpoint: UrlParseResult, meter_provider: MeterProvider | None, ) -> None: - if signal == "span": + if signal == "traces": create_exported = create_otel_sdk_exporter_span_exported create_inflight = create_otel_sdk_exporter_span_inflight - elif signal == "log": + elif signal == "logs": create_exported = create_otel_sdk_exporter_log_exported create_inflight = create_otel_sdk_exporter_log_inflight else: @@ -76,6 +77,7 @@ def __init__( elif endpoint.scheme == "http": port = 80 + component_type = (component_type or OtelComponentTypeValues("unknown_otlp_exporter")).value count = _component_counter[component_type] _component_counter[component_type] = count + 1 self._standard_attrs: dict[str, AttributeValue] = { diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/_log_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/_log_exporter/__init__.py index a86af8dfbca..da40e9e9d58 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/_log_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/_log_exporter/__init__.py @@ -111,8 +111,8 @@ def __init__( stub=LogsServiceStub, result=LogRecordExportResult, channel_options=channel_options, - component_type=OtelComponentTypeValues.OTLP_GRPC_LOG_EXPORTER.value, - signal="log", + component_type=OtelComponentTypeValues.OTLP_GRPC_LOG_EXPORTER, + signal="logs", meter_provider=meter_provider, ) diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py index 5a3f690141b..6d5d421f2db 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py @@ -108,6 +108,9 @@ from opentelemetry.sdk.resources import Resource as SDKResource from opentelemetry.sdk.trace import ReadableSpan from opentelemetry.sdk.trace.export import SpanExportResult +from opentelemetry.semconv._incubating.attributes.otel_attributes import ( + OtelComponentTypeValues, +) from opentelemetry.semconv._incubating.attributes.rpc_attributes import ( RPC_RESPONSE_STATUS_CODE, ) @@ -307,8 +310,8 @@ def __init__( compression: Optional[Compression] = None, channel_options: Optional[Tuple[Tuple[str, str]]] = None, *, - component_type: Optional[str] = None, - signal: Literal["span", "log", "metric_data_point"] = "span", + component_type: Union[OtelComponentTypeValues, None] = None, + signal: Literal["traces", "metrics", "logs"] = "traces", meter_provider: Optional[MeterProvider] = None, ): super().__init__() @@ -383,8 +386,8 @@ def __init__( OTEL_EXPORTER_OTLP_CLIENT_CERTIFICATE, ) - self._component_type = component_type or type(self).__qualname__ - self._signal: Literal["span", "log", "metric_data_point"] = signal + self._component_type = component_type + self._signal: Literal["traces", "metrics", "logs"] = signal self._parsed_url = parsed_url self._metrics = ExporterMetrics( self._component_type, diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/metric_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/metric_exporter/__init__.py index b8d000c6b17..1b2b1e5ae41 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/metric_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/metric_exporter/__init__.py @@ -159,8 +159,8 @@ def __init__( timeout=timeout or environ_timeout, compression=compression, channel_options=channel_options, - component_type=OtelComponentTypeValues.OTLP_GRPC_METRIC_EXPORTER.value, - signal="metric_data_point", + component_type=OtelComponentTypeValues.OTLP_GRPC_METRIC_EXPORTER, + signal="metrics", meter_provider=meter_provider, ) diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/trace_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/trace_exporter/__init__.py index fdd5c9309f6..7d9dfd7625d 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/trace_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/trace_exporter/__init__.py @@ -141,8 +141,8 @@ def __init__( timeout=timeout or environ_timeout, compression=compression, channel_options=channel_options, - component_type=OtelComponentTypeValues.OTLP_GRPC_SPAN_EXPORTER.value, - signal="span", + component_type=OtelComponentTypeValues.OTLP_GRPC_SPAN_EXPORTER, + signal="traces", meter_provider=meter_provider, ) From 9de118ed0f3a5896cef86327540e47c75f0ccc2a Mon Sep 17 00:00:00 2001 From: Anuraag Agrawal Date: Fri, 27 Mar 2026 12:18:31 +0900 Subject: [PATCH 07/11] Update HTTP exporters --- .../exporter/otlp/proto/http/_log_exporter/__init__.py | 4 ++-- .../exporter/otlp/proto/http/metric_exporter/__init__.py | 4 ++-- .../exporter/otlp/proto/http/trace_exporter/__init__.py | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py index 770eeeba037..de17153a37a 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py @@ -153,8 +153,8 @@ def __init__( self._shutdown = False self._metrics = ExporterMetrics( - OtelComponentTypeValues.OTLP_HTTP_LOG_EXPORTER.value, - "log", + OtelComponentTypeValues.OTLP_HTTP_LOG_EXPORTER, + "logs", urlparse(self._endpoint), meter_provider, ) diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py index 3d8250f0485..33621d5a232 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py @@ -196,8 +196,8 @@ def __init__( self._shutdown = False self._metrics = ExporterMetrics( - OtelComponentTypeValues.OTLP_HTTP_METRIC_EXPORTER.value, - "metric_data_point", + OtelComponentTypeValues.OTLP_HTTP_METRIC_EXPORTER, + "metrics", urlparse(self._endpoint), meter_provider, ) diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/trace_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/trace_exporter/__init__.py index 4f9f960b47f..ef01595c336 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/trace_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/trace_exporter/__init__.py @@ -148,8 +148,8 @@ def __init__( self._shutdown = False self._metrics = ExporterMetrics( - OtelComponentTypeValues.OTLP_HTTP_SPAN_EXPORTER.value, - "span", + OtelComponentTypeValues.OTLP_HTTP_SPAN_EXPORTER, + "traces", urlparse(self._endpoint), meter_provider, ) From f6de4944345660ad8f3bd4d217a9839b456f581e Mon Sep 17 00:00:00 2001 From: Anuraag Agrawal Date: Fri, 27 Mar 2026 12:20:13 +0900 Subject: [PATCH 08/11] Format --- .../exporter/otlp/proto/common/_exporter_metrics.py | 4 +++- .../tests/test_otlp_exporter_mixin.py | 13 +++++++++---- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/_exporter_metrics.py b/exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/_exporter_metrics.py index 7c3426cb25f..76d4ef2d74e 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/_exporter_metrics.py +++ b/exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/_exporter_metrics.py @@ -77,7 +77,9 @@ def __init__( elif endpoint.scheme == "http": port = 80 - component_type = (component_type or OtelComponentTypeValues("unknown_otlp_exporter")).value + component_type = ( + component_type or OtelComponentTypeValues("unknown_otlp_exporter") + ).value count = _component_counter[component_type] _component_counter[component_type] = count + 1 self._standard_attrs: dict[str, AttributeValue] = { diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_exporter_mixin.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_exporter_mixin.py index 39a85b6ea69..87bd72c5550 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_exporter_mixin.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_exporter_mixin.py @@ -62,6 +62,9 @@ SpanExporter, SpanExportResult, ) +from opentelemetry.semconv._incubating.attributes.otel_attributes import ( + OtelComponentTypeValues, +) from opentelemetry.test.mock_test_classes import IterEntryPoint logger = getLogger(__name__) @@ -83,8 +86,8 @@ def __init__(self, **kwargs): super().__init__( TraceServiceStub, SpanExportResult, - component_type="test_span_exporter", - signal="span", + component_type=OtelComponentTypeValues.OTLP_GRPC_SPAN_EXPORTER, + signal="traces", meter_provider=kwargs.pop("meter_provider", None), **kwargs, ) @@ -706,10 +709,12 @@ def test_unavailable_reconnects(self): def assert_standard_metric_attrs(self, attributes): self.assertEqual( - attributes["otel.component.type"], "test_span_exporter" + attributes["otel.component.type"], "otlp_grpc_span_exporter" ) self.assertTrue( - attributes["otel.component.name"].startswith("test_span_exporter/") + attributes["otel.component.name"].startswith( + "otlp_grpc_span_exporter/" + ) ) self.assertEqual(attributes["server.address"], "localhost") self.assertEqual(attributes["server.port"], 4317) From 4f521609fbdd8f024c64d0e9558753fabc620727 Mon Sep 17 00:00:00 2001 From: Anuraag Agrawal Date: Thu, 2 Apr 2026 14:13:40 +0900 Subject: [PATCH 09/11] context manager --- .../otlp/proto/common/_exporter_metrics.py | 28 ++-- .../exporter/otlp/proto/grpc/exporter.py | 144 +++++++++--------- .../proto/http/metric_exporter/__init__.py | 124 +++++++-------- .../metrics/test_otlp_metrics_exporter.py | 1 + 4 files changed, 155 insertions(+), 142 deletions(-) diff --git a/exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/_exporter_metrics.py b/exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/_exporter_metrics.py index 76d4ef2d74e..e847dfd9ba5 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/_exporter_metrics.py +++ b/exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/_exporter_metrics.py @@ -15,8 +15,10 @@ from __future__ import annotations from collections import Counter +from contextlib import contextmanager +from dataclasses import dataclass from time import perf_counter -from typing import TYPE_CHECKING, Callable +from typing import TYPE_CHECKING, Iterator from opentelemetry.metrics import MeterProvider, get_meter_provider from opentelemetry.semconv._incubating.attributes.otel_attributes import ( @@ -48,6 +50,12 @@ _component_counter = Counter() +@dataclass +class ExportResult: + error: Exception | None + error_attrs: Attributes + + class ExporterMetrics: def __init__( self, @@ -97,16 +105,18 @@ def __init__( self._exported = create_exported(meter) self._duration = create_otel_sdk_exporter_operation_duration(meter) - def start_export( - self, num_items: int - ) -> Callable[[Exception | None, Attributes], None]: + @contextmanager + def export_operation(self, num_items: int) -> Iterator[ExportResult]: start_time = perf_counter() self._inflight.add(num_items, self._standard_attrs) - def finish_export( - error: Exception | None, - error_attrs: Attributes, - ): + result = ExportResult() + try: + yield result + finally: + error = result.error + error_attrs = result.error_attrs + end_time = perf_counter() self._inflight.add(-num_items, self._standard_attrs) exported_attrs = ( @@ -121,5 +131,3 @@ def finish_export( else exported_attrs ) self._duration.record(end_time - start_time, duration_attrs) - - return finish_export diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py index ec775347542..3627db70581 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py @@ -441,86 +441,88 @@ def _export( logger.warning("Exporter already shutdown, ignoring batch") return self._result.FAILURE # type: ignore [reportReturnType] - finish_export = self._metrics.start_export(self._count_data(data)) - - # FIXME remove this check if the export type for traces - # gets updated to a class that represents the proto - # TracesData and use the code below instead. - deadline_sec = time() + self._timeout - for retry_num in range(_MAX_RETRYS): - try: - if self._client is None: - return self._result.FAILURE - self._client.Export( - request=self._translate_data(data), - metadata=self._headers, - timeout=deadline_sec - time(), - ) - finish_export(None, None) - return self._result.SUCCESS # type: ignore [reportReturnType] - except RpcError as error: - retry_info_bin = dict(error.trailing_metadata()).get( # type: ignore [reportAttributeAccessIssue] - "google.rpc.retryinfo-bin" # type: ignore [reportArgumentType] - ) - # multiplying by a random number between .8 and 1.2 introduces a +/20% jitter to each backoff. - backoff_seconds = 2**retry_num * random.uniform(0.8, 1.2) - if retry_info_bin is not None: - retry_info = RetryInfo() - retry_info.ParseFromString(retry_info_bin) - backoff_seconds = ( - retry_info.retry_delay.seconds - + retry_info.retry_delay.nanos / 1.0e9 + with self._metrics.export_operation(self._count_data(data)) as result: + # FIXME remove this check if the export type for traces + # gets updated to a class that represents the proto + # TracesData and use the code below instead. + deadline_sec = time() + self._timeout + for retry_num in range(_MAX_RETRYS): + try: + if self._client is None: + return self._result.FAILURE + self._client.Export( + request=self._translate_data(data), + metadata=self._headers, + timeout=deadline_sec - time(), ) - - # For UNAVAILABLE errors, reinitialize the channel to force reconnection - if error.code() == StatusCode.UNAVAILABLE and retry_num == 0: # type: ignore - logger.debug( - "Reinitializing gRPC channel for %s exporter due to UNAVAILABLE error", - self._exporting, + return self._result.SUCCESS # type: ignore [reportReturnType] + except RpcError as error: + retry_info_bin = dict(error.trailing_metadata()).get( # type: ignore [reportAttributeAccessIssue] + "google.rpc.retryinfo-bin" # type: ignore [reportArgumentType] ) - try: - if self._channel: - self._channel.close() - except Exception as e: + # multiplying by a random number between .8 and 1.2 introduces a +/20% jitter to each backoff. + backoff_seconds = 2**retry_num * random.uniform(0.8, 1.2) + if retry_info_bin is not None: + retry_info = RetryInfo() + retry_info.ParseFromString(retry_info_bin) + backoff_seconds = ( + retry_info.retry_delay.seconds + + retry_info.retry_delay.nanos / 1.0e9 + ) + + # For UNAVAILABLE errors, reinitialize the channel to force reconnection + if ( + error.code() == StatusCode.UNAVAILABLE + and retry_num == 0 + ): # type: ignore logger.debug( - "Error closing channel for %s exporter to %s: %s", + "Reinitializing gRPC channel for %s exporter due to UNAVAILABLE error", + self._exporting, + ) + try: + if self._channel: + self._channel.close() + except Exception as e: + logger.debug( + "Error closing channel for %s exporter to %s: %s", + self._exporting, + self._endpoint, + str(e), + ) + # Enable channel reconnection for subsequent calls + self._initialize_channel_and_stub() + + if ( + error.code() not in _RETRYABLE_ERROR_CODES # type: ignore [reportAttributeAccessIssue] + or retry_num + 1 == _MAX_RETRYS + or backoff_seconds > (deadline_sec - time()) + or self._shutdown + ): + logger.error( + "Failed to export %s to %s, error code: %s", self._exporting, self._endpoint, - str(e), + error.code(), # type: ignore [reportAttributeAccessIssue] + exc_info=error.code() == StatusCode.UNKNOWN, # type: ignore [reportAttributeAccessIssue] ) - # Enable channel reconnection for subsequent calls - self._initialize_channel_and_stub() - - if ( - error.code() not in _RETRYABLE_ERROR_CODES # type: ignore [reportAttributeAccessIssue] - or retry_num + 1 == _MAX_RETRYS - or backoff_seconds > (deadline_sec - time()) - or self._shutdown - ): - logger.error( - "Failed to export %s to %s, error code: %s", + result.error = error + result.error_attrs = { + RPC_RESPONSE_STATUS_CODE: error.code().name + } + return self._result.FAILURE # type: ignore [reportReturnType] + logger.warning( + "Transient error %s encountered while exporting %s to %s, retrying in %.2fs.", + error.code(), # type: ignore [reportAttributeAccessIssue] self._exporting, self._endpoint, - error.code(), # type: ignore [reportAttributeAccessIssue] - exc_info=error.code() == StatusCode.UNKNOWN, # type: ignore [reportAttributeAccessIssue] + backoff_seconds, ) - finish_export( - error, {RPC_RESPONSE_STATUS_CODE: error.code().name} - ) - return self._result.FAILURE # type: ignore [reportReturnType] - logger.warning( - "Transient error %s encountered while exporting %s to %s, retrying in %.2fs.", - error.code(), # type: ignore [reportAttributeAccessIssue] - self._exporting, - self._endpoint, - backoff_seconds, - ) - shutdown = self._shutdown_in_progress.wait(backoff_seconds) - if shutdown: - logger.warning("Shutdown in progress, aborting retry.") - break - # Not possible to reach here but the linter is complaining. - return self._result.FAILURE # type: ignore [reportReturnType] + shutdown = self._shutdown_in_progress.wait(backoff_seconds) + if shutdown: + logger.warning("Shutdown in progress, aborting retry.") + break + # Not possible to reach here but the linter is complaining. + return self._result.FAILURE # type: ignore [reportReturnType] def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None: """ diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py index f7da8977308..d3444a2c567 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py @@ -271,69 +271,69 @@ def _export_with_retries( _logger.warning("Exporter already shutdown, ignoring batch") return MetricExportResult.FAILURE - finish_export = self._metrics.start_export(num_items) + with self._metrics.export_operation(num_items) as result: + serialized_data = export_request.SerializeToString() + deadline_sec = time() + self._timeout + for retry_num in range(_MAX_RETRYS): + # multiplying by a random number between .8 and 1.2 introduces a +/20% jitter to each backoff. + backoff_seconds = 2**retry_num * random.uniform(0.8, 1.2) + export_error: Optional[Exception] = None + try: + resp = self._export(serialized_data, deadline_sec - time()) + if resp.ok: + return MetricExportResult.SUCCESS + except requests.exceptions.RequestException as error: + reason = error + export_error = error + retryable = isinstance(error, ConnectionError) + status_code = None + else: + reason = resp.reason + retryable = _is_retryable(resp) + status_code = resp.status_code + + if not retryable: + _logger.error( + "Failed to export metrics batch code: %s, reason: %s", + status_code, + reason, + ) + error_attrs = ( + {HTTP_RESPONSE_STATUS_CODE: status_code} + if status_code is not None + else None + ) + result.error = export_error + result.error_attrs = error_attrs + return MetricExportResult.FAILURE + if ( + retry_num + 1 == _MAX_RETRYS + or backoff_seconds > (deadline_sec - time()) + or self._shutdown + ): + _logger.error( + "Failed to export metrics batch due to timeout, " + "max retries or shutdown." + ) + error_attrs = ( + {HTTP_RESPONSE_STATUS_CODE: status_code} + if status_code is not None + else None + ) + result.error = export_error + result.error_attrs = error_attrs + return MetricExportResult.FAILURE - serialized_data = export_request.SerializeToString() - deadline_sec = time() + self._timeout - for retry_num in range(_MAX_RETRYS): - # multiplying by a random number between .8 and 1.2 introduces a +/20% jitter to each backoff. - backoff_seconds = 2**retry_num * random.uniform(0.8, 1.2) - export_error: Optional[Exception] = None - try: - resp = self._export(serialized_data, deadline_sec - time()) - if resp.ok: - finish_export(None, None) - return MetricExportResult.SUCCESS - except requests.exceptions.RequestException as error: - reason = error - export_error = error - retryable = isinstance(error, ConnectionError) - status_code = None - else: - reason = resp.reason - retryable = _is_retryable(resp) - status_code = resp.status_code - - if not retryable: - _logger.error( - "Failed to export metrics batch code: %s, reason: %s", - status_code, + _logger.warning( + "Transient error %s encountered while exporting metrics batch, retrying in %.2fs.", reason, + backoff_seconds, ) - error_attrs = ( - {HTTP_RESPONSE_STATUS_CODE: status_code} - if status_code is not None - else None - ) - finish_export(export_error, error_attrs) - return MetricExportResult.FAILURE - if ( - retry_num + 1 == _MAX_RETRYS - or backoff_seconds > (deadline_sec - time()) - or self._shutdown - ): - _logger.error( - "Failed to export metrics batch due to timeout, " - "max retries or shutdown." - ) - error_attrs = ( - {HTTP_RESPONSE_STATUS_CODE: status_code} - if status_code is not None - else None - ) - finish_export(export_error, error_attrs) - return MetricExportResult.FAILURE - - _logger.warning( - "Transient error %s encountered while exporting metrics batch, retrying in %.2fs.", - reason, - backoff_seconds, - ) - shutdown = self._shutdown_in_progress.wait(backoff_seconds) - if shutdown: - _logger.warning("Shutdown in progress, aborting retry.") - break - return MetricExportResult.FAILURE + shutdown = self._shutdown_in_progress.wait(backoff_seconds) + if shutdown: + _logger.warning("Shutdown in progress, aborting retry.") + break + return MetricExportResult.FAILURE def export( self, @@ -356,7 +356,9 @@ def export( # If no batch size configured, export as single batch with retries as configured if self._max_export_batch_size is None: - return self._export_with_retries(export_request, deadline_sec, num_items) + return self._export_with_retries( + export_request, deadline_sec, num_items + ) # Else, export in batches of configured size batched_export_requests = _split_metrics_data( diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py index 6f40d4b8e22..c8e2fd484fb 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py @@ -1626,6 +1626,7 @@ def assert_standard_metric_attrs(self, attributes): self.assertEqual(attributes["server.address"], "localhost") self.assertEqual(attributes["server.port"], 4318) + def _resource_metrics( index: int, scope_metrics: List[pb2.ScopeMetrics] ) -> pb2.ResourceMetrics: From dfbdd8a195d43816f745ab0aaec0827b27d3b4fa Mon Sep 17 00:00:00 2001 From: Anuraag Agrawal Date: Thu, 2 Apr 2026 14:15:56 +0900 Subject: [PATCH 10/11] defaults --- .../exporter/otlp/proto/common/_exporter_metrics.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/_exporter_metrics.py b/exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/_exporter_metrics.py index e847dfd9ba5..96d12a8857d 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/_exporter_metrics.py +++ b/exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/_exporter_metrics.py @@ -52,8 +52,8 @@ @dataclass class ExportResult: - error: Exception | None - error_attrs: Attributes + error: Exception | None = None + error_attrs: Attributes = None class ExporterMetrics: From 243cdb7c6b10b10ca60b69bdf172d1986747f604 Mon Sep 17 00:00:00 2001 From: Anuraag Agrawal Date: Thu, 2 Apr 2026 14:21:58 +0900 Subject: [PATCH 11/11] Fixes --- .../otlp/proto/http/_log_exporter/__init__.py | 118 +++++++++--------- .../proto/http/metric_exporter/__init__.py | 4 +- .../proto/http/trace_exporter/__init__.py | 118 +++++++++--------- 3 files changed, 120 insertions(+), 120 deletions(-) diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py index de17153a37a..6032433dd12 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py @@ -203,69 +203,69 @@ def export( _logger.warning("Exporter already shutdown, ignoring batch") return LogRecordExportResult.FAILURE - finish_export = self._metrics.start_export(len(batch)) + with self._metrics.export_operation(len(batch)) as result: + serialized_data = encode_logs(batch).SerializeToString() + deadline_sec = time() + self._timeout + for retry_num in range(_MAX_RETRYS): + # multiplying by a random number between .8 and 1.2 introduces a +/20% jitter to each backoff. + backoff_seconds = 2**retry_num * random.uniform(0.8, 1.2) + export_error: Optional[Exception] = None + try: + resp = self._export(serialized_data, deadline_sec - time()) + if resp.ok: + return LogRecordExportResult.SUCCESS + except requests.exceptions.RequestException as error: + reason = error + export_error = error + retryable = isinstance(error, ConnectionError) + status_code = None + else: + reason = resp.reason + retryable = _is_retryable(resp) + status_code = resp.status_code - serialized_data = encode_logs(batch).SerializeToString() - deadline_sec = time() + self._timeout - for retry_num in range(_MAX_RETRYS): - # multiplying by a random number between .8 and 1.2 introduces a +/20% jitter to each backoff. - backoff_seconds = 2**retry_num * random.uniform(0.8, 1.2) - export_error: Optional[Exception] = None - try: - resp = self._export(serialized_data, deadline_sec - time()) - if resp.ok: - finish_export(None, None) - return LogRecordExportResult.SUCCESS - except requests.exceptions.RequestException as error: - reason = error - export_error = error - retryable = isinstance(error, ConnectionError) - status_code = None - else: - reason = resp.reason - retryable = _is_retryable(resp) - status_code = resp.status_code + if not retryable: + _logger.error( + "Failed to export logs batch code: %s, reason: %s", + status_code, + reason, + ) + error_attrs = ( + {HTTP_RESPONSE_STATUS_CODE: status_code} + if status_code is not None + else None + ) + result.error = export_error + result.error_attrs = error_attrs + return LogRecordExportResult.FAILURE - if not retryable: - _logger.error( - "Failed to export logs batch code: %s, reason: %s", - status_code, + if ( + retry_num + 1 == _MAX_RETRYS + or backoff_seconds > (deadline_sec - time()) + or self._shutdown + ): + _logger.error( + "Failed to export logs batch due to timeout, " + "max retries or shutdown." + ) + error_attrs = ( + {HTTP_RESPONSE_STATUS_CODE: status_code} + if status_code is not None + else None + ) + result.error = export_error + result.error_attrs = error_attrs + return LogRecordExportResult.FAILURE + _logger.warning( + "Transient error %s encountered while exporting logs batch, retrying in %.2fs.", reason, + backoff_seconds, ) - error_attrs = ( - {HTTP_RESPONSE_STATUS_CODE: status_code} - if status_code is not None - else None - ) - finish_export(export_error, error_attrs) - return LogRecordExportResult.FAILURE - - if ( - retry_num + 1 == _MAX_RETRYS - or backoff_seconds > (deadline_sec - time()) - or self._shutdown - ): - _logger.error( - "Failed to export logs batch due to timeout, " - "max retries or shutdown." - ) - error_attrs = ( - {HTTP_RESPONSE_STATUS_CODE: status_code} - if status_code is not None - else None - ) - finish_export(export_error, error_attrs) - return LogRecordExportResult.FAILURE - _logger.warning( - "Transient error %s encountered while exporting logs batch, retrying in %.2fs.", - reason, - backoff_seconds, - ) - shutdown = self._shutdown_is_occuring.wait(backoff_seconds) - if shutdown: - _logger.warning("Shutdown in progress, aborting retry.") - break - return LogRecordExportResult.FAILURE + shutdown = self._shutdown_is_occuring.wait(backoff_seconds) + if shutdown: + _logger.warning("Shutdown in progress, aborting retry.") + break + return LogRecordExportResult.FAILURE def force_flush(self, timeout_millis: float = 10_000) -> bool: """Nothing is buffered in this exporter, so this method does nothing.""" diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py index d3444a2c567..17a5b34a086 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py @@ -395,8 +395,8 @@ def force_flush(self, timeout_millis: float = 10_000) -> bool: def set_meter_provider(self, meter_provider: MeterProvider) -> None: self._metrics = ExporterMetrics( - OtelComponentTypeValues.OTLP_HTTP_METRIC_EXPORTER.value, - "metric_data_point", + OtelComponentTypeValues.OTLP_HTTP_METRIC_EXPORTER, + "metrics", urlparse(self._endpoint), meter_provider, ) diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/trace_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/trace_exporter/__init__.py index ef01595c336..018d89df1ee 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/trace_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/trace_exporter/__init__.py @@ -196,69 +196,69 @@ def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult: _logger.warning("Exporter already shutdown, ignoring batch") return SpanExportResult.FAILURE - finish_export = self._metrics.start_export(len(spans)) + with self._metrics.export_operation(len(spans)) as result: + serialized_data = encode_spans(spans).SerializePartialToString() + deadline_sec = time() + self._timeout + for retry_num in range(_MAX_RETRYS): + # multiplying by a random number between .8 and 1.2 introduces a +/20% jitter to each backoff. + backoff_seconds = 2**retry_num * random.uniform(0.8, 1.2) + export_error: Optional[Exception] = None + try: + resp = self._export(serialized_data, deadline_sec - time()) + if resp.ok: + return SpanExportResult.SUCCESS + except requests.exceptions.RequestException as error: + reason = error + export_error = error + retryable = isinstance(error, ConnectionError) + status_code = None + else: + reason = resp.reason + retryable = _is_retryable(resp) + status_code = resp.status_code - serialized_data = encode_spans(spans).SerializePartialToString() - deadline_sec = time() + self._timeout - for retry_num in range(_MAX_RETRYS): - # multiplying by a random number between .8 and 1.2 introduces a +/20% jitter to each backoff. - backoff_seconds = 2**retry_num * random.uniform(0.8, 1.2) - export_error: Optional[Exception] = None - try: - resp = self._export(serialized_data, deadline_sec - time()) - if resp.ok: - finish_export(None, None) - return SpanExportResult.SUCCESS - except requests.exceptions.RequestException as error: - reason = error - export_error = error - retryable = isinstance(error, ConnectionError) - status_code = None - else: - reason = resp.reason - retryable = _is_retryable(resp) - status_code = resp.status_code + if not retryable: + _logger.error( + "Failed to export span batch code: %s, reason: %s", + status_code, + reason, + ) + error_attrs = ( + {HTTP_RESPONSE_STATUS_CODE: status_code} + if status_code is not None + else None + ) + result.error = export_error + result.error_attrs = error_attrs + return SpanExportResult.FAILURE - if not retryable: - _logger.error( - "Failed to export span batch code: %s, reason: %s", - status_code, + if ( + retry_num + 1 == _MAX_RETRYS + or backoff_seconds > (deadline_sec - time()) + or self._shutdown + ): + _logger.error( + "Failed to export span batch due to timeout, " + "max retries or shutdown." + ) + error_attrs = ( + {HTTP_RESPONSE_STATUS_CODE: status_code} + if status_code is not None + else None + ) + result.error = export_error + result.error_attrs = error_attrs + return SpanExportResult.FAILURE + _logger.warning( + "Transient error %s encountered while exporting span batch, retrying in %.2fs.", reason, + backoff_seconds, ) - error_attrs = ( - {HTTP_RESPONSE_STATUS_CODE: status_code} - if status_code is not None - else None - ) - finish_export(export_error, error_attrs) - return SpanExportResult.FAILURE - - if ( - retry_num + 1 == _MAX_RETRYS - or backoff_seconds > (deadline_sec - time()) - or self._shutdown - ): - _logger.error( - "Failed to export span batch due to timeout, " - "max retries or shutdown." - ) - error_attrs = ( - {HTTP_RESPONSE_STATUS_CODE: status_code} - if status_code is not None - else None - ) - finish_export(export_error, error_attrs) - return SpanExportResult.FAILURE - _logger.warning( - "Transient error %s encountered while exporting span batch, retrying in %.2fs.", - reason, - backoff_seconds, - ) - shutdown = self._shutdown_in_progress.wait(backoff_seconds) - if shutdown: - _logger.warning("Shutdown in progress, aborting retry.") - break - return SpanExportResult.FAILURE + shutdown = self._shutdown_in_progress.wait(backoff_seconds) + if shutdown: + _logger.warning("Shutdown in progress, aborting retry.") + break + return SpanExportResult.FAILURE def shutdown(self): if self._shutdown: