Skip to content

Commit dc7899b

Browse files
authored
Add telemetry for dropped data due to exporter sending queue overflow (#3328)
* Add telemetry for dropped data due to exporter sending queue overflow

  This change adds internal metrics for dropped spans, metric points and log records when the exporter sending queue is full:
  - exporter/enqueue_failed_metric_points
  - exporter/enqueue_failed_spans
  - exporter/enqueue_failed_log_records

* Make report*EnqueueFailure methods private

  By moving them to the package where they are being used. This requires some code duplication.
1 parent c8d72e9 commit dc7899b

File tree

13 files changed

+298
-26
lines changed

13 files changed

+298
-26
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
- Add `doc.go` files to the consumer package and its subpackages (#3270)
2121
- Automate triggering of doc-update on release (#3234)
2222
- Enable Dependabot for Github Actions (#3312)
23+
- Add telemetry for dropped data due to exporter sending queue overflow (#3328)
2324

2425
## v0.27.0 Beta
2526

exporter/exporterhelper/common.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,10 @@ import (
2323
"go.opentelemetry.io/collector/component"
2424
"go.opentelemetry.io/collector/component/componenthelper"
2525
"go.opentelemetry.io/collector/config"
26+
"go.opentelemetry.io/collector/config/configtelemetry"
2627
"go.opentelemetry.io/collector/consumer"
2728
"go.opentelemetry.io/collector/consumer/consumerhelper"
29+
"go.opentelemetry.io/collector/obsreport"
2830
)
2931

3032
// TimeoutSettings for timeout. The timeout applies to individual attempts to send data to the backend.
@@ -164,6 +166,7 @@ func WithResourceToTelemetryConversion(resourceToTelemetrySettings ResourceToTel
164166
// baseExporter contains common fields between different exporter types.
165167
type baseExporter struct {
166168
component.Component
169+
obsrep *obsExporter
167170
sender requestSender
168171
qrSender *queuedRetrySender
169172
}
@@ -173,6 +176,10 @@ func newBaseExporter(cfg config.Exporter, logger *zap.Logger, bs *baseSettings)
173176
Component: componenthelper.New(bs.componentOptions...),
174177
}
175178

179+
be.obsrep = newObsExporter(obsreport.ExporterSettings{
180+
Level: configtelemetry.GetMetricsLevelFlagValue(),
181+
ExporterID: cfg.ID(),
182+
})
176183
be.qrSender = newQueuedRetrySender(cfg.ID().String(), bs.QueueSettings, bs.RetrySettings, &timeoutSender{cfg: bs.TimeoutSettings}, logger)
177184
be.sender = be.qrSender
178185

exporter/exporterhelper/logs.go

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,17 +16,16 @@ package exporterhelper
1616

1717
import (
1818
"context"
19+
"errors"
1920

2021
"go.uber.org/zap"
2122

2223
"go.opentelemetry.io/collector/component"
2324
"go.opentelemetry.io/collector/config"
24-
"go.opentelemetry.io/collector/config/configtelemetry"
2525
"go.opentelemetry.io/collector/consumer"
2626
"go.opentelemetry.io/collector/consumer/consumererror"
2727
"go.opentelemetry.io/collector/consumer/consumerhelper"
2828
"go.opentelemetry.io/collector/consumer/pdata"
29-
"go.opentelemetry.io/collector/obsreport"
3029
)
3130

3231
type logsRequest struct {
@@ -87,16 +86,18 @@ func NewLogsExporter(
8786
be := newBaseExporter(cfg, logger, bs)
8887
be.wrapConsumerSender(func(nextSender requestSender) requestSender {
8988
return &logsExporterWithObservability{
90-
obsrep: obsreport.NewExporter(obsreport.ExporterSettings{
91-
Level: configtelemetry.GetMetricsLevelFlagValue(),
92-
ExporterID: cfg.ID(),
93-
}),
89+
obsrep: be.obsrep,
9490
nextSender: nextSender,
9591
}
9692
})
9793

9894
lc, err := consumerhelper.NewLogs(func(ctx context.Context, ld pdata.Logs) error {
99-
return be.sender.send(newLogsRequest(ctx, ld, pusher))
95+
req := newLogsRequest(ctx, ld, pusher)
96+
err := be.sender.send(req)
97+
if errors.Is(err, errSendingQueueIsFull) {
98+
be.obsrep.recordLogsEnqueueFailure(req.context(), req.count())
99+
}
100+
return err
100101
}, bs.consumerOptions...)
101102

102103
return &logsExporter{
@@ -106,7 +107,7 @@ func NewLogsExporter(
106107
}
107108

108109
type logsExporterWithObservability struct {
109-
obsrep *obsreport.Exporter
110+
obsrep *obsExporter
110111
nextSender requestSender
111112
}
112113

exporter/exporterhelper/logs_test.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,30 @@ func TestLogsExporter_WithRecordLogs_ReturnError(t *testing.T) {
121121
checkRecordedMetricsForLogsExporter(t, le, want)
122122
}
123123

124+
func TestLogsExporter_WithRecordEnqueueFailedMetrics(t *testing.T) {
125+
doneFn, err := obsreporttest.SetupRecordedMetricsTest()
126+
require.NoError(t, err)
127+
defer doneFn()
128+
129+
rCfg := DefaultRetrySettings()
130+
qCfg := DefaultQueueSettings()
131+
qCfg.NumConsumers = 1
132+
qCfg.QueueSize = 2
133+
wantErr := errors.New("some-error")
134+
te, err := NewLogsExporter(&fakeLogsExporterConfig, zap.NewNop(), newPushLogsData(wantErr), WithRetry(rCfg), WithQueue(qCfg))
135+
require.NoError(t, err)
136+
require.NotNil(t, te)
137+
138+
md := testdata.GenerateLogsTwoLogRecordsSameResourceOneDifferent()
139+
const numBatches = 7
140+
for i := 0; i < numBatches; i++ {
141+
te.ConsumeLogs(context.Background(), md)
142+
}
143+
144+
// 2 batches must be in the queue, and 5 batches (15 log records) are rejected due to queue overflow
145+
checkExporterEnqueueFailedLogsStats(t, fakeLogsExporterName, int64(15))
146+
}
147+
124148
func TestLogsExporter_WithSpan(t *testing.T) {
125149
le, err := NewLogsExporter(&fakeLogsExporterConfig, zap.NewNop(), newPushLogsData(nil))
126150
require.Nil(t, err)

exporter/exporterhelper/metrics.go

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,17 +16,16 @@ package exporterhelper
1616

1717
import (
1818
"context"
19+
"errors"
1920

2021
"go.uber.org/zap"
2122

2223
"go.opentelemetry.io/collector/component"
2324
"go.opentelemetry.io/collector/config"
24-
"go.opentelemetry.io/collector/config/configtelemetry"
2525
"go.opentelemetry.io/collector/consumer"
2626
"go.opentelemetry.io/collector/consumer/consumererror"
2727
"go.opentelemetry.io/collector/consumer/consumerhelper"
2828
"go.opentelemetry.io/collector/consumer/pdata"
29-
"go.opentelemetry.io/collector/obsreport"
3029
)
3130

3231
type metricsRequest struct {
@@ -88,10 +87,7 @@ func NewMetricsExporter(
8887
be := newBaseExporter(cfg, logger, bs)
8988
be.wrapConsumerSender(func(nextSender requestSender) requestSender {
9089
return &metricsSenderWithObservability{
91-
obsrep: obsreport.NewExporter(obsreport.ExporterSettings{
92-
Level: configtelemetry.GetMetricsLevelFlagValue(),
93-
ExporterID: cfg.ID(),
94-
}),
90+
obsrep: be.obsrep,
9591
nextSender: nextSender,
9692
}
9793
})
@@ -100,7 +96,12 @@ func NewMetricsExporter(
10096
if bs.ResourceToTelemetrySettings.Enabled {
10197
md = convertResourceToLabels(md)
10298
}
103-
return be.sender.send(newMetricsRequest(ctx, md, pusher))
99+
req := newMetricsRequest(ctx, md, pusher)
100+
err := be.sender.send(req)
101+
if errors.Is(err, errSendingQueueIsFull) {
102+
be.obsrep.recordMetricsEnqueueFailure(req.context(), req.count())
103+
}
104+
return err
104105
}, bs.consumerOptions...)
105106

106107
return &metricsExporter{
@@ -110,7 +111,7 @@ func NewMetricsExporter(
110111
}
111112

112113
type metricsSenderWithObservability struct {
113-
obsrep *obsreport.Exporter
114+
obsrep *obsExporter
114115
nextSender requestSender
115116
}
116117

exporter/exporterhelper/metrics_test.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,30 @@ func TestMetricsExporter_WithRecordMetrics_ReturnError(t *testing.T) {
120120
checkRecordedMetricsForMetricsExporter(t, me, want)
121121
}
122122

123+
func TestMetricsExporter_WithRecordEnqueueFailedMetrics(t *testing.T) {
124+
doneFn, err := obsreporttest.SetupRecordedMetricsTest()
125+
require.NoError(t, err)
126+
defer doneFn()
127+
128+
rCfg := DefaultRetrySettings()
129+
qCfg := DefaultQueueSettings()
130+
qCfg.NumConsumers = 1
131+
qCfg.QueueSize = 2
132+
wantErr := errors.New("some-error")
133+
te, err := NewMetricsExporter(&fakeMetricsExporterConfig, zap.NewNop(), newPushMetricsData(wantErr), WithRetry(rCfg), WithQueue(qCfg))
134+
require.NoError(t, err)
135+
require.NotNil(t, te)
136+
137+
md := testdata.GenerateMetricsOneMetricOneDataPoint()
138+
const numBatches = 7
139+
for i := 0; i < numBatches; i++ {
140+
te.ConsumeMetrics(context.Background(), md)
141+
}
142+
143+
// 2 batches must be in the queue, and 5 metric points are rejected due to queue overflow
144+
checkExporterEnqueueFailedMetricsStats(t, fakeMetricsExporterName, int64(5))
145+
}
146+
123147
func TestMetricsExporter_WithSpan(t *testing.T) {
124148
me, err := NewMetricsExporter(&fakeMetricsExporterConfig, zap.NewNop(), newPushMetricsData(nil))
125149
require.NoError(t, err)
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
// Copyright The OpenTelemetry Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package exporterhelper
16+
17+
import (
18+
"context"
19+
20+
"go.opencensus.io/stats"
21+
"go.opencensus.io/tag"
22+
23+
"go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics"
24+
"go.opentelemetry.io/collector/obsreport"
25+
)
26+
27+
// TODO: Incorporate this functionality along with tests from obsreport_test.go
28+
// into existing `obsreport` package once its functionality is not exposed
29+
// as public API. For now this part is kept private.
30+
31+
// obsExporter is a helper to add observability to a component.Exporter.
32+
type obsExporter struct {
33+
*obsreport.Exporter
34+
mutators []tag.Mutator
35+
}
36+
37+
// newObsExporter creates a new observability exporter.
38+
func newObsExporter(cfg obsreport.ExporterSettings) *obsExporter {
39+
return &obsExporter{
40+
obsreport.NewExporter(cfg),
41+
[]tag.Mutator{tag.Upsert(obsmetrics.TagKeyExporter, cfg.ExporterID.String(), tag.WithTTL(tag.TTLNoPropagation))},
42+
}
43+
}
44+
45+
// recordTracesEnqueueFailure records number of spans that failed to be added to the sending queue.
46+
func (eor *obsExporter) recordTracesEnqueueFailure(ctx context.Context, numSpans int) {
47+
_ = stats.RecordWithTags(ctx, eor.mutators, obsmetrics.ExporterFailedToEnqueueSpans.M(int64(numSpans)))
48+
}
49+
50+
// recordMetricsEnqueueFailure records number of metric points that failed to be added to the sending queue.
51+
func (eor *obsExporter) recordMetricsEnqueueFailure(ctx context.Context, numMetricPoints int) {
52+
_ = stats.RecordWithTags(ctx, eor.mutators, obsmetrics.ExporterFailedToEnqueueMetricPoints.M(int64(numMetricPoints)))
53+
}
54+
55+
// recordLogsEnqueueFailure records number of log records that failed to be added to the sending queue.
56+
func (eor *obsExporter) recordLogsEnqueueFailure(ctx context.Context, numLogRecords int) {
57+
_ = stats.RecordWithTags(ctx, eor.mutators, obsmetrics.ExporterFailedToEnqueueLogRecords.M(int64(numLogRecords)))
58+
}
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
// Copyright The OpenTelemetry Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package exporterhelper
16+
17+
import (
18+
"context"
19+
"reflect"
20+
"sort"
21+
"testing"
22+
23+
"github.com/stretchr/testify/require"
24+
"go.opencensus.io/stats/view"
25+
"go.opencensus.io/tag"
26+
27+
"go.opentelemetry.io/collector/config"
28+
"go.opentelemetry.io/collector/config/configtelemetry"
29+
"go.opentelemetry.io/collector/obsreport"
30+
"go.opentelemetry.io/collector/obsreport/obsreporttest"
31+
)
32+
33+
func TestExportEnqueueFailure(t *testing.T) {
34+
doneFn, err := obsreporttest.SetupRecordedMetricsTest()
35+
require.NoError(t, err)
36+
defer doneFn()
37+
38+
exporter := config.NewID("fakeExporter")
39+
40+
obsrep := newObsExporter(obsreport.ExporterSettings{Level: configtelemetry.LevelNormal, ExporterID: exporter})
41+
42+
logRecords := 7
43+
obsrep.recordLogsEnqueueFailure(context.Background(), logRecords)
44+
checkExporterEnqueueFailedLogsStats(t, exporter, int64(logRecords))
45+
46+
spans := 12
47+
obsrep.recordTracesEnqueueFailure(context.Background(), spans)
48+
checkExporterEnqueueFailedTracesStats(t, exporter, int64(spans))
49+
50+
metricPoints := 21
51+
obsrep.recordMetricsEnqueueFailure(context.Background(), metricPoints)
52+
checkExporterEnqueueFailedMetricsStats(t, exporter, int64(metricPoints))
53+
}
54+
55+
// checkExporterEnqueueFailedTracesStats checks that the reported number of spans that failed to enqueue matches the given value.
56+
// Callers must invoke SetupRecordedMetricsTest first, before calling this function.
57+
func checkExporterEnqueueFailedTracesStats(t *testing.T, exporter config.ComponentID, spans int64) {
58+
exporterTags := tagsForExporterView(exporter)
59+
checkValueForView(t, exporterTags, spans, "exporter/enqueue_failed_spans")
60+
}
61+
62+
// checkExporterEnqueueFailedMetricsStats checks that the reported number of metric points that failed to enqueue matches the given value.
63+
// Callers must invoke SetupRecordedMetricsTest first, before calling this function.
64+
func checkExporterEnqueueFailedMetricsStats(t *testing.T, exporter config.ComponentID, metricPoints int64) {
65+
exporterTags := tagsForExporterView(exporter)
66+
checkValueForView(t, exporterTags, metricPoints, "exporter/enqueue_failed_metric_points")
67+
}
68+
69+
// checkExporterEnqueueFailedLogsStats checks that the reported number of log records that failed to enqueue matches the given value.
70+
// Callers must invoke SetupRecordedMetricsTest first, before calling this function.
71+
func checkExporterEnqueueFailedLogsStats(t *testing.T, exporter config.ComponentID, logRecords int64) {
72+
exporterTags := tagsForExporterView(exporter)
73+
checkValueForView(t, exporterTags, logRecords, "exporter/enqueue_failed_log_records")
74+
}
75+
76+
// checkValueForView checks that the current exported value in the view with the given name,
77+
// for the row whose tags equal wantTags, is equal to "value".
78+
func checkValueForView(t *testing.T, wantTags []tag.Tag, value int64, vName string) {
79+
// Make sure the tags slice is sorted by tag keys.
80+
sortTags(wantTags)
81+
82+
rows, err := view.RetrieveData(vName)
83+
require.NoError(t, err)
84+
85+
for _, row := range rows {
86+
// Make sure the tags slice is sorted by tag keys.
87+
sortTags(row.Tags)
88+
if reflect.DeepEqual(wantTags, row.Tags) {
89+
sum := row.Data.(*view.SumData)
90+
require.Equal(t, float64(value), sum.Value)
91+
return
92+
}
93+
}
94+
95+
require.Failf(t, "could not find tags", "wantTags: %s in rows %v", wantTags, rows)
96+
}
97+
98+
// tagsForExporterView returns the tags that are needed for the exporter views.
99+
func tagsForExporterView(exporter config.ComponentID) []tag.Tag {
100+
return []tag.Tag{
101+
{Key: exporterTag, Value: exporter.String()},
102+
}
103+
}
104+
105+
func sortTags(tags []tag.Tag) {
106+
sort.SliceStable(tags, func(i, j int) bool {
107+
return tags[i].Key.Name() < tags[j].Key.Name()
108+
})
109+
}

exporter/exporterhelper/queued_retry.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ var (
4141
metric.WithDescription("Current size of the retry queue (in batches)"),
4242
metric.WithLabelKeys(obsmetrics.ExporterKey),
4343
metric.WithUnit(metricdata.UnitDimensionless))
44+
45+
errSendingQueueIsFull = errors.New("sending_queue is full")
4446
)
4547

4648
func init() {
@@ -189,7 +191,7 @@ func (qrs *queuedRetrySender) send(req request) error {
189191
zap.Int("dropped_items", req.count()),
190192
)
191193
span.Annotate(qrs.traceAttributes, "Dropped item, sending_queue is full.")
192-
return errors.New("sending_queue is full")
194+
return errSendingQueueIsFull
193195
}
194196

195197
span.Annotate(qrs.traceAttributes, "Enqueued item.")

0 commit comments

Comments
 (0)