Skip to content

Commit 78ca1b0

Browse files
committed
Make report*EnqueueFailure methods private
By moving them to the package where they are being used. It requires some code duplication
1 parent 69215b9 commit 78ca1b0

File tree

12 files changed

+177
-70
lines changed

12 files changed

+177
-70
lines changed

exporter/exporterhelper/common.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,7 @@ func WithResourceToTelemetryConversion(resourceToTelemetrySettings ResourceToTel
166166
// baseExporter contains common fields between different exporter types.
167167
type baseExporter struct {
168168
component.Component
169-
obsrep *obsreport.Exporter
169+
obsrep *obsExporter
170170
sender requestSender
171171
qrSender *queuedRetrySender
172172
}
@@ -176,7 +176,7 @@ func newBaseExporter(cfg config.Exporter, logger *zap.Logger, bs *baseSettings)
176176
Component: componenthelper.New(bs.componentOptions...),
177177
}
178178

179-
be.obsrep = obsreport.NewExporter(obsreport.ExporterSettings{
179+
be.obsrep = newObsExporter(obsreport.ExporterSettings{
180180
Level: configtelemetry.GetMetricsLevelFlagValue(),
181181
ExporterID: cfg.ID(),
182182
})

exporter/exporterhelper/logs.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ import (
2626
"go.opentelemetry.io/collector/consumer/consumererror"
2727
"go.opentelemetry.io/collector/consumer/consumerhelper"
2828
"go.opentelemetry.io/collector/consumer/pdata"
29-
"go.opentelemetry.io/collector/obsreport"
3029
)
3130

3231
type logsRequest struct {
@@ -96,7 +95,7 @@ func NewLogsExporter(
9695
req := newLogsRequest(ctx, ld, pusher)
9796
err := be.sender.send(req)
9897
if errors.Is(err, errSendingQueueIsFull) {
99-
be.obsrep.RecordLogsEnqueueFailure(req.context(), req.count())
98+
be.obsrep.recordLogsEnqueueFailure(req.context(), req.count())
10099
}
101100
return err
102101
}, bs.consumerOptions...)
@@ -108,7 +107,7 @@ func NewLogsExporter(
108107
}
109108

110109
type logsExporterWithObservability struct {
111-
obsrep *obsreport.Exporter
110+
obsrep *obsExporter
112111
nextSender requestSender
113112
}
114113

exporter/exporterhelper/logs_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ func TestLogsExporter_WithRecordEnqueueFailedMetrics(t *testing.T) {
142142
}
143143

144144
// 2 batched must be in queue, and 5 batches (15 log records) rejected due to queue overflow
145-
obsreporttest.CheckExporterEnqueueFailedLogs(t, fakeLogsExporterName, int64(15))
145+
checkExporterEnqueueFailedLogsStats(t, fakeLogsExporterName, int64(15))
146146
}
147147

148148
func TestLogsExporter_WithSpan(t *testing.T) {

exporter/exporterhelper/metrics.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ import (
2626
"go.opentelemetry.io/collector/consumer/consumererror"
2727
"go.opentelemetry.io/collector/consumer/consumerhelper"
2828
"go.opentelemetry.io/collector/consumer/pdata"
29-
"go.opentelemetry.io/collector/obsreport"
3029
)
3130

3231
type metricsRequest struct {
@@ -100,7 +99,7 @@ func NewMetricsExporter(
10099
req := newMetricsRequest(ctx, md, pusher)
101100
err := be.sender.send(req)
102101
if errors.Is(err, errSendingQueueIsFull) {
103-
be.obsrep.RecordMetricsEnqueueFailure(req.context(), req.count())
102+
be.obsrep.recordMetricsEnqueueFailure(req.context(), req.count())
104103
}
105104
return err
106105
}, bs.consumerOptions...)
@@ -112,7 +111,7 @@ func NewMetricsExporter(
112111
}
113112

114113
type metricsSenderWithObservability struct {
115-
obsrep *obsreport.Exporter
114+
obsrep *obsExporter
116115
nextSender requestSender
117116
}
118117

exporter/exporterhelper/metrics_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ func TestMetricsExporter_WithRecordEnqueueFailedMetrics(t *testing.T) {
141141
}
142142

143143
// 2 batched must be in queue, and 5 metric points rejected due to queue overflow
144-
obsreporttest.CheckExporterEnqueueFailedMetrics(t, fakeMetricsExporterName, int64(5))
144+
checkExporterEnqueueFailedMetricsStats(t, fakeMetricsExporterName, int64(5))
145145
}
146146

147147
func TestMetricsExporter_WithSpan(t *testing.T) {
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
// Copyright The OpenTelemetry Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package exporterhelper
16+
17+
import (
18+
"context"
19+
20+
"go.opencensus.io/stats"
21+
"go.opencensus.io/tag"
22+
"go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics"
23+
"go.opentelemetry.io/collector/obsreport"
24+
)
25+
26+
// TODO: Incorporate this functionality along with tests from obsreport_test.go
27+
// into existing `obsreport` package once its functionally is not exposed
28+
// as public API. For now this part is kept private.
29+
30+
// obsExporter is a helper to add observability to a component.Exporter.
31+
type obsExporter struct {
32+
*obsreport.Exporter
33+
mutators []tag.Mutator
34+
}
35+
36+
// newObsExporter creates a new observability exporter.
37+
func newObsExporter(cfg obsreport.ExporterSettings) *obsExporter {
38+
return &obsExporter{
39+
obsreport.NewExporter(cfg),
40+
[]tag.Mutator{tag.Upsert(obsmetrics.TagKeyExporter, cfg.ExporterID.String(), tag.WithTTL(tag.TTLNoPropagation))},
41+
}
42+
}
43+
44+
// recordTracesEnqueueFailure records number of spans that failed to be added to the sending queue.
45+
func (eor *obsExporter) recordTracesEnqueueFailure(ctx context.Context, numSpans int) {
46+
_ = stats.RecordWithTags(ctx, eor.mutators, obsmetrics.ExporterFailedToEnqueueSpans.M(int64(numSpans)))
47+
}
48+
49+
// recordMetricsEnqueueFailure records number of metric points that failed to be added to the sending queue.
50+
func (eor *obsExporter) recordMetricsEnqueueFailure(ctx context.Context, numMetricPoints int) {
51+
_ = stats.RecordWithTags(ctx, eor.mutators, obsmetrics.ExporterFailedToEnqueueMetricPoints.M(int64(numMetricPoints)))
52+
}
53+
54+
// recordLogsEnqueueFailure records number of log records that failed to be added to the sending queue.
55+
func (eor *obsExporter) recordLogsEnqueueFailure(ctx context.Context, numLogRecords int) {
56+
_ = stats.RecordWithTags(ctx, eor.mutators, obsmetrics.ExporterFailedToEnqueueLogRecords.M(int64(numLogRecords)))
57+
}
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
// Copyright The OpenTelemetry Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package exporterhelper
16+
17+
import (
18+
"context"
19+
"reflect"
20+
"sort"
21+
"testing"
22+
23+
"github.com/stretchr/testify/require"
24+
"go.opencensus.io/stats/view"
25+
"go.opencensus.io/tag"
26+
27+
"go.opentelemetry.io/collector/config"
28+
"go.opentelemetry.io/collector/config/configtelemetry"
29+
"go.opentelemetry.io/collector/obsreport"
30+
"go.opentelemetry.io/collector/obsreport/obsreporttest"
31+
)
32+
33+
func TestExportEnqueueFailure(t *testing.T) {
34+
doneFn, err := obsreporttest.SetupRecordedMetricsTest()
35+
require.NoError(t, err)
36+
defer doneFn()
37+
38+
exporter := config.NewID("fakeExporter")
39+
40+
obsrep := newObsExporter(obsreport.ExporterSettings{Level: configtelemetry.LevelNormal, ExporterID: exporter})
41+
42+
logRecords := 7
43+
obsrep.recordLogsEnqueueFailure(context.Background(), logRecords)
44+
checkExporterEnqueueFailedLogsStats(t, exporter, int64(logRecords))
45+
46+
spans := 12
47+
obsrep.recordTracesEnqueueFailure(context.Background(), spans)
48+
checkExporterEnqueueFailedTracesStats(t, exporter, int64(spans))
49+
50+
metricPoints := 21
51+
obsrep.recordMetricsEnqueueFailure(context.Background(), metricPoints)
52+
checkExporterEnqueueFailedMetricsStats(t, exporter, int64(metricPoints))
53+
}
54+
55+
// checkExporterEnqueueFailedTracesStats checks that reported number of spans failed to enqueue match given values.
56+
// When this function is called it is required to also call SetupRecordedMetricsTest as first thing.
57+
func checkExporterEnqueueFailedTracesStats(t *testing.T, exporter config.ComponentID, spans int64) {
58+
exporterTags := tagsForExporterView(exporter)
59+
checkValueForView(t, exporterTags, spans, "exporter/enqueue_failed_spans")
60+
}
61+
62+
// checkExporterEnqueueFailedMetricsStats checks that reported number of metric points failed to enqueue match given values.
63+
// When this function is called it is required to also call SetupRecordedMetricsTest as first thing.
64+
func checkExporterEnqueueFailedMetricsStats(t *testing.T, exporter config.ComponentID, metricPoints int64) {
65+
exporterTags := tagsForExporterView(exporter)
66+
checkValueForView(t, exporterTags, metricPoints, "exporter/enqueue_failed_metric_points")
67+
}
68+
69+
// checkExporterEnqueueFailedLogsStats checks that reported number of log records failed to enqueue match given values.
70+
// When this function is called it is required to also call SetupRecordedMetricsTest as first thing.
71+
func checkExporterEnqueueFailedLogsStats(t *testing.T, exporter config.ComponentID, logRecords int64) {
72+
exporterTags := tagsForExporterView(exporter)
73+
checkValueForView(t, exporterTags, logRecords, "exporter/enqueue_failed_log_records")
74+
}
75+
76+
// checkValueForView checks that for the current exported value in the view with the given name
77+
// for {LegacyTagKeyReceiver: receiverName} is equal to "value".
78+
func checkValueForView(t *testing.T, wantTags []tag.Tag, value int64, vName string) {
79+
// Make sure the tags slice is sorted by tag keys.
80+
sortTags(wantTags)
81+
82+
rows, err := view.RetrieveData(vName)
83+
require.NoError(t, err)
84+
85+
for _, row := range rows {
86+
// Make sure the tags slice is sorted by tag keys.
87+
sortTags(row.Tags)
88+
if reflect.DeepEqual(wantTags, row.Tags) {
89+
sum := row.Data.(*view.SumData)
90+
require.Equal(t, float64(value), sum.Value)
91+
return
92+
}
93+
}
94+
95+
require.Failf(t, "could not find tags", "wantTags: %s in rows %v", wantTags, rows)
96+
}
97+
98+
// tagsForExporterView returns the tags that are needed for the exporter views.
99+
func tagsForExporterView(exporter config.ComponentID) []tag.Tag {
100+
return []tag.Tag{
101+
{Key: exporterTag, Value: exporter.String()},
102+
}
103+
}
104+
105+
func sortTags(tags []tag.Tag) {
106+
sort.SliceStable(tags, func(i, j int) bool {
107+
return tags[i].Key.Name() < tags[j].Key.Name()
108+
})
109+
}

exporter/exporterhelper/traces.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ import (
2626
"go.opentelemetry.io/collector/consumer/consumererror"
2727
"go.opentelemetry.io/collector/consumer/consumerhelper"
2828
"go.opentelemetry.io/collector/consumer/pdata"
29-
"go.opentelemetry.io/collector/obsreport"
3029
)
3130

3231
type tracesRequest struct {
@@ -97,7 +96,7 @@ func NewTracesExporter(
9796
req := newTracesRequest(ctx, td, pusher)
9897
err := be.sender.send(req)
9998
if errors.Is(err, errSendingQueueIsFull) {
100-
be.obsrep.RecordTracesEnqueueFailure(req.context(), req.count())
99+
be.obsrep.recordTracesEnqueueFailure(req.context(), req.count())
101100
}
102101
return err
103102
}, bs.consumerOptions...)
@@ -109,7 +108,7 @@ func NewTracesExporter(
109108
}
110109

111110
type tracesExporterWithObservability struct {
112-
obsrep *obsreport.Exporter
111+
obsrep *obsExporter
113112
nextSender requestSender
114113
}
115114

exporter/exporterhelper/traces_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ func TestTracesExporter_WithRecordEnqueueFailedMetrics(t *testing.T) {
152152
}
153153

154154
// 2 batched must be in queue, and 5 batches (10 spans) rejected due to queue overflow
155-
obsreporttest.CheckExporterEnqueueFailedTraces(t, fakeTracesExporterName, int64(10))
155+
checkExporterEnqueueFailedTracesStats(t, fakeTracesExporterName, int64(10))
156156
}
157157

158158
func TestTracesExporter_WithSpan(t *testing.T) {

obsreport/obsreport_exporter.go

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -63,11 +63,6 @@ func (eor *Exporter) EndTracesOp(ctx context.Context, numSpans int, err error) {
6363
endSpan(ctx, err, numSent, numFailedToSend, obsmetrics.SentSpansKey, obsmetrics.FailedToSendSpansKey)
6464
}
6565

66-
// RecordTracesEnqueueFailure records number of spans that failed to be added to the sending queue.
67-
func (eor *Exporter) RecordTracesEnqueueFailure(ctx context.Context, numSpans int) {
68-
_ = stats.RecordWithTags(ctx, eor.mutators, obsmetrics.ExporterFailedToEnqueueSpans.M(int64(numSpans)))
69-
}
70-
7166
// StartMetricsOp is called at the start of an Export operation.
7267
// The returned context should be used in other calls to the Exporter functions
7368
// dealing with the same export operation.
@@ -83,11 +78,6 @@ func (eor *Exporter) EndMetricsOp(ctx context.Context, numMetricPoints int, err
8378
endSpan(ctx, err, numSent, numFailedToSend, obsmetrics.SentMetricPointsKey, obsmetrics.FailedToSendMetricPointsKey)
8479
}
8580

86-
// RecordMetricsEnqueueFailure records number of metric points that failed to be added to the sending queue.
87-
func (eor *Exporter) RecordMetricsEnqueueFailure(ctx context.Context, numMetricPoints int) {
88-
_ = stats.RecordWithTags(ctx, eor.mutators, obsmetrics.ExporterFailedToEnqueueMetricPoints.M(int64(numMetricPoints)))
89-
}
90-
9181
// StartLogsOp is called at the start of an Export operation.
9282
// The returned context should be used in other calls to the Exporter functions
9383
// dealing with the same export operation.
@@ -102,11 +92,6 @@ func (eor *Exporter) EndLogsOp(ctx context.Context, numLogRecords int, err error
10292
endSpan(ctx, err, numSent, numFailedToSend, obsmetrics.SentLogRecordsKey, obsmetrics.FailedToSendLogRecordsKey)
10393
}
10494

105-
// RecordLogsEnqueueFailure records number of log records that failed to be added to the sending queue.
106-
func (eor *Exporter) RecordLogsEnqueueFailure(ctx context.Context, numLogRecords int) {
107-
_ = stats.RecordWithTags(ctx, eor.mutators, obsmetrics.ExporterFailedToEnqueueLogRecords.M(int64(numLogRecords)))
108-
}
109-
11095
// startSpan creates the span used to trace the operation. Returning
11196
// the updated context and the created span.
11297
func (eor *Exporter) startSpan(ctx context.Context, operationSuffix string) context.Context {

0 commit comments

Comments
 (0)