Skip to content

Commit 3e138d7

Browse files
authored
Reimplement prometheus exporter (#2393)
* reimplement prometheus exporter * represent non monotonic sum as gauge * clean up * update factory * reorganize package * shorten values signature * add tests * more test coverage * more test coverage, fix sanitize bug * test collection of counters and gauges * simplify * add obsreport to exporter * migrate to latest pdata timestamp api * reflect PR feedback, introduce accumulator * unexport types, rename helper
1 parent 0515e03 commit 3e138d7

File tree

15 files changed

+1740
-121
lines changed

15 files changed

+1740
-121
lines changed

exporter/prometheusexporter/README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ The following settings can be optionally configured:
1616
- `namespace` (no default): if set, exports metrics under the provided value.
1717
- `send_timestamps` (default = `false`): if true, sends the timestamp of the underlying
1818
metric sample in the response.
19+
- `metric_expiration` (default = `5m`): defines how long metrics are exposed without updates
1920

2021
Example:
2122

@@ -28,4 +29,5 @@ exporters:
2829
label1: value1
2930
"another label": spaced value
3031
send_timestamps: true
32+
metric_expiration: 180m
3133
```
Lines changed: 365 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,365 @@
1+
// Copyright The OpenTelemetry Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package prometheusexporter
16+
17+
import (
18+
"fmt"
19+
"strings"
20+
"sync"
21+
"time"
22+
23+
"go.uber.org/zap"
24+
25+
"go.opentelemetry.io/collector/consumer/pdata"
26+
)
27+
28+
// accumulatedValue is one stored timeseries entry: a single-datapoint
// copy of the incoming metric plus the bookkeeping needed for expiry.
type accumulatedValue struct {
	// value contains a metric with exactly one aggregated datapoint.
	value pdata.Metric

	// stored records when the entry was (last) written; Collect uses it
	// together with the accumulator's metricExpiration to prune entries.
	stored time.Time

	// instrumentationLibrary is the library the metric originated from,
	// kept alongside the value for later export.
	instrumentationLibrary pdata.InstrumentationLibrary
}
36+
37+
// accumulator stores aggregated values of incoming metrics.
type accumulator interface {
	// Accumulate stores aggregated metric values and returns the number
	// of datapoints that were accepted.
	Accumulate(resourceMetrics pdata.ResourceMetrics) (processed int)
	// Collect returns a slice with relevant aggregated metrics,
	// dropping any that have expired.
	Collect() (metrics []pdata.Metric)
}
44+
45+
// lastValueAccumulator keeps the last (most recent by timestamp) value
// for each accumulated timeseries.
type lastValueAccumulator struct {
	logger *zap.Logger

	// registeredMetrics maps a timeseries signature (see
	// timeseriesSignature) to an *accumulatedValue.
	registeredMetrics sync.Map

	// metricExpiration contains the duration for which a metric
	// should be served after it was stored.
	metricExpiration time.Duration
}
55+
56+
// NewAccumulator returns LastValueAccumulator
57+
func newAccumulator(logger *zap.Logger, metricExpiration time.Duration) accumulator {
58+
return &lastValueAccumulator{
59+
logger: logger,
60+
metricExpiration: metricExpiration,
61+
}
62+
}
63+
64+
// Accumulate stores one datapoint per metric
65+
func (a *lastValueAccumulator) Accumulate(rm pdata.ResourceMetrics) (n int) {
66+
ilms := rm.InstrumentationLibraryMetrics()
67+
68+
for i := 0; i < ilms.Len(); i++ {
69+
ilm := ilms.At(i)
70+
71+
metrics := ilm.Metrics()
72+
for j := 0; j < metrics.Len(); j++ {
73+
n += a.addMetric(metrics.At(j), ilm.InstrumentationLibrary())
74+
}
75+
}
76+
77+
return
78+
}
79+
80+
func (a *lastValueAccumulator) addMetric(metric pdata.Metric, il pdata.InstrumentationLibrary) int {
81+
a.logger.Debug(fmt.Sprintf("accumulating metric: %s", metric.Name()))
82+
83+
switch metric.DataType() {
84+
case pdata.MetricDataTypeIntGauge:
85+
return a.accumulateIntGauge(metric, il)
86+
case pdata.MetricDataTypeIntSum:
87+
return a.accumulateIntSum(metric, il)
88+
case pdata.MetricDataTypeDoubleGauge:
89+
return a.accumulateDoubleGauge(metric, il)
90+
case pdata.MetricDataTypeDoubleSum:
91+
return a.accumulateDoubleSum(metric, il)
92+
case pdata.MetricDataTypeIntHistogram:
93+
return a.accumulateIntHistogram(metric, il)
94+
case pdata.MetricDataTypeDoubleHistogram:
95+
return a.accumulateDoubleHistogram(metric, il)
96+
}
97+
98+
return 0
99+
}
100+
101+
func (a *lastValueAccumulator) accumulateIntGauge(metric pdata.Metric, il pdata.InstrumentationLibrary) (n int) {
102+
dps := metric.IntGauge().DataPoints()
103+
for i := 0; i < dps.Len(); i++ {
104+
ip := dps.At(i)
105+
106+
ts := ip.Timestamp().AsTime()
107+
signature := timeseriesSignature(il.Name(), metric, ip.LabelsMap())
108+
109+
v, ok := a.registeredMetrics.Load(signature)
110+
if !ok {
111+
m := createMetric(metric)
112+
m.IntGauge().DataPoints().Append(ip)
113+
a.registeredMetrics.Store(signature, &accumulatedValue{value: m, instrumentationLibrary: il, stored: time.Now()})
114+
n++
115+
continue
116+
}
117+
mv := v.(*accumulatedValue)
118+
119+
if ts.Before(mv.value.IntGauge().DataPoints().At(0).Timestamp().AsTime()) {
120+
// only keep datapoint with latest timestamp
121+
continue
122+
}
123+
124+
m := createMetric(metric)
125+
m.IntGauge().DataPoints().Append(ip)
126+
a.registeredMetrics.Store(signature, &accumulatedValue{value: m, instrumentationLibrary: il, stored: time.Now()})
127+
n++
128+
}
129+
return
130+
}
131+
132+
func (a *lastValueAccumulator) accumulateDoubleGauge(metric pdata.Metric, il pdata.InstrumentationLibrary) (n int) {
133+
dps := metric.DoubleGauge().DataPoints()
134+
for i := 0; i < dps.Len(); i++ {
135+
ip := dps.At(i)
136+
137+
ts := ip.Timestamp().AsTime()
138+
signature := timeseriesSignature(il.Name(), metric, ip.LabelsMap())
139+
140+
v, ok := a.registeredMetrics.Load(signature)
141+
if !ok {
142+
m := createMetric(metric)
143+
m.DoubleGauge().DataPoints().Append(ip)
144+
a.registeredMetrics.Store(signature, &accumulatedValue{value: m, instrumentationLibrary: il, stored: time.Now()})
145+
n++
146+
continue
147+
}
148+
mv := v.(*accumulatedValue)
149+
150+
if ts.Before(mv.value.DoubleGauge().DataPoints().At(0).Timestamp().AsTime()) {
151+
// only keep datapoint with latest timestamp
152+
continue
153+
}
154+
155+
m := createMetric(metric)
156+
m.DoubleGauge().DataPoints().Append(ip)
157+
a.registeredMetrics.Store(signature, &accumulatedValue{value: m, instrumentationLibrary: il, stored: time.Now()})
158+
n++
159+
}
160+
return
161+
}
162+
163+
func (a *lastValueAccumulator) accumulateIntSum(metric pdata.Metric, il pdata.InstrumentationLibrary) (n int) {
164+
intSum := metric.IntSum()
165+
166+
// Drop metrics with non-cumulative aggregations
167+
if intSum.AggregationTemporality() != pdata.AggregationTemporalityCumulative {
168+
return
169+
}
170+
171+
dps := intSum.DataPoints()
172+
for i := 0; i < dps.Len(); i++ {
173+
ip := dps.At(i)
174+
175+
ts := ip.Timestamp().AsTime()
176+
signature := timeseriesSignature(il.Name(), metric, ip.LabelsMap())
177+
178+
v, ok := a.registeredMetrics.Load(signature)
179+
if !ok {
180+
m := createMetric(metric)
181+
m.IntSum().SetIsMonotonic(metric.IntSum().IsMonotonic())
182+
m.IntSum().SetAggregationTemporality(pdata.AggregationTemporalityCumulative)
183+
m.IntSum().DataPoints().Append(ip)
184+
a.registeredMetrics.Store(signature, &accumulatedValue{value: m, instrumentationLibrary: il, stored: time.Now()})
185+
n++
186+
continue
187+
}
188+
mv := v.(*accumulatedValue)
189+
190+
if ts.Before(mv.value.IntSum().DataPoints().At(0).Timestamp().AsTime()) {
191+
// only keep datapoint with latest timestamp
192+
continue
193+
}
194+
195+
m := createMetric(metric)
196+
m.IntSum().SetIsMonotonic(metric.IntSum().IsMonotonic())
197+
m.IntSum().SetAggregationTemporality(pdata.AggregationTemporalityCumulative)
198+
m.IntSum().DataPoints().Append(ip)
199+
a.registeredMetrics.Store(signature, &accumulatedValue{value: m, instrumentationLibrary: il, stored: time.Now()})
200+
n++
201+
}
202+
return
203+
}
204+
205+
func (a *lastValueAccumulator) accumulateDoubleSum(metric pdata.Metric, il pdata.InstrumentationLibrary) (n int) {
206+
doubleSum := metric.DoubleSum()
207+
208+
// Drop metrics with non-cumulative aggregations
209+
if doubleSum.AggregationTemporality() != pdata.AggregationTemporalityCumulative {
210+
return
211+
}
212+
213+
dps := doubleSum.DataPoints()
214+
for i := 0; i < dps.Len(); i++ {
215+
ip := dps.At(i)
216+
217+
ts := ip.Timestamp().AsTime()
218+
signature := timeseriesSignature(il.Name(), metric, ip.LabelsMap())
219+
220+
v, ok := a.registeredMetrics.Load(signature)
221+
if !ok {
222+
m := createMetric(metric)
223+
m.DoubleSum().SetIsMonotonic(metric.DoubleSum().IsMonotonic())
224+
m.DoubleSum().SetAggregationTemporality(pdata.AggregationTemporalityCumulative)
225+
m.DoubleSum().DataPoints().Append(ip)
226+
a.registeredMetrics.Store(signature, &accumulatedValue{value: m, instrumentationLibrary: il, stored: time.Now()})
227+
n++
228+
continue
229+
}
230+
mv := v.(*accumulatedValue)
231+
232+
if ts.Before(mv.value.DoubleSum().DataPoints().At(0).Timestamp().AsTime()) {
233+
// only keep datapoint with latest timestamp
234+
continue
235+
}
236+
237+
m := createMetric(metric)
238+
m.DoubleSum().SetIsMonotonic(metric.DoubleSum().IsMonotonic())
239+
m.DoubleSum().SetAggregationTemporality(pdata.AggregationTemporalityCumulative)
240+
m.DoubleSum().DataPoints().Append(ip)
241+
a.registeredMetrics.Store(signature, &accumulatedValue{value: m, instrumentationLibrary: il, stored: time.Now()})
242+
n++
243+
}
244+
return
245+
}
246+
247+
func (a *lastValueAccumulator) accumulateIntHistogram(metric pdata.Metric, il pdata.InstrumentationLibrary) (n int) {
248+
intHistogram := metric.IntHistogram()
249+
250+
// Drop metrics with non-cumulative aggregations
251+
if intHistogram.AggregationTemporality() != pdata.AggregationTemporalityCumulative {
252+
return
253+
}
254+
255+
dps := intHistogram.DataPoints()
256+
for i := 0; i < dps.Len(); i++ {
257+
ip := dps.At(i)
258+
259+
ts := ip.Timestamp().AsTime()
260+
signature := timeseriesSignature(il.Name(), metric, ip.LabelsMap())
261+
262+
v, ok := a.registeredMetrics.Load(signature)
263+
if !ok {
264+
m := createMetric(metric)
265+
m.IntHistogram().DataPoints().Append(ip)
266+
a.registeredMetrics.Store(signature, &accumulatedValue{value: m, instrumentationLibrary: il, stored: time.Now()})
267+
n++
268+
continue
269+
}
270+
mv := v.(*accumulatedValue)
271+
272+
if ts.Before(mv.value.IntHistogram().DataPoints().At(0).Timestamp().AsTime()) {
273+
// only keep datapoint with latest timestamp
274+
continue
275+
}
276+
277+
m := createMetric(metric)
278+
m.IntHistogram().DataPoints().Append(ip)
279+
m.IntHistogram().SetAggregationTemporality(pdata.AggregationTemporalityCumulative)
280+
a.registeredMetrics.Store(signature, &accumulatedValue{value: m, instrumentationLibrary: il, stored: time.Now()})
281+
n++
282+
}
283+
return
284+
}
285+
286+
func (a *lastValueAccumulator) accumulateDoubleHistogram(metric pdata.Metric, il pdata.InstrumentationLibrary) (n int) {
287+
doubleHistogram := metric.DoubleHistogram()
288+
289+
// Drop metrics with non-cumulative aggregations
290+
if doubleHistogram.AggregationTemporality() != pdata.AggregationTemporalityCumulative {
291+
return
292+
}
293+
294+
dps := doubleHistogram.DataPoints()
295+
for i := 0; i < dps.Len(); i++ {
296+
ip := dps.At(i)
297+
298+
ts := ip.Timestamp().AsTime()
299+
signature := timeseriesSignature(il.Name(), metric, ip.LabelsMap())
300+
301+
v, ok := a.registeredMetrics.Load(signature)
302+
if !ok {
303+
m := createMetric(metric)
304+
m.DoubleHistogram().DataPoints().Append(ip)
305+
a.registeredMetrics.Store(signature, &accumulatedValue{value: m, instrumentationLibrary: il, stored: time.Now()})
306+
n++
307+
continue
308+
}
309+
mv := v.(*accumulatedValue)
310+
311+
if ts.Before(mv.value.DoubleHistogram().DataPoints().At(0).Timestamp().AsTime()) {
312+
// only keep datapoint with latest timestamp
313+
continue
314+
}
315+
316+
m := createMetric(metric)
317+
m.DoubleHistogram().DataPoints().Append(ip)
318+
m.DoubleHistogram().SetAggregationTemporality(pdata.AggregationTemporalityCumulative)
319+
a.registeredMetrics.Store(signature, &accumulatedValue{value: m, instrumentationLibrary: il, stored: time.Now()})
320+
n++
321+
}
322+
return
323+
}
324+
325+
// Collect returns a slice with relevant aggregated metrics
326+
func (a *lastValueAccumulator) Collect() []pdata.Metric {
327+
a.logger.Debug("Accumulator collect called")
328+
329+
res := make([]pdata.Metric, 0)
330+
331+
a.registeredMetrics.Range(func(key, value interface{}) bool {
332+
v := value.(*accumulatedValue)
333+
if time.Now().After(v.stored.Add(a.metricExpiration)) {
334+
a.logger.Debug(fmt.Sprintf("metric expired: %s", v.value.Name()))
335+
a.registeredMetrics.Delete(key)
336+
return true
337+
}
338+
339+
res = append(res, v.value)
340+
return true
341+
})
342+
343+
return res
344+
}
345+
346+
func timeseriesSignature(ilmName string, metric pdata.Metric, labels pdata.StringMap) string {
347+
var b strings.Builder
348+
b.WriteString(metric.DataType().String())
349+
b.WriteString("*" + ilmName)
350+
b.WriteString("*" + metric.Name())
351+
labels.ForEach(func(k string, v string) {
352+
b.WriteString("*" + k + "*" + v)
353+
})
354+
return b.String()
355+
}
356+
357+
func createMetric(metric pdata.Metric) pdata.Metric {
358+
m := pdata.NewMetric()
359+
m.SetName(metric.Name())
360+
m.SetDescription(metric.Description())
361+
m.SetUnit(metric.Unit())
362+
m.SetDataType(metric.DataType())
363+
364+
return m
365+
}

0 commit comments

Comments
 (0)