forked from open-telemetry/opentelemetry-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathinstrument.py
More file actions
142 lines (117 loc) · 4.46 KB
/
instrument.py
File metadata and controls
142 lines (117 loc) · 4.46 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# pylint: disable=too-many-ancestors
import logging
from typing import Callable, Dict, Generator, Iterable, Union
from opentelemetry._metrics.instrument import CallbackT
from opentelemetry._metrics.instrument import Counter as APICounter
from opentelemetry._metrics.instrument import Histogram as APIHistogram
from opentelemetry._metrics.instrument import (
ObservableCounter as APIObservableCounter,
)
from opentelemetry._metrics.instrument import (
ObservableGauge as APIObservableGauge,
)
from opentelemetry._metrics.instrument import (
ObservableUpDownCounter as APIObservableUpDownCounter,
)
from opentelemetry._metrics.instrument import UpDownCounter as APIUpDownCounter
from opentelemetry._metrics.measurement import Measurement as APIMeasurement
from opentelemetry.sdk._metrics.measurement import Measurement
from opentelemetry.sdk._metrics.measurement_consumer import MeasurementConsumer
from opentelemetry.sdk.util.instrumentation import InstrumentationInfo
_logger = logging.getLogger(__name__)
class _Synchronous:
def __init__(
self,
name: str,
instrumentation_info: InstrumentationInfo,
measurement_consumer: MeasurementConsumer,
unit: str = "",
description: str = "",
):
self.name = name
self.unit = unit
self.description = description
self.instrumentation_info = instrumentation_info
self._measurement_consumer = measurement_consumer
super().__init__(name, unit=unit, description=description)
class _Asynchronous:
def __init__(
self,
name: str,
instrumentation_info: InstrumentationInfo,
measurement_consumer: MeasurementConsumer,
callback: CallbackT,
unit: str = "",
description: str = "",
):
self.name = name
self.unit = unit
self.description = description
self.instrumentation_info = instrumentation_info
self._measurement_consumer = measurement_consumer
super().__init__(name, callback, unit=unit, description=description)
self._callback: Callable[[], Iterable[APIMeasurement]]
if isinstance(callback, Generator):
def inner() -> Iterable[Measurement]:
return next(callback)
self._callback = inner
else:
self._callback = callback
def callback(self) -> Iterable[Measurement]:
for api_measurement in self._callback():
yield Measurement(
api_measurement.value,
instrument=self,
attributes=api_measurement.attributes,
)
class Counter(_Synchronous, APICounter):
def add(
self, amount: Union[int, float], attributes: Dict[str, str] = None
):
if amount < 0:
_logger.warning(
"Add amount must be non-negative on Counter %s.", self.name
)
return
self._measurement_consumer.consume_measurement(
Measurement(amount, self, attributes)
)
class UpDownCounter(_Synchronous, APIUpDownCounter):
def add(
self, amount: Union[int, float], attributes: Dict[str, str] = None
):
self._measurement_consumer.consume_measurement(
Measurement(amount, self, attributes)
)
class ObservableCounter(_Asynchronous, APIObservableCounter):
pass
class ObservableUpDownCounter(_Asynchronous, APIObservableUpDownCounter):
pass
class Histogram(_Synchronous, APIHistogram):
def record(
self, amount: Union[int, float], attributes: Dict[str, str] = None
):
if amount < 0:
_logger.warning(
"Record amount must be non-negative on Histogram %s.",
self.name,
)
return
self._measurement_consumer.consume_measurement(
Measurement(amount, self, attributes)
)
class ObservableGauge(_Asynchronous, APIObservableGauge):
pass