Skip to content

Commit 06d8042

Browse files
committed
Implement metric reader default aggregation controls
Fixes #2635
1 parent 0de0eab commit 06d8042

File tree

9 files changed

+135
-28
lines changed

9 files changed

+135
-28
lines changed

opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_view_instrument_match.py

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@
1818
from typing import TYPE_CHECKING, Dict, Iterable
1919

2020
from opentelemetry.sdk._metrics.aggregation import (
21+
DefaultAggregation,
2122
_Aggregation,
23+
_AggregationFactory,
2224
_convert_aggregation_temporality,
2325
_PointVarT,
2426
)
@@ -39,13 +41,15 @@ def __init__(
3941
view: View,
4042
instrument: "_Instrument",
4143
sdk_config: SdkConfiguration,
44+
instrument_class_aggregation: Dict[type, _AggregationFactory],
4245
):
4346
self._view = view
4447
self._instrument = instrument
4548
self._sdk_config = sdk_config
4649
self._attributes_aggregation: Dict[frozenset, _Aggregation] = {}
4750
self._attributes_previous_point: Dict[frozenset, _PointVarT] = {}
4851
self._lock = Lock()
52+
self._instrument_class_aggregation = instrument_class_aggregation
4953

5054
# pylint: disable=protected-access
5155
def consume_measurement(self, measurement: Measurement) -> None:
@@ -67,11 +71,19 @@ def consume_measurement(self, measurement: Measurement) -> None:
6771
if attributes not in self._attributes_aggregation:
6872
with self._lock:
6973
if attributes not in self._attributes_aggregation:
70-
self._attributes_aggregation[
71-
attributes
72-
] = self._view._aggregation._create_aggregation(
73-
self._instrument
74-
)
74+
if not isinstance(
75+
self._view._aggregation, DefaultAggregation
76+
):
77+
aggregation = (
78+
self._view._aggregation._create_aggregation(
79+
self._instrument
80+
)
81+
)
82+
else:
83+
aggregation = self._instrument_class_aggregation[
84+
self._instrument.__class__
85+
]._create_aggregation(self._instrument)
86+
self._attributes_aggregation[attributes] = aggregation
7587

7688
self._attributes_aggregation[attributes].aggregate(measurement)
7789

opentelemetry-sdk/src/opentelemetry/sdk/_metrics/export/__init__.py

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,15 @@
1919
from os import environ, linesep
2020
from sys import stdout
2121
from threading import Event, RLock, Thread
22-
from typing import IO, Callable, Iterable, List, Optional, Sequence
22+
from typing import IO, Callable, Dict, Iterable, List, Optional, Sequence
2323

2424
from opentelemetry.context import (
2525
_SUPPRESS_INSTRUMENTATION_KEY,
2626
attach,
2727
detach,
2828
set_value,
2929
)
30+
from opentelemetry.sdk._metrics.aggregation import _AggregationFactory
3031
from opentelemetry.sdk._metrics.metric_reader import MetricReader
3132
from opentelemetry.sdk._metrics.point import AggregationTemporality, Metric
3233
from opentelemetry.util._once import Once
@@ -104,9 +105,13 @@ class InMemoryMetricReader(MetricReader):
104105

105106
def __init__(
106107
self,
108+
preferred_aggregation: Dict[type, _AggregationFactory] = None,
107109
preferred_temporality: AggregationTemporality = AggregationTemporality.CUMULATIVE,
108110
) -> None:
109-
super().__init__(preferred_temporality=preferred_temporality)
111+
super().__init__(
112+
preferred_temporality=preferred_temporality,
113+
preferred_aggregation=preferred_aggregation,
114+
)
110115
self._lock = RLock()
111116
self._metrics: List[Metric] = []
112117

@@ -135,10 +140,14 @@ class PeriodicExportingMetricReader(MetricReader):
135140
def __init__(
136141
self,
137142
exporter: MetricExporter,
143+
preferred_aggregation: Dict[type, _AggregationFactory] = None,
138144
export_interval_millis: Optional[float] = None,
139145
export_timeout_millis: Optional[float] = None,
140146
) -> None:
141-
super().__init__(preferred_temporality=exporter.preferred_temporality)
147+
super().__init__(
148+
preferred_temporality=exporter.preferred_temporality,
149+
preferred_aggregation=preferred_aggregation,
150+
)
142151
self._exporter = exporter
143152
if export_interval_millis is None:
144153
try:

opentelemetry-sdk/src/opentelemetry/sdk/_metrics/instrument.py

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@
1414

1515
# pylint: disable=too-many-ancestors
1616

17-
import logging
18-
from typing import Dict, Generator, Iterable, Optional, Union
17+
from logging import getLogger
18+
from typing import TYPE_CHECKING, Dict, Generator, Iterable, Optional, Union
1919

2020
from opentelemetry._metrics.instrument import CallbackT
2121
from opentelemetry._metrics.instrument import Counter as APICounter
@@ -31,18 +31,23 @@
3131
)
3232
from opentelemetry._metrics.instrument import UpDownCounter as APIUpDownCounter
3333
from opentelemetry.sdk._metrics.measurement import Measurement
34-
from opentelemetry.sdk._metrics.measurement_consumer import MeasurementConsumer
3534
from opentelemetry.sdk.util.instrumentation import InstrumentationScope
3635

