Skip to content

Commit f57b6aa

Browse files
fix: Snapshot processor preserves metric aggregation temporality (PIPE-315) (#2541)
preserve metric temporality
1 parent 50cb9cb commit f57b6aa

11 files changed

Lines changed: 250 additions & 0 deletions

internal/report/snapshot/filter_metrics.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,10 @@ func filterGauge(g pmetric.Gauge, queryMatchesResource, queryMatchesName bool, s
139139
func filterSum(s pmetric.Sum, queryMatchesResource, queryMatchesName bool, searchQuery *string, minimumTimestamp *time.Time) pmetric.Sum {
140140
filteredSum := pmetric.NewSum()
141141

142+
// Copy aggregation temporality and monotonic flag from original sum
143+
filteredSum.SetAggregationTemporality(s.AggregationTemporality())
144+
filteredSum.SetIsMonotonic(s.IsMonotonic())
145+
142146
dps := s.DataPoints()
143147
for i := 0; i < dps.Len(); i++ {
144148
dp := dps.At(i)
@@ -153,6 +157,9 @@ func filterSum(s pmetric.Sum, queryMatchesResource, queryMatchesName bool, searc
153157
func filterHistogram(h pmetric.Histogram, queryMatchesResource, queryMatchesName bool, searchQuery *string, minimumTimestamp *time.Time) pmetric.Histogram {
154158
filteredHistogram := pmetric.NewHistogram()
155159

160+
// Copy aggregation temporality from original histogram
161+
filteredHistogram.SetAggregationTemporality(h.AggregationTemporality())
162+
156163
dps := h.DataPoints()
157164
for i := 0; i < dps.Len(); i++ {
158165
dp := dps.At(i)
@@ -167,6 +174,9 @@ func filterHistogram(h pmetric.Histogram, queryMatchesResource, queryMatchesName
167174
func filterExponentialHistogram(eh pmetric.ExponentialHistogram, queryMatchesResource, queryMatchesName bool, searchQuery *string, minimumTimestamp *time.Time) pmetric.ExponentialHistogram {
168175
filteredExponentialHistogram := pmetric.NewExponentialHistogram()
169176

177+
// Copy aggregation temporality from original exponential histogram
178+
filteredExponentialHistogram.SetAggregationTemporality(eh.AggregationTemporality())
179+
170180
dps := eh.DataPoints()
171181
for i := 0; i < dps.Len(); i++ {
172182
dp := dps.At(i)

internal/report/snapshot/testdata/metrics/after/filters-timestamp.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ resourceMetrics:
1212
- description: Filesystem bytes used.
1313
name: system.filesystem.usage
1414
sum:
15+
aggregationTemporality: 2
1516
dataPoints:
1617
- asInt: "220672"
1718
attributes:

internal/report/snapshot/testdata/metrics/after/matches-attr-key-exp-histogram.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ resourceMetrics:
33
scopeMetrics:
44
- metrics:
55
- exponentialHistogram:
6+
aggregationTemporality: 2
67
dataPoints:
78
- attributes:
89
- key: prod-machine

internal/report/snapshot/testdata/metrics/after/matches-attr-key-histogram.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ resourceMetrics:
33
scopeMetrics:
44
- metrics:
55
- histogram:
6+
aggregationTemporality: 2
67
dataPoints:
78
- attributes:
89
- key: prod-machine

internal/report/snapshot/testdata/metrics/after/matches-attr-key-sum.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ resourceMetrics:
1212
- description: Filesystem bytes used.
1313
name: system.filesystem.usage
1414
sum:
15+
aggregationTemporality: 2
1516
dataPoints:
1617
- asInt: "8717185024"
1718
attributes:

internal/report/snapshot/testdata/metrics/after/matches-attr-val-exp-histogram.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ resourceMetrics:
33
scopeMetrics:
44
- metrics:
55
- exponentialHistogram:
6+
aggregationTemporality: 2
67
dataPoints:
78
- attributes:
89
- key: dev-machine

internal/report/snapshot/testdata/metrics/after/matches-attr-val-histogram.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ resourceMetrics:
33
scopeMetrics:
44
- metrics:
55
- histogram:
6+
aggregationTemporality: 2
67
dataPoints:
78
- attributes:
89
- key: dev-machine

internal/report/snapshot/testdata/metrics/after/matches-attr-val-sum.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ resourceMetrics:
1212
- description: Filesystem bytes used.
1313
name: system.filesystem.usage
1414
sum:
15+
aggregationTemporality: 2
1516
dataPoints:
1617
- asInt: "8717185024"
1718
attributes:

processor/snapshotprocessor/processor_test.go

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -228,6 +228,97 @@ func TestProcess_Traces(t *testing.T) {
228228
require.Equal(t, "reportSnapshot", mockOpamp.sentMessageType)
229229
}
230230

231+
func TestProcess_Metrics_PreservesTemporalityWithFiltering(t *testing.T) {
232+
factory := NewFactory()
233+
sink := &consumertest.MetricsSink{}
234+
235+
pSet := processortest.NewNopSettings(componentType)
236+
p, err := factory.CreateMetrics(context.Background(), pSet, factory.CreateDefaultConfig(), sink)
237+
require.NoError(t, err)
238+
239+
mockOpamp := &mockOpAMPExtension{
240+
msgChan: make(chan *protobufs.CustomMessage, 1),
241+
}
242+
243+
mockHost := &mockHost{
244+
extensions: map[component.ID]component.Component{
245+
component.MustNewID("opamp"): mockOpamp,
246+
},
247+
}
248+
249+
require.NoError(t, p.Start(context.Background(), mockHost))
250+
t.Cleanup(func() {
251+
require.NoError(t, p.Shutdown(context.Background()))
252+
})
253+
254+
// Load test metrics with different aggregation temporalities
255+
m, err := golden.ReadMetrics(filepath.Join("testdata", "metrics", "temporality-metrics.yaml"))
256+
require.NoError(t, err)
257+
258+
require.NoError(t, p.ConsumeMetrics(context.Background(), m))
259+
260+
// Request buffer with search query (this triggers the filtering code path where the bug occurred)
261+
reqPayload := fmt.Sprintf(`{"processor":%q,"pipeline_type":"metrics","session_id":"filtering-test","search_query":"transmit"}`, pSet.ID)
262+
263+
cm := &protobufs.CustomMessage{
264+
Capability: "com.bindplane.snapshot",
265+
Type: "requestSnapshot",
266+
Data: []byte(reqPayload),
267+
}
268+
269+
mockOpamp.msgChan <- cm
270+
271+
// Wait for response
272+
require.Eventually(t, func() bool {
273+
return mockOpamp.GotMessage()
274+
}, 5*time.Second, 100*time.Millisecond)
275+
276+
// Parse the actual response
277+
var actualMessageContents map[string]any
278+
err = json.Unmarshal(gunzipBytes(t, mockOpamp.sentMessage), &actualMessageContents)
279+
require.NoError(t, err)
280+
281+
// Verify filtering worked and only "transmit" metric is present
282+
telemetryPayload := actualMessageContents["telemetry_payload"].(map[string]any)
283+
resourceMetrics := telemetryPayload["resourceMetrics"].([]any)
284+
require.Len(t, resourceMetrics, 1, "Should have one resource metric after filtering")
285+
286+
firstResource := resourceMetrics[0].(map[string]any)
287+
scopeMetrics := firstResource["scopeMetrics"].([]any)
288+
require.Len(t, scopeMetrics, 1, "Should have one scope metric")
289+
290+
firstScope := scopeMetrics[0].(map[string]any)
291+
metrics := firstScope["metrics"].([]any)
292+
require.Len(t, metrics, 1, "Should have one metric matching 'transmit' filter")
293+
294+
// Verify the filtered metric is the correct one and has preserved aggregation temporality
295+
filteredMetric := metrics[0].(map[string]any)
296+
require.Equal(t, "system.network.io", filteredMetric["name"])
297+
298+
sum := filteredMetric["sum"].(map[string]any)
299+
require.Equal(t, float64(2), sum["aggregationTemporality"], "Aggregation temporality should be preserved as Cumulative (2) even after filtering")
300+
require.Equal(t, true, sum["isMonotonic"], "IsMonotonic should be preserved after filtering")
301+
302+
// Verify the data point attributes contain "transmit"
303+
dataPoints := sum["dataPoints"].([]any)
304+
require.Len(t, dataPoints, 1, "Should have one data point")
305+
306+
dataPoint := dataPoints[0].(map[string]any)
307+
attributes := dataPoint["attributes"].([]any)
308+
foundTransmit := false
309+
for _, attrAny := range attributes {
310+
attr := attrAny.(map[string]any)
311+
if attr["key"] == "direction" {
312+
value := attr["value"].(map[string]any)
313+
if value["stringValue"] == "transmit" {
314+
foundTransmit = true
315+
break
316+
}
317+
}
318+
}
319+
require.True(t, foundTransmit, "Filtered metric should contain 'transmit' attribute")
320+
}
321+
231322
// mockHost for component.Host
232323
type mockHost struct {
233324
extensions map[component.ID]component.Component
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
resourceMetrics:
2+
- resource:
3+
attributes:
4+
- key: host.name
5+
value:
6+
stringValue: test-host
7+
- key: service.name
8+
value:
9+
stringValue: temporality-test
10+
schemaUrl: https://opentelemetry.io/schemas/1.9.0
11+
scopeMetrics:
12+
- metrics:
13+
# Delta temporality metric
14+
- description: Request count with delta temporality.
15+
name: http.server.requests
16+
sum:
17+
aggregationTemporality: 1
18+
isMonotonic: true
19+
dataPoints:
20+
- asInt: "42"
21+
attributes:
22+
- key: method
23+
value:
24+
stringValue: GET
25+
- key: status_code
26+
value:
27+
stringValue: "200"
28+
startTimeUnixNano: "1000000"
29+
timeUnixNano: "2000000"
30+
# Cumulative temporality metric
31+
- description: Total bytes processed with cumulative temporality.
32+
name: system.network.io
33+
sum:
34+
aggregationTemporality: 2
35+
isMonotonic: true
36+
dataPoints:
37+
- asInt: "1024000"
38+
attributes:
39+
- key: direction
40+
value:
41+
stringValue: transmit
42+
- key: device
43+
value:
44+
stringValue: eth0
45+
startTimeUnixNano: "1000000"
46+
timeUnixNano: "2000000"
47+
# Gauge metric (no aggregation temporality)
48+
- description: Current memory usage.
49+
name: system.memory.usage
50+
gauge:
51+
dataPoints:
52+
- asDouble: 85.5
53+
attributes:
54+
- key: state
55+
value:
56+
stringValue: used
57+
startTimeUnixNano: "1000000"
58+
timeUnixNano: "2000000"
59+
scope:
60+
name: otelcol/test
61+
version: v1.0.0

0 commit comments

Comments
 (0)