Skip to content

Commit 2aff649

Browse files
committed
Implement metric reader default aggregation controls
Fixes #2635
1 parent 24ac96e commit 2aff649

File tree

10 files changed

+276
-28
lines changed

10 files changed

+276
-28
lines changed

opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_view_instrument_match.py

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@
1818
from typing import TYPE_CHECKING, Dict, Iterable
1919

2020
from opentelemetry.sdk._metrics.aggregation import (
21+
DefaultAggregation,
2122
_Aggregation,
23+
_AggregationFactory,
2224
_convert_aggregation_temporality,
2325
_PointVarT,
2426
)
@@ -39,13 +41,15 @@ def __init__(
3941
view: View,
4042
instrument: "_Instrument",
4143
sdk_config: SdkConfiguration,
44+
instrument_class_aggregation: Dict[type, _AggregationFactory],
4245
):
4346
self._view = view
4447
self._instrument = instrument
4548
self._sdk_config = sdk_config
4649
self._attributes_aggregation: Dict[frozenset, _Aggregation] = {}
4750
self._attributes_previous_point: Dict[frozenset, _PointVarT] = {}
4851
self._lock = Lock()
52+
self._instrument_class_aggregation = instrument_class_aggregation
4953

5054
# pylint: disable=protected-access
5155
def consume_measurement(self, measurement: Measurement) -> None:
@@ -67,11 +71,19 @@ def consume_measurement(self, measurement: Measurement) -> None:
6771
if attributes not in self._attributes_aggregation:
6872
with self._lock:
6973
if attributes not in self._attributes_aggregation:
70-
self._attributes_aggregation[
71-
attributes
72-
] = self._view._aggregation._create_aggregation(
73-
self._instrument
74-
)
74+
if not isinstance(
75+
self._view._aggregation, DefaultAggregation
76+
):
77+
aggregation = (
78+
self._view._aggregation._create_aggregation(
79+
self._instrument
80+
)
81+
)
82+
else:
83+
aggregation = self._instrument_class_aggregation[
84+
self._instrument.__class__
85+
]._create_aggregation(self._instrument)
86+
self._attributes_aggregation[attributes] = aggregation
7587

7688
self._attributes_aggregation[attributes].aggregate(measurement)
7789

opentelemetry-sdk/src/opentelemetry/sdk/_metrics/export/__init__.py

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,15 @@
1919
from os import environ, linesep
2020
from sys import stdout
2121
from threading import Event, RLock, Thread
22-
from typing import IO, Callable, Iterable, List, Optional, Sequence
22+
from typing import IO, Callable, Dict, Iterable, List, Optional, Sequence
2323

2424
from opentelemetry.context import (
2525
_SUPPRESS_INSTRUMENTATION_KEY,
2626
attach,
2727
detach,
2828
set_value,
2929
)
30+
from opentelemetry.sdk._metrics.aggregation import _AggregationFactory
3031
from opentelemetry.sdk._metrics.metric_reader import MetricReader
3132
from opentelemetry.sdk._metrics.point import AggregationTemporality, Metric
3233
from opentelemetry.util._once import Once
@@ -104,9 +105,13 @@ class InMemoryMetricReader(MetricReader):
104105

105106
def __init__(
106107
self,
108+
preferred_aggregation: Dict[type, _AggregationFactory] = None,
107109
preferred_temporality: AggregationTemporality = AggregationTemporality.CUMULATIVE,
108110
) -> None:
109-
super().__init__(preferred_temporality=preferred_temporality)
111+
super().__init__(
112+
preferred_temporality=preferred_temporality,
113+
preferred_aggregation=preferred_aggregation,
114+
)
110115
self._lock = RLock()
111116
self._metrics: List[Metric] = []
112117

@@ -135,10 +140,14 @@ class PeriodicExportingMetricReader(MetricReader):
135140
def __init__(
136141
self,
137142
exporter: MetricExporter,
143+
preferred_aggregation: Dict[type, _AggregationFactory] = None,
138144
export_interval_millis: Optional[float] = None,
139145
export_timeout_millis: Optional[float] = None,
140146
) -> None:
141-
super().__init__(preferred_temporality=exporter.preferred_temporality)
147+
super().__init__(
148+
preferred_temporality=exporter.preferred_temporality,
149+
preferred_aggregation=preferred_aggregation,
150+
)
142151
self._exporter = exporter
143152
if export_interval_millis is None:
144153
try:

opentelemetry-sdk/src/opentelemetry/sdk/_metrics/instrument.py

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@
1414

1515
# pylint: disable=too-many-ancestors
1616

17-
import logging
18-
from typing import Dict, Generator, Iterable, Optional, Union
17+
from logging import getLogger
18+
from typing import TYPE_CHECKING, Dict, Generator, Iterable, Optional, Union
1919

