Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#4970](https://github.com/open-telemetry/opentelemetry-python/pull/4970))
- `opentelemetry-sdk`: upgrade vendored OTel configuration schema from v1.0.0-rc.3 to v1.0.0
([#4965](https://github.com/open-telemetry/opentelemetry-python/pull/4965))
- `opentelemetry-sdk`: implement exporter metrics
([#4976](https://github.com/open-telemetry/opentelemetry-python/pull/4976))
- improve check-links ci job
([#4978](https://github.com/open-telemetry/opentelemetry-python/pull/4978))
- Resolve some Pyright type errors in Span/ReadableSpan and utility stubs
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

from collections import Counter
from time import perf_counter
from typing import TYPE_CHECKING, Callable

from opentelemetry.metrics import MeterProvider, get_meter_provider
from opentelemetry.semconv._incubating.attributes.otel_attributes import (
OTEL_COMPONENT_NAME,
OTEL_COMPONENT_TYPE,
OtelComponentTypeValues,
)
from opentelemetry.semconv._incubating.metrics.otel_metrics import (
create_otel_sdk_exporter_log_exported,
create_otel_sdk_exporter_log_inflight,
create_otel_sdk_exporter_metric_data_point_exported,
create_otel_sdk_exporter_metric_data_point_inflight,
create_otel_sdk_exporter_operation_duration,
create_otel_sdk_exporter_span_exported,
create_otel_sdk_exporter_span_inflight,
)
from opentelemetry.semconv.attributes.error_attributes import ERROR_TYPE
from opentelemetry.semconv.attributes.server_attributes import (
SERVER_ADDRESS,
SERVER_PORT,
)

if TYPE_CHECKING:
from typing import Literal
from urllib.parse import ParseResult as UrlParseResult

from opentelemetry.util.types import Attributes, AttributeValue

_component_counter = Counter()


class ExporterMetrics:
def __init__(
self,
component_type: OtelComponentTypeValues | None,
signal: Literal["traces", "metrics", "logs"],
endpoint: UrlParseResult,
meter_provider: MeterProvider | None,
) -> None:
if signal == "traces":
create_exported = create_otel_sdk_exporter_span_exported
create_inflight = create_otel_sdk_exporter_span_inflight
elif signal == "logs":
create_exported = create_otel_sdk_exporter_log_exported
create_inflight = create_otel_sdk_exporter_log_inflight
else:
create_exported = (
create_otel_sdk_exporter_metric_data_point_exported
)
create_inflight = (
create_otel_sdk_exporter_metric_data_point_inflight
)

port = endpoint.port
if port is None:
if endpoint.scheme == "https":
port = 443
elif endpoint.scheme == "http":
port = 80

component_type = (
component_type or OtelComponentTypeValues("unknown_otlp_exporter")
).value
count = _component_counter[component_type]
_component_counter[component_type] = count + 1
self._standard_attrs: dict[str, AttributeValue] = {
OTEL_COMPONENT_TYPE: component_type,
OTEL_COMPONENT_NAME: f"{component_type}/{count}",
}
if endpoint.hostname:
self._standard_attrs[SERVER_ADDRESS] = endpoint.hostname
if port is not None:
self._standard_attrs[SERVER_PORT] = port

meter_provider = meter_provider or get_meter_provider()
meter = meter_provider.get_meter("opentelemetry-sdk")
self._inflight = create_inflight(meter)
self._exported = create_exported(meter)
self._duration = create_otel_sdk_exporter_operation_duration(meter)

def start_export(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we instead just have three methods on_start, on_end and on_error (similar to the logging implementation) and use an instance variable or ContextVar for tracking the duration?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if that would simplify things, especially since exceptions are suppressed by the exporter implementations (otherwise a context manager would work great). ContextVar is good for black-box code but seems unnecessary when we are instrumenting the SDK itself. Could return a class, but it seems like it would be very similar to the Callable. Can do that if it seems better though.

Copy link
Copy Markdown
Contributor

@lzchen lzchen Mar 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does using a context manager as a mutable object work to record error info? We can use finally to do metric stuff after return is called in export.

with self._metrics.export_operation(self._count_data(data)) as result:
...
    result.error = error
    result.error_attrs = {RPC_RESPONSE_STATUS_CODE: error.code().name}

and then in ExporterMetrics:

...
  @contextmanager
  def export_operation(self, num_items: int):
       result = ExportResult()
      try:
          yield result
      finally:
           ...
          duration_attrs = result.error_attrs if result.error_attrs else _standard_attrs
      self._duration.record(metric, duration_attrs)

start_export -> finish_export would probably work in practice because the operations in finish are fine to be left dangling if we don't call them in every case (like if exporter shutdown during export) but design wise it seems dangerous to me.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the idea! I'll give that a try

self, num_items: int
) -> Callable[[Exception | None, Attributes], None]:
start_time = perf_counter()
self._inflight.add(num_items, self._standard_attrs)

def finish_export(
error: Exception | None,
error_attrs: Attributes,
):
end_time = perf_counter()
self._inflight.add(-num_items, self._standard_attrs)
exported_attrs = (
{**self._standard_attrs, ERROR_TYPE: type(error).__qualname__}
if error
else self._standard_attrs
)
self._exported.add(num_items, exported_attrs)
duration_attrs = (
{**exported_attrs, **error_attrs}
if error_attrs
else exported_attrs
)
self._duration.record(end_time - start_time, duration_attrs)

return finish_export
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
_get_credentials,
environ_to_compression,
)
from opentelemetry.metrics import MeterProvider
from opentelemetry.proto.collector.logs.v1.logs_service_pb2 import (
ExportLogsServiceRequest,
)
Expand All @@ -44,6 +45,9 @@
OTEL_EXPORTER_OTLP_LOGS_INSECURE,
OTEL_EXPORTER_OTLP_LOGS_TIMEOUT,
)
from opentelemetry.semconv._incubating.attributes.otel_attributes import (
OtelComponentTypeValues,
)


class OTLPLogExporter(
Expand All @@ -66,6 +70,8 @@ def __init__(
timeout: Optional[float] = None,
compression: Optional[Compression] = None,
channel_options: Optional[Tuple[Tuple[str, str]]] = None,
*,
meter_provider: Optional[MeterProvider] = None,
):
insecure_logs = environ.get(OTEL_EXPORTER_OTLP_LOGS_INSECURE)
if insecure is None and insecure_logs is not None:
Expand Down Expand Up @@ -105,13 +111,19 @@ def __init__(
stub=LogsServiceStub,
result=LogRecordExportResult,
channel_options=channel_options,
component_type=OtelComponentTypeValues.OTLP_GRPC_LOG_EXPORTER,
signal="logs",
meter_provider=meter_provider,
)

def _translate_data(
self, data: Sequence[ReadableLogRecord]
) -> ExportLogsServiceRequest:
return encode_logs(data)

def _count_data(self, data: Sequence[ReadableLogRecord]):
return len(data)

def export( # type: ignore [reportIncompatibleMethodOverride]
self,
batch: Sequence[ReadableLogRecord],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,16 @@
secure_channel,
ssl_channel_credentials,
)
from opentelemetry.exporter.otlp.proto.common._exporter_metrics import (
ExporterMetrics,
)
from opentelemetry.exporter.otlp.proto.common._internal import (
_get_resource_data,
)
from opentelemetry.exporter.otlp.proto.grpc import (
_OTLP_GRPC_CHANNEL_OPTIONS,
)
from opentelemetry.metrics import MeterProvider
from opentelemetry.proto.collector.logs.v1.logs_service_pb2 import (
ExportLogsServiceRequest,
)
Expand Down Expand Up @@ -104,6 +108,12 @@
from opentelemetry.sdk.resources import Resource as SDKResource
from opentelemetry.sdk.trace import ReadableSpan
from opentelemetry.sdk.trace.export import SpanExportResult
from opentelemetry.semconv._incubating.attributes.otel_attributes import (
OtelComponentTypeValues,
)
from opentelemetry.semconv._incubating.attributes.rpc_attributes import (
RPC_RESPONSE_STATUS_CODE,
)
from opentelemetry.util._importlib_metadata import entry_points
from opentelemetry.util.re import parse_env_headers

Expand Down Expand Up @@ -299,6 +309,10 @@ def __init__(
timeout: Optional[float] = None,
compression: Optional[Compression] = None,
channel_options: Optional[Tuple[Tuple[str, str]]] = None,
*,
component_type: Union[OtelComponentTypeValues, None] = None,
signal: Literal["traces", "metrics", "logs"] = "traces",
meter_provider: Optional[MeterProvider] = None,
):
super().__init__()
self._result = result
Expand Down Expand Up @@ -372,6 +386,16 @@ def __init__(
OTEL_EXPORTER_OTLP_CLIENT_CERTIFICATE,
)

self._component_type = component_type
self._signal: Literal["traces", "metrics", "logs"] = signal
self._parsed_url = parsed_url
self._metrics = ExporterMetrics(
self._component_type,
signal,
parsed_url,
meter_provider,
)

self._initialize_channel_and_stub()

def _initialize_channel_and_stub(self):
Expand Down Expand Up @@ -404,6 +428,13 @@ def _translate_data(
) -> ExportServiceRequestT:
pass

@abstractmethod
def _count_data(
self,
data: SDKDataT,
) -> int:
pass

def _export(
self,
data: SDKDataT,
Expand All @@ -412,6 +443,8 @@ def _export(
logger.warning("Exporter already shutdown, ignoring batch")
return self._result.FAILURE # type: ignore [reportReturnType]

finish_export = self._metrics.start_export(self._count_data(data))

# FIXME remove this check if the export type for traces
# gets updated to a class that represents the proto
# TracesData and use the code below instead.
Expand All @@ -425,6 +458,7 @@ def _export(
metadata=self._headers,
timeout=deadline_sec - time(),
)
finish_export(None, None)
return self._result.SUCCESS # type: ignore [reportReturnType]
except RpcError as error:
retry_info_bin = dict(error.trailing_metadata()).get( # type: ignore [reportAttributeAccessIssue]
Expand Down Expand Up @@ -472,6 +506,9 @@ def _export(
error.code(), # type: ignore [reportAttributeAccessIssue]
exc_info=error.code() == StatusCode.UNKNOWN, # type: ignore [reportAttributeAccessIssue]
)
finish_export(
error, {RPC_RESPONSE_STATUS_CODE: error.code().name}
)
return self._result.FAILURE # type: ignore [reportReturnType]
logger.warning(
"Transient error %s encountered while exporting %s to %s, retrying in %.2fs.",
Expand Down Expand Up @@ -510,3 +547,11 @@ def _exporting(self) -> str:
warning messages.
"""
pass

def _set_meter_provider(self, meter_provider: MeterProvider) -> None:
self._metrics = ExporterMetrics(
self._component_type,
self._signal,
self._parsed_url,
meter_provider,
)
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from dataclasses import replace
from logging import getLogger
from os import environ
from typing import Iterable, List, Tuple, Union
from typing import Iterable, List, Optional, Tuple, Union
from typing import Sequence as TypingSequence

from grpc import ChannelCredentials, Compression
Expand All @@ -32,6 +32,7 @@
environ_to_compression,
get_resource_data,
)
from opentelemetry.metrics import MeterProvider
from opentelemetry.proto.collector.metrics.v1.metrics_service_pb2 import (
ExportMetricsServiceRequest,
)
Expand Down Expand Up @@ -72,6 +73,9 @@
from opentelemetry.sdk.metrics.export import ( # noqa: F401
Histogram as HistogramType,
)
from opentelemetry.semconv._incubating.attributes.otel_attributes import (
OtelComponentTypeValues,
)

_logger = getLogger(__name__)

Expand Down Expand Up @@ -109,6 +113,8 @@ def __init__(
preferred_aggregation: dict[type, Aggregation] | None = None,
max_export_batch_size: int | None = None,
channel_options: Tuple[Tuple[str, str]] | None = None,
*,
meter_provider: Optional[MeterProvider] = None,
):
insecure_metrics = environ.get(OTEL_EXPORTER_OTLP_METRICS_INSECURE)
if insecure is None and insecure_metrics is not None:
Expand Down Expand Up @@ -153,6 +159,9 @@ def __init__(
timeout=timeout or environ_timeout,
compression=compression,
channel_options=channel_options,
component_type=OtelComponentTypeValues.OTLP_GRPC_METRIC_EXPORTER,
signal="metrics",
meter_provider=meter_provider,
)

self._max_export_batch_size: int | None = max_export_batch_size
Expand All @@ -162,6 +171,16 @@ def _translate_data( # type: ignore [reportIncompatibleMethodOverride]
) -> ExportMetricsServiceRequest:
return encode_metrics(data)

def _count_data(self, data: MetricsData):
num_items = 0

for resource_metrics in data.resource_metrics:
for scope_metrics in resource_metrics.scope_metrics:
for metric in scope_metrics.metrics:
num_items += len(metric.data.data_points)

return num_items

def export(
self,
metrics_data: MetricsData,
Expand Down Expand Up @@ -268,6 +287,9 @@ def _split_metrics_data(
def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
OTLPExporterMixin.shutdown(self, timeout_millis=timeout_millis)

def set_meter_provider(self, meter_provider: MeterProvider):
return self._set_meter_provider(meter_provider)

@property
def _exporting(self) -> str:
return "metrics"
Expand Down
Loading
Loading