Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Add default aggregation
Fixes #2537
  • Loading branch information
ocelotl committed Mar 28, 2022
commit d18fa2e5600aba88d069015d5d7181d155c54c3f
Original file line number Diff line number Diff line change
Expand Up @@ -67,15 +67,11 @@ def consume_measurement(self, measurement: Measurement) -> None:
if attributes not in self._attributes_aggregation:
with self._lock:
if attributes not in self._attributes_aggregation:
if self._view._aggregation:
aggregation = (
self._view._aggregation._create_aggregation(
self._instrument
)
)
else:
aggregation = self._instrument._default_aggregation
self._attributes_aggregation[attributes] = aggregation
self._attributes_aggregation[
attributes
] = self._view._aggregation._create_aggregation(
self._instrument
)

self._attributes_aggregation[attributes].aggregate(measurement)

Expand Down
67 changes: 61 additions & 6 deletions opentelemetry-sdk/src/opentelemetry/sdk/_metrics/aggregation.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,13 @@

from opentelemetry._metrics.instrument import (
Asynchronous,
Counter,
Instrument,
ObservableCounter,
ObservableGauge,
ObservableUpDownCounter,
Synchronous,
UpDownCounter,
_Monotonic,
)
from opentelemetry.sdk._metrics.measurement import Measurement
Expand Down Expand Up @@ -67,6 +72,62 @@ def collect(self) -> Optional[_PointVarT]:
pass


class _AggregationFactory(ABC):
@abstractmethod
def _create_aggregation(self, instrument: Instrument) -> _Aggregation:
"""Creates an aggregation"""


class DefaultAggregation(_AggregationFactory):
"""
The default aggregation to be used in a `View`.

This aggregation will create an actual aggregation depending on the
instrument type, as specified next:

`Counter` → `SumAggregation`
`UpDownCounter` → `SumAggregation`
`ObservableCounter` → `SumAggregation`
`ObservableUpDownCounter` → `SumAggregation`
`Histogram` → `ExplicitBucketHistogramAggregation`
`ObservableGauge` → `LastValueAggregation`
"""

def _create_aggregation(self, instrument: Instrument) -> _Aggregation:

# pylint: disable=too-many-return-statements
if isinstance(instrument, Counter):
return _SumAggregation(
instrument_is_monotonic=True,
instrument_temporality=AggregationTemporality.DELTA,
)
if isinstance(instrument, UpDownCounter):
return _SumAggregation(
instrument_is_monotonic=False,
instrument_temporality=AggregationTemporality.DELTA,
)

if isinstance(instrument, ObservableCounter):
return _SumAggregation(
instrument_is_monotonic=True,
instrument_temporality=AggregationTemporality.CUMULATIVE,
)

if isinstance(instrument, ObservableUpDownCounter):
return _SumAggregation(
instrument_is_monotonic=False,
instrument_temporality=AggregationTemporality.CUMULATIVE,
)

if isinstance(instrument, Histogram):
return _ExplicitBucketHistogramAggregation()

if isinstance(instrument, ObservableGauge):
return _LastValueAggregation()

return None