2020
from opentelemetry._metrics.instrument import CallbackT
2121
from opentelemetry._metrics.instrument import Counter as APICounter
@@ -31,18 +31,23 @@
3131
)
3232
from opentelemetry._metrics.instrument import UpDownCounter as APIUpDownCounter
3333
from opentelemetry.sdk._metrics.measurement import Measurement
34-
from opentelemetry.sdk._metrics.measurement_consumer import MeasurementConsumer
3534
from opentelemetry.sdk.util.instrumentation import InstrumentationScope
3635

37-
_logger = logging.getLogger(__name__)
36+
if TYPE_CHECKING:
37+
from opentelemetry.sdk._metrics.measurement_consumer import (
38+
MeasurementConsumer,
39+
)
40+
41+
42+
_logger = getLogger(__name__)
3843

3944

4045
class _Synchronous:
4146
def __init__(
4247
self,
4348
name: str,
4449
instrumentation_scope: InstrumentationScope,
45-
measurement_consumer: MeasurementConsumer,
50+
measurement_consumer: "MeasurementConsumer",
4651
unit: str = "",
4752
description: str = "",
4853
):
@@ -59,7 +64,7 @@ def __init__(
5964
self,
6065
name: str,
6166
instrumentation_scope: InstrumentationScope,
62-
measurement_consumer: MeasurementConsumer,
67+
measurement_consumer: "MeasurementConsumer",
6368
callbacks: Optional[Iterable[CallbackT]] = None,
6469
unit: str = "",
6570
description: str = "",

opentelemetry-sdk/src/opentelemetry/sdk/_metrics/measurement_consumer.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,9 @@ def __init__(self, sdk_config: SdkConfiguration) -> None:
5151
self._sdk_config = sdk_config
5252
# should never be mutated
5353
self._reader_storages: Mapping[MetricReader, MetricReaderStorage] = {
54-
reader: MetricReaderStorage(sdk_config)
54+
reader: MetricReaderStorage(
55+
sdk_config, reader._instrument_class_aggregation
56+
)
5557
for reader in sdk_config.metric_readers
5658
}
5759
self._async_instruments: List["_Asynchronous"] = []

opentelemetry-sdk/src/opentelemetry/sdk/_metrics/metric_reader.py

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,29 +14,70 @@
1414

1515
import logging
1616
from abc import ABC, abstractmethod
17-
from typing import Callable, Iterable
17+
from typing import Callable, Dict, Iterable
1818

1919
from typing_extensions import final
2020

21+
from opentelemetry.sdk._metrics.aggregation import (
22+
DefaultAggregation,
23+
_AggregationFactory,
24+
)
25+
from opentelemetry.sdk._metrics.instrument import (
26+
Counter,
27+
Histogram,
28+
ObservableCounter,
29+
ObservableGauge,
30+
ObservableUpDownCounter,
31+
UpDownCounter,
32+
)
2133
from opentelemetry.sdk._metrics.point import AggregationTemporality, Metric
2234

2335
_logger = logging.getLogger(__name__)
2436

2537

2638
class MetricReader(ABC):
2739
"""
40+
Base class for all metric readers
41+
42+
Args:
43+
preferred_aggregation: A mapping between instrument classes and
44+
aggregation instances. By default maps all instrument classes to an
45+
instance of `DefaultAggregation`. This mapping will be used to
46+
define the default aggregation of every instrument class. If the
47+
user wants to make a change in the default aggregation of an
48+
instrument class, it is enough to pass here a dictionary whose keys
49+
are the instrument classes and the values are the corresponding
50+
desired aggregation for the instrument classes that the user wants
51+
to change, not necessarily all of them. The classes not included in
52+
the passed dictionary will retain their association to their
53+
default aggregations. The aggregation defined here will be
54+
overriden by an aggregation defined by a view that is not
55+
`DefaultAggregation`.
56+
2857
.. document protected _receive_metrics which is a intended to be overriden by subclass
2958
.. automethod:: _receive_metrics
3059
"""
3160

3261
def __init__(
3362
self,
3463
preferred_temporality: AggregationTemporality = AggregationTemporality.CUMULATIVE,
64+
preferred_aggregation: Dict[type, _AggregationFactory] = None,
3565
) -> None:
3666
self._collect: Callable[
3767
["MetricReader", AggregationTemporality], Iterable[Metric]
3868
] = None
69+
3970
self._preferred_temporality = preferred_temporality
71+
self._instrument_class_aggregation = {
72+
Counter: DefaultAggregation(),
73+
UpDownCounter: DefaultAggregation(),
74+
Histogram: DefaultAggregation(),
75+
ObservableCounter: DefaultAggregation(),
76+
ObservableUpDownCounter: DefaultAggregation(),
77+
ObservableGauge: DefaultAggregation(),
78+
}
79+
80+
self._instrument_class_aggregation.update(preferred_aggregation or {})
4081

4182
@final
4283
def collect(self) -> None:

opentelemetry-sdk/src/opentelemetry/sdk/_metrics/metric_reader_storage.py

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,10 @@
1919
from opentelemetry.sdk._metrics._view_instrument_match import (
2020
_ViewInstrumentMatch,
2121
)
22-
from opentelemetry.sdk._metrics.aggregation import AggregationTemporality
22+
from opentelemetry.sdk._metrics.aggregation import (
23+
AggregationTemporality,
24+
_AggregationFactory,
25+
)
2326
from opentelemetry.sdk._metrics.measurement import Measurement
2427
from opentelemetry.sdk._metrics.point import Metric
2528
from opentelemetry.sdk._metrics.sdk_configuration import SdkConfiguration
@@ -31,12 +34,17 @@
3134
class MetricReaderStorage:
3235
"""The SDK's storage for a given reader"""
3336

34-
def __init__(self, sdk_config: SdkConfiguration) -> None:
37+
def __init__(
38+
self,
39+
sdk_config: SdkConfiguration,
40+
instrument_class_aggregation: Dict[type, _AggregationFactory],
41+
) -> None:
3542
self._lock = RLock()
3643
self._sdk_config = sdk_config
3744
self._view_instrument_match: Dict[
3845
Instrument, List[_ViewInstrumentMatch]
3946
] = {}
47+
self._instrument_class_aggregation = instrument_class_aggregation
4048

4149
def _get_or_init_view_instrument_match(
4250
self, instrument: Instrument
@@ -62,6 +70,9 @@ def _get_or_init_view_instrument_match(
6270
view=view,
6371
instrument=instrument,
6472
sdk_config=self._sdk_config,
73+
instrument_class_aggregation=(
74+
self._instrument_class_aggregation
75+
),
6576
)
6677
)
6778

@@ -72,6 +83,9 @@ def _get_or_init_view_instrument_match(
7283
view=_DEFAULT_VIEW,
7384
instrument=instrument,
7485
sdk_config=self._sdk_config,
86+
instrument_class_aggregation=(
87+
self._instrument_class_aggregation
88+
),
7589
)
7690
)
7791
self._view_instrument_match[instrument] = view_instrument_matches
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
# Copyright The OpenTelemetry Authors
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
from typing import Dict
16+
from unittest import TestCase
17+
18+
from opentelemetry.sdk._metrics.aggregation import (
19+
AggregationTemporality,
20+
DefaultAggregation,
21+
LastValueAggregation,
22+
_AggregationFactory,
23+
)
24+
from opentelemetry.sdk._metrics.instrument import (
25+
Counter,
26+
Histogram,
27+
ObservableCounter,
28+
ObservableGauge,
29+
ObservableUpDownCounter,
30+
UpDownCounter,
31+
)
32+
from opentelemetry.sdk._metrics.metric_reader import MetricReader
33+
34+
35+
class DummyMetricReader(MetricReader):
36+
def __init__(
37+
self,
38+
preferred_temporality: AggregationTemporality = AggregationTemporality.CUMULATIVE,
39+
preferred_aggregation: Dict[type, _AggregationFactory] = None,
40+
) -> None:
41+
super().__init__(
42+
preferred_temporality=preferred_temporality,
43+
preferred_aggregation=preferred_aggregation,
44+
)
45+
46+
def _receive_metrics(self, metrics):
47+
pass
48+
49+
def shutdown(self):
50+
return True
51+
52+
53+
class TestMetricReader(TestCase):
54+
def test_default_temporality(self):
55+
56+
dummy_metric_reader = DummyMetricReader()
57+
58+
self.assertEqual(
59+
dummy_metric_reader._instrument_class_aggregation.keys(),
60+
set(
61+
[
62+
Counter,
63+
UpDownCounter,
64+
Histogram,
65+
ObservableCounter,
66+
ObservableUpDownCounter,
67+
ObservableGauge,
68+
]
69+
),
70+
)
71+
for (
72+
value
73+
) in dummy_metric_reader._instrument_class_aggregation.values():
74+
self.assertIsInstance(value, DefaultAggregation)
75+
76+
dummy_metric_reader = DummyMetricReader(
77+
preferred_aggregation={Counter: LastValueAggregation()}
78+
)
79+
self.assertEqual(
80+
dummy_metric_reader._instrument_class_aggregation.keys(),
81+
set(
82+
[
83+
Counter,
84+
UpDownCounter,
85+
Histogram,
86+
ObservableCounter,
87+
ObservableUpDownCounter,
88+
ObservableGauge,
89+
]
90+
),
91+
)
92+
self.assertIsInstance(
93+
dummy_metric_reader._instrument_class_aggregation[Counter],
94+
LastValueAggregation,
95+
)

0 commit comments

Comments
 (0)