37-
_logger = logging.getLogger(__name__)
36+
if TYPE_CHECKING:
37+
from opentelemetry.sdk._metrics.measurement_consumer import (
38+
MeasurementConsumer,
39+
)
40+
41+
42+
_logger = getLogger(__name__)
3843

3944

4045
class _Synchronous:
4146
def __init__(
4247
self,
4348
name: str,
4449
instrumentation_scope: InstrumentationScope,
45-
measurement_consumer: MeasurementConsumer,
50+
measurement_consumer: "MeasurementConsumer",
4651
unit: str = "",
4752
description: str = "",
4853
):
@@ -59,7 +64,7 @@ def __init__(
5964
self,
6065
name: str,
6166
instrumentation_scope: InstrumentationScope,
62-
measurement_consumer: MeasurementConsumer,
67+
measurement_consumer: "MeasurementConsumer",
6368
callbacks: Optional[Iterable[CallbackT]] = None,
6469
unit: str = "",
6570
description: str = "",

opentelemetry-sdk/src/opentelemetry/sdk/_metrics/measurement_consumer.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,9 @@ def __init__(self, sdk_config: SdkConfiguration) -> None:
5151
self._sdk_config = sdk_config
5252
# should never be mutated
5353
self._reader_storages: Mapping[MetricReader, MetricReaderStorage] = {
54-
reader: MetricReaderStorage(sdk_config)
54+
reader: MetricReaderStorage(
55+
sdk_config, reader._instrument_class_aggregation
56+
)
5557
for reader in sdk_config.metric_readers
5658
}
5759
self._async_instruments: List["_Asynchronous"] = []

opentelemetry-sdk/src/opentelemetry/sdk/_metrics/metric_reader.py

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,29 +14,68 @@
1414

1515
import logging
1616
from abc import ABC, abstractmethod
17-
from typing import Callable, Iterable
17+
from typing import Callable, Dict, Iterable
1818

1919
from typing_extensions import final
2020

21+
from opentelemetry.sdk._metrics.aggregation import (
22+
DefaultAggregation,
23+
_AggregationFactory,
24+
)
25+
from opentelemetry.sdk._metrics.instrument import (
26+
Counter,
27+
Histogram,
28+
ObservableCounter,
29+
ObservableGauge,
30+
ObservableUpDownCounter,
31+
UpDownCounter,
32+
)
2133
from opentelemetry.sdk._metrics.point import AggregationTemporality, Metric
2234

2335
_logger = logging.getLogger(__name__)
2436

2537

2638
class MetricReader(ABC):
2739
"""
40+
Base class for all metric readers
41+
42+
Args:
43+
preferred_aggregation: A mapping between instrument classes and
44+
aggregation instances. By default maps all instrument classes to an
45+
instance of `DefaultAggregation`. This mapping will be used to
46+
define the default aggregation of every instrument class. If the
47+
user wants to make a change in the default aggregation of an
48+
instrument class, it is enough to pass here a dictionary whose keys
49+
are the instrument classes and the values are the corresponding
50+
desired aggregation for the instrument classes that the user wants
51+
to change, not necessarily all of them. The classes not included in
52+
the passed dictionary will retain their association to their
53+
default aggregations.
54+
2855
.. document protected _receive_metrics which is a intended to be overriden by subclass
2956
.. automethod:: _receive_metrics
3057
"""
3158

3259
def __init__(
3360
self,
3461
preferred_temporality: AggregationTemporality = AggregationTemporality.CUMULATIVE,
62+
preferred_aggregation: Dict[type, _AggregationFactory] = None,
3563
) -> None:
3664
self._collect: Callable[
3765
["MetricReader", AggregationTemporality], Iterable[Metric]
3866
] = None
67+
3968
self._preferred_temporality = preferred_temporality
69+
self._instrument_class_aggregation = {
70+
Counter: DefaultAggregation(),
71+
UpDownCounter: DefaultAggregation(),
72+
Histogram: DefaultAggregation(),
73+
ObservableCounter: DefaultAggregation(),
74+
ObservableUpDownCounter: DefaultAggregation(),
75+
ObservableGauge: DefaultAggregation(),
76+
}
77+
78+
self._instrument_class_aggregation.update(preferred_aggregation or {})
4079

4180
@final
4281
def collect(self) -> None:

opentelemetry-sdk/src/opentelemetry/sdk/_metrics/metric_reader_storage.py

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,10 @@
1919
from opentelemetry.sdk._metrics._view_instrument_match import (
2020
_ViewInstrumentMatch,
2121
)
22-
from opentelemetry.sdk._metrics.aggregation import AggregationTemporality
22+
from opentelemetry.sdk._metrics.aggregation import (
23+
AggregationTemporality,
24+
_AggregationFactory,
25+
)
2326
from opentelemetry.sdk._metrics.measurement import Measurement
2427
from opentelemetry.sdk._metrics.point import Metric
2528
from opentelemetry.sdk._metrics.sdk_configuration import SdkConfiguration
@@ -31,12 +34,17 @@
3134
class MetricReaderStorage:
3235
"""The SDK's storage for a given reader"""
3336

34-
def __init__(self, sdk_config: SdkConfiguration) -> None:
37+
def __init__(
38+
self,
39+
sdk_config: SdkConfiguration,
40+
instrument_class_aggregation: Dict[type, _AggregationFactory],
41+
) -> None:
3542
self._lock = RLock()
3643
self._sdk_config = sdk_config
3744
self._view_instrument_match: Dict[
3845
Instrument, List[_ViewInstrumentMatch]
3946
] = {}
47+
self._instrument_class_aggregation = instrument_class_aggregation
4048

4149
def _get_or_init_view_instrument_match(
4250
self, instrument: Instrument
@@ -62,6 +70,9 @@ def _get_or_init_view_instrument_match(
6270
view=view,
6371
instrument=instrument,
6472
sdk_config=self._sdk_config,
73+
instrument_class_aggregation=(
74+
self._instrument_class_aggregation
75+
),
6576
)
6677
)
6778

@@ -72,6 +83,9 @@ def _get_or_init_view_instrument_match(
7283
view=_DEFAULT_VIEW,
7384
instrument=instrument,
7485
sdk_config=self._sdk_config,
86+
instrument_class_aggregation=(
87+
self._instrument_class_aggregation
88+
),
7589
)
7690
)
7791
self._view_instrument_match[instrument] = view_instrument_matches

opentelemetry-sdk/tests/metrics/test_metric_reader_storage.py

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,12 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15-
from unittest.mock import Mock, patch
15+
from unittest.mock import MagicMock, Mock, patch
1616

17-
from opentelemetry.sdk._metrics.aggregation import DropAggregation
17+
from opentelemetry.sdk._metrics.aggregation import (
18+
DefaultAggregation,
19+
DropAggregation,
20+
)
1821
from opentelemetry.sdk._metrics.instrument import Counter
1922
from opentelemetry.sdk._metrics.measurement import Measurement
2023
from opentelemetry.sdk._metrics.metric_reader_storage import (
@@ -56,7 +59,8 @@ def test_creates_view_instrument_matches(
5659
resource=Mock(),
5760
metric_readers=(),
5861
views=(view1, view2),
59-
)
62+
),
63+
MagicMock(**{"__getitem__.return_value": DefaultAggregation()}),
6064
)
6165

6266
# instrument1 matches view1 and view2, so should create two ViewInstrumentMatch objects
@@ -100,7 +104,8 @@ def test_forwards_calls_to_view_instrument_match(
100104
resource=Mock(),
101105
metric_readers=(),
102106
views=(view1, view2),
103-
)
107+
),
108+
MagicMock(**{"__getitem__.return_value": DefaultAggregation()}),
104109
)
105110

106111
# Measurements from an instrument should be passed on to each ViewInstrumentMatch objects
@@ -147,7 +152,8 @@ def test_race_concurrent_measurements(self, MockViewInstrumentMatch: Mock):
147152
resource=Mock(),
148153
metric_readers=(),
149154
views=(view1,),
150-
)
155+
),
156+
MagicMock(**{"__getitem__.return_value": DefaultAggregation()}),
151157
)
152158

153159
def send_measurement():
@@ -172,7 +178,8 @@ def test_default_view_enabled(self, MockViewInstrumentMatch: Mock):
172178
resource=Mock(),
173179
metric_readers=(),
174180
views=(),
175-
)
181+
),
182+
MagicMock(**{"__getitem__.return_value": DefaultAggregation()}),
176183
)
177184

178185
storage.consume_measurement(Measurement(1, instrument1))
@@ -200,7 +207,8 @@ def test_drop_aggregation(self):
200207
instrument_name="name", aggregation=DropAggregation()
201208
),
202209
),
203-
)
210+
),
211+
MagicMock(**{"__getitem__.return_value": DefaultAggregation()}),
204212
)
205213
metric_reader_storage.consume_measurement(Measurement(1, counter))
206214

opentelemetry-sdk/tests/metrics/test_periodic_exporting_metric_reader.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,9 @@ def _create_periodic_reader(
8484
self, metrics, exporter, collect_wait=0, interval=60000
8585
):
8686

87-
pmr = PeriodicExportingMetricReader(exporter, interval)
87+
pmr = PeriodicExportingMetricReader(
88+
exporter, export_interval_millis=interval
89+
)
8890

8991
def _collect(reader, temp):
9092
time.sleep(collect_wait)
@@ -95,7 +97,7 @@ def _collect(reader, temp):
9597

9698
def test_ticker_called(self):
9799
collect_mock = Mock()
98-
pmr = PeriodicExportingMetricReader(Mock(), 1)
100+
pmr = PeriodicExportingMetricReader(Mock(), export_interval_millis=1)
99101
pmr._set_collect_callback(collect_mock)
100102
time.sleep(0.1)
101103
self.assertTrue(collect_mock.assert_called_once)

0 commit comments

Comments
 (0)