Skip to content

Commit b17530e

Browse files
committed
WIP
1 parent 284072c commit b17530e

File tree

4 files changed

+62
-42
lines changed
  • exporter/opentelemetry-exporter-otlp/src/opentelemetry/exporter/otlp/metrics_exporter
  • instrumentation/opentelemetry-instrumentation-system-metrics/src/opentelemetry/instrumentation/system_metrics
  • opentelemetry-sdk/src/opentelemetry/sdk/metrics/export

4 files changed

+62
-42
lines changed

exporter/opentelemetry-exporter-otlp/src/opentelemetry/exporter/otlp/metrics_exporter/__init__.py

Lines changed: 6 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -73,33 +73,21 @@ def _get_data_points(
7373

7474
if isinstance(sdk_metric_record.aggregator, SumAggregator):
7575
value = sdk_metric_record.aggregator.checkpoint
76+
7677
elif isinstance(sdk_metric_record.aggregator, MinMaxSumCountAggregator):
7778
# FIXME: How are values to be interpreted from this aggregator?
7879
raise Exception("MinMaxSumCount aggregator data not supported")
80+
7981
elif isinstance(sdk_metric_record.aggregator, HistogramAggregator):
8082
# FIXME: How are values to be interpreted from this aggregator?
8183
raise Exception("Histogram aggregator data not supported")
84+
8285
elif isinstance(sdk_metric_record.aggregator, LastValueAggregator):
8386
value = sdk_metric_record.aggregator.checkpoint
87+
8488
elif isinstance(sdk_metric_record.aggregator, ValueObserverAggregator):
8589
value = sdk_metric_record.aggregator.checkpoint.last
8690

87-
print(
88-
"sdk_metric_record.aggregator.last_checkpoint_timestamp:\t{}".format(
89-
sdk_metric_record.aggregator.last_checkpoint_timestamp
90-
)
91-
)
92-
print(
93-
"sdk_metric_record.aggregator.last_update_timestamp:\t{}".format(
94-
sdk_metric_record.aggregator.last_update_timestamp
95-
)
96-
)
97-
if sdk_metric_record.aggregator.last_checkpoint_timestamp > sdk_metric_record.aggregator.last_update_timestamp:
98-
print("larger: sdk_metric_record.aggregator.last_checkpoint_timestamp")
99-
else:
100-
print("larger: sdk_metric_record.aggregator.last_update_timestamp")
101-
print()
102-
10391
return [
10492
data_point_class(
10593
labels=[
@@ -108,19 +96,11 @@ def _get_data_points(
10896
) for label_key, label_value in sdk_metric_record.labels
10997
],
11098
value=value,
111-
# start_time_unix_nano=1,
112-
# time_unix_nano=2,
113-
# start_time_unix_nano=(
114-
# sdk_metric_record.aggregator.last_checkpoint_timestamp
115-
# ),
116-
# time_unix_nano=(
117-
# sdk_metric_record.aggregator.last_update_timestamp
118-
# ),
11999
start_time_unix_nano=(
120-
sdk_metric_record.aggregator.last_update_timestamp
100+
sdk_metric_record.aggregator.initial_checkpoint_timestamp
121101
),
122102
time_unix_nano=(
123-
sdk_metric_record.aggregator.last_checkpoint_timestamp
103+
sdk_metric_record.aggregator.last_update_timestamp
124104
),
125105
)
126106
]

instrumentation/opentelemetry-instrumentation-system-metrics/src/opentelemetry/instrumentation/system_metrics/__init__.py

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,8 @@ def __init__(
165165
self._runtime_gc_count_labels = self._labels.copy()
166166

167167
# ERROR: received cumulative expected gauge
168+
# Error: timestamps produce an error
169+
"""
168170
self.meter.register_observer(
169171
callback=self._get_system_cpu_time,
170172
name="system.cpu.time",
@@ -173,6 +175,7 @@ def __init__(
173175
value_type=float,
174176
observer_type=SumObserver,
175177
)
178+
"""
176179

177180
self.meter.register_observer(
178181
callback=self._get_system_cpu_utilization,
@@ -237,7 +240,6 @@ def __init__(
237240
# observer_type=SumObserver,
238241
# )
239242

240-
# Works fine
241243
self.meter.register_observer(
242244
callback=self._get_system_disk_io,
243245
name="system.disk.io",
@@ -247,7 +249,6 @@ def __init__(
247249
observer_type=SumObserver,
248250
)
249251

250-
# Works fine
251252
self.meter.register_observer(
252253
callback=self._get_system_disk_operations,
253254
name="system.disk.operations",
@@ -257,7 +258,6 @@ def __init__(
257258
observer_type=SumObserver,
258259
)
259260

260-
# Works fine
261261
self.meter.register_observer(
262262
callback=self._get_system_disk_time,
263263
name="system.disk.time",
@@ -267,7 +267,6 @@ def __init__(
267267
observer_type=SumObserver,
268268
)
269269

270-
# Seems to work fine, may cause an error when cancelled
271270
self.meter.register_observer(
272271
callback=self._get_system_disk_merged,
273272
name="system.disk.merged",
@@ -295,7 +294,6 @@ def __init__(
295294
# observer_type=ValueObserver,
296295
# )
297296

298-
# Works fine
299297
self.meter.register_observer(
300298
callback=self._get_system_network_dropped_packets,
301299
name="system.network.dropped_packets",
@@ -305,7 +303,6 @@ def __init__(
305303
observer_type=SumObserver,
306304
)
307305

308-
# Seems to work fine, may cause an error when cancelled
309306
self.meter.register_observer(
310307
callback=self._get_system_network_packets,
311308
name="system.network.packets",
@@ -315,7 +312,6 @@ def __init__(
315312
observer_type=SumObserver,
316313
)
317314

318-
# Works fine
319315
self.meter.register_observer(
320316
callback=self._get_system_network_errors,
321317
name="system.network.errors",
@@ -325,7 +321,6 @@ def __init__(
325321
observer_type=SumObserver,
326322
)
327323

328-
# Works fine
329324
self.meter.register_observer(
330325
callback=self._get_system_network_io,
331326
name="system.network.io",
@@ -335,7 +330,6 @@ def __init__(
335330
observer_type=SumObserver,
336331
)
337332

338-
# Seems to work fine, may cause an error when cancelled
339333
self.meter.register_observer(
340334
callback=self._get_system_network_connections,
341335
name="system.network.connections",
@@ -345,7 +339,6 @@ def __init__(
345339
observer_type=UpDownSumObserver,
346340
)
347341

348-
# Works fine
349342
self.meter.register_observer(
350343
callback=self._get_runtime_memory,
351344
name="runtime.{}.memory".format(self._python_implementation),
@@ -357,7 +350,6 @@ def __init__(
357350
observer_type=SumObserver,
358351
)
359352

360-
# Works fine
361353
self.meter.register_observer(
362354
callback=self._get_runtime_cpu_time,
363355
name="runtime.{}.cpu_time".format(self._python_implementation),
@@ -369,7 +361,6 @@ def __init__(
369361
observer_type=SumObserver,
370362
)
371363

372-
# Works fine
373364
self.meter.register_observer(
374365
callback=self._get_runtime_gc_count,
375366
name="runtime.{}.gc_count".format(self._python_implementation),

opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,8 @@ class Aggregator(abc.ABC):
3333
def __init__(self, config=None):
3434
self._lock = threading.Lock()
3535
self.last_update_timestamp = 0
36-
self.last_checkpoint_timestamp = 0
36+
self.initial_checkpoint_timestamp = 0
37+
self.checkpointed = True
3738
if config is not None:
3839
self.config = config
3940
else:
@@ -42,21 +43,25 @@ def __init__(self, config=None):
4243
@abc.abstractmethod
4344
def update(self, value):
4445
"""Updates the current with the new value."""
46+
if self.checkpointed:
47+
self.initial_checkpoint_timestamp = time_ns()
48+
self.checkpointed = False
4549
self.last_update_timestamp = time_ns()
4650

4751
@abc.abstractmethod
4852
def take_checkpoint(self):
4953
"""Stores a snapshot of the current value."""
50-
self.last_checkpoint_timestamp = time_ns()
54+
self.checkpointed = True
5155

5256
@abc.abstractmethod
5357
def merge(self, other):
5458
"""Combines two aggregator values."""
5559
self.last_update_timestamp = max(
5660
self.last_update_timestamp, other.last_update_timestamp
5761
)
58-
self.last_checkpoint_timestamp = max(
59-
self.last_checkpoint_timestamp, other.last_checkpoint_timestamp
62+
self.initial_checkpoint_timestamp = max(
63+
self.initial_checkpoint_timestamp,
64+
other.initial_checkpoint_timestamp
6065
)
6166

6267
def _verify_type(self, other):

opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/controller.py

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
# limitations under the License.
1414

1515
import threading
16+
from time import sleep
1617

1718
from opentelemetry.context import attach, detach, set_value
1819
from opentelemetry.metrics import Meter
@@ -61,3 +62,46 @@ def tick(self):
6162
detach(token)
6263
# Perform post-exporting logic
6364
self.meter.processor.finished_collection()
65+
66+
67+
class DebugController:
68+
"""A debug controller, used to replace Push controller when debugging
69+
70+
Push controller uses a thread which makes it hard to use the IPython
71+
debugger. This controller does not use a thread, but relies on the user
72+
manually calling its ``run`` method to start the controller.
73+
74+
Args:
75+
meter: The meter used to collect metrics.
76+
exporter: The exporter used to export metrics.
77+
interval: The collect/export interval in seconds.
78+
"""
79+
80+
daemon = True
81+
82+
def __init__(
83+
self, meter: Meter, exporter: MetricsExporter, interval: float
84+
):
85+
super().__init__()
86+
self.meter = meter
87+
self.exporter = exporter
88+
self.interval = interval
89+
90+
def run(self):
91+
while True:
92+
self.tick()
93+
sleep(self.interval)
94+
95+
def shutdown(self):
96+
# Run one more collection pass to flush metrics batched in the meter
97+
self.tick()
98+
99+
def tick(self):
100+
# Collect all of the meter's metrics to be exported
101+
self.meter.collect()
102+
# Export the collected metrics
103+
token = attach(set_value("suppress_instrumentation", True))
104+
self.exporter.export(self.meter.processor.checkpoint_set())
105+
detach(token)
106+
# Perform post-exporting logic
107+
self.meter.processor.finished_collection()

0 commit comments

Comments
 (0)