class _SumAggregation(_Aggregation[Sum]):
def __init__(
self,
Expand Down Expand Up @@ -340,12 +401,6 @@ def _convert_aggregation_temporality(
return None


class _AggregationFactory(ABC):
@abstractmethod
def _create_aggregation(self, instrument: Instrument) -> _Aggregation:
"""Creates an aggregation"""


class ExplicitBucketHistogramAggregation(_AggregationFactory):
def __init__(
self,
Expand Down
55 changes: 5 additions & 50 deletions opentelemetry-sdk/src/opentelemetry/sdk/_metrics/instrument.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
# pylint: disable=too-many-ancestors

import logging
from abc import ABC, abstractmethod
from typing import Callable, Dict, Generator, Iterable, Union

from opentelemetry._metrics.instrument import CallbackT
Expand All @@ -32,28 +31,14 @@
)
from opentelemetry._metrics.instrument import UpDownCounter as APIUpDownCounter
from opentelemetry._metrics.measurement import Measurement as APIMeasurement
from opentelemetry.sdk._metrics.aggregation import (
_Aggregation,
_ExplicitBucketHistogramAggregation,
_LastValueAggregation,
_SumAggregation,
)
from opentelemetry.sdk._metrics.measurement import Measurement
from opentelemetry.sdk._metrics.measurement_consumer import MeasurementConsumer
from opentelemetry.sdk._metrics.point import AggregationTemporality
from opentelemetry.sdk.util.instrumentation import InstrumentationInfo

_logger = logging.getLogger(__name__)


class _Instrument(ABC):
@property
@abstractmethod
def _default_aggregation(self) -> _Aggregation:
pass


class _Synchronous(_Instrument):
class _Synchronous:
def __init__(
self,
name: str,
Expand All @@ -70,7 +55,7 @@ def __init__(
super().__init__(name, unit=unit, description=description)


class _Asynchronous(_Instrument):
class _Asynchronous:
def __init__(
self,
name: str,
Expand Down Expand Up @@ -108,13 +93,6 @@ def callback(self) -> Iterable[Measurement]:


class Counter(_Synchronous, APICounter):
@property
def _default_aggregation(self) -> _Aggregation:
return _SumAggregation(
instrument_is_monotonic=True,
instrument_temporality=AggregationTemporality.DELTA,
)

def add(
self, amount: Union[int, float], attributes: Dict[str, str] = None
):
Expand All @@ -129,13 +107,6 @@ def add(


class UpDownCounter(_Synchronous, APIUpDownCounter):
@property
def _default_aggregation(self) -> _Aggregation:
return _SumAggregation(
instrument_is_monotonic=False,
instrument_temporality=AggregationTemporality.DELTA,
)

def add(
self, amount: Union[int, float], attributes: Dict[str, str] = None
):
Expand All @@ -145,28 +116,14 @@ def add(


class ObservableCounter(_Asynchronous, APIObservableCounter):
@property
def _default_aggregation(self) -> _Aggregation:
return _SumAggregation(
instrument_is_monotonic=True,
instrument_temporality=AggregationTemporality.CUMULATIVE,
)
pass


class ObservableUpDownCounter(_Asynchronous, APIObservableUpDownCounter):
@property
def _default_aggregation(self) -> _Aggregation:
return _SumAggregation(
instrument_is_monotonic=False,
instrument_temporality=AggregationTemporality.CUMULATIVE,
)
pass


class Histogram(_Synchronous, APIHistogram):
@property
def _default_aggregation(self) -> _Aggregation:
return _ExplicitBucketHistogramAggregation()

def record(
self, amount: Union[int, float], attributes: Dict[str, str] = None
):
Expand All @@ -182,6 +139,4 @@ def record(


class ObservableGauge(_Asynchronous, APIObservableGauge):
@property
def _default_aggregation(self) -> _Aggregation:
return _LastValueAggregation()
pass
12 changes: 8 additions & 4 deletions opentelemetry-sdk/src/opentelemetry/sdk/_metrics/view.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@
from typing_extensions import final

from opentelemetry._metrics.instrument import Instrument
from opentelemetry.sdk._metrics.aggregation import _AggregationFactory
from opentelemetry.sdk._metrics.aggregation import (
DefaultAggregation,
_AggregationFactory,
)

_logger = getLogger(__name__)

Expand Down Expand Up @@ -69,8 +72,9 @@ class View:
are in ``attribute_keys`` will be used to identify the metric stream.

aggregation: This is a metric stream customizing attribute: the
aggregatation instance to use when data is aggregated for the corresponding metrics
stream. If `None` the default aggregation of the instrument will be used.
aggregation instance to use when data is aggregated for the
corresponding metrics stream. If `None` an instance of
`DefaultAggregation` will be used.

This class is not intended to be subclassed by the user.
"""
Expand Down Expand Up @@ -122,7 +126,7 @@ def __init__(

self._description = description
self._attribute_keys = attribute_keys
self._aggregation = aggregation
self._aggregation = aggregation or DefaultAggregation()

# pylint: disable=too-many-return-statements
# pylint: disable=too-many-branches
Expand Down
86 changes: 86 additions & 0 deletions opentelemetry-sdk/tests/metrics/test_aggregation.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from opentelemetry.sdk._metrics import instrument
from opentelemetry.sdk._metrics.aggregation import (
AggregationTemporality,
DefaultAggregation,
ExplicitBucketHistogramAggregation,
LastValueAggregation,
SumAggregation,
Expand All @@ -31,6 +32,13 @@
_LastValueAggregation,
_SumAggregation,
)
from opentelemetry.sdk._metrics.instrument import (
Counter,
ObservableCounter,
ObservableGauge,
ObservableUpDownCounter,
UpDownCounter,
)
from opentelemetry.sdk._metrics.measurement import Measurement
from opentelemetry.sdk._metrics.point import Gauge, Histogram, Sum
from opentelemetry.util.types import Attributes
Expand Down Expand Up @@ -817,3 +825,81 @@ def test_last_value_factory(self):
self.assertIsInstance(aggregation, _LastValueAggregation)
aggregation2 = factory._create_aggregation(counter)
self.assertNotEqual(aggregation, aggregation2)


class TestDefaultAggregation(TestCase):
@classmethod
def setUpClass(cls):
cls.default_aggregation = DefaultAggregation()

def test_counter(self):

aggregation = self.default_aggregation._create_aggregation(
Counter(Mock(), Mock(), Mock())
)
self.assertIsInstance(aggregation, _SumAggregation)
self.assertTrue(aggregation._instrument_is_monotonic)
self.assertEqual(
aggregation._instrument_temporality, AggregationTemporality.DELTA
)

def test_up_down_counter(self):

aggregation = self.default_aggregation._create_aggregation(
UpDownCounter(Mock(), Mock(), Mock())
)
self.assertIsInstance(aggregation, _SumAggregation)
self.assertFalse(aggregation._instrument_is_monotonic)
self.assertEqual(
aggregation._instrument_temporality, AggregationTemporality.DELTA
)

def test_observable_counter(self):

aggregation = self.default_aggregation._create_aggregation(
ObservableCounter(Mock(), Mock(), Mock(), Mock())
)
self.assertIsInstance(aggregation, _SumAggregation)
self.assertTrue(aggregation._instrument_is_monotonic)
self.assertEqual(
aggregation._instrument_temporality,
AggregationTemporality.CUMULATIVE,
)

def test_observable_up_down_counter(self):

aggregation = self.default_aggregation._create_aggregation(
ObservableUpDownCounter(Mock(), Mock(), Mock(), Mock())
)
self.assertIsInstance(aggregation, _SumAggregation)
self.assertFalse(aggregation._instrument_is_monotonic)
self.assertEqual(
aggregation._instrument_temporality,
AggregationTemporality.CUMULATIVE,
)

def test_histogram(self):

aggregation = self.default_aggregation._create_aggregation(
Histogram(
Mock(),
Mock(),
Mock(),
Mock(),
Mock(),
Mock(),
)
)
self.assertIsInstance(aggregation, _ExplicitBucketHistogramAggregation)

def test_observable_gauge(self):

aggregation = self.default_aggregation._create_aggregation(
ObservableGauge(
Mock(),
Mock(),
Mock(),
Mock(),
)
)
self.assertIsInstance(aggregation, _LastValueAggregation)