forked from open-telemetry/opentelemetry-collector
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathobsreport_receiver.go
More file actions
370 lines (334 loc) · 11.3 KB
/
obsreport_receiver.go
File metadata and controls
370 lines (334 loc) · 11.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package obsreport
import (
"context"
"go.opencensus.io/stats"
"go.opencensus.io/tag"
"go.opencensus.io/trace"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/config/configtelemetry"
)
const (
// ReceiverKey used to identify receivers in metrics and traces.
ReceiverKey = "receiver"
// TransportKey used to identify the transport used to received the data.
TransportKey = "transport"
// FormatKey used to identify the format of the data received.
FormatKey = "format"
// AcceptedSpansKey used to identify spans accepted by the Collector.
AcceptedSpansKey = "accepted_spans"
// RefusedSpansKey used to identify spans refused (ie.: not ingested) by the Collector.
RefusedSpansKey = "refused_spans"
// AcceptedMetricPointsKey used to identify metric points accepted by the Collector.
AcceptedMetricPointsKey = "accepted_metric_points"
// RefusedMetricPointsKey used to identify metric points refused (ie.: not ingested) by the
// Collector.
RefusedMetricPointsKey = "refused_metric_points"
// AcceptedLogRecordsKey used to identify log records accepted by the Collector.
AcceptedLogRecordsKey = "accepted_log_records"
// RefusedLogRecordsKey used to identify log records refused (ie.: not ingested) by the
// Collector.
RefusedLogRecordsKey = "refused_log_records"
)
var (
tagKeyReceiver, _ = tag.NewKey(ReceiverKey)
tagKeyTransport, _ = tag.NewKey(TransportKey)
receiverPrefix = ReceiverKey + nameSep
receiveTraceDataOperationSuffix = nameSep + "TraceDataReceived"
receiverMetricsOperationSuffix = nameSep + "MetricsReceived"
receiverLogsOperationSuffix = nameSep + "LogsReceived"
// Receiver metrics. Any count of data items below is in the original format
// that they were received, reasoning: reconciliation is easier if measurements
// on clients and receiver are expected to be the same. Translation issues
// that result in a different number of elements should be reported in a
// separate way.
mReceiverAcceptedSpans = stats.Int64(
receiverPrefix+AcceptedSpansKey,
"Number of spans successfully pushed into the pipeline.",
stats.UnitDimensionless)
mReceiverRefusedSpans = stats.Int64(
receiverPrefix+RefusedSpansKey,
"Number of spans that could not be pushed into the pipeline.",
stats.UnitDimensionless)
mReceiverAcceptedMetricPoints = stats.Int64(
receiverPrefix+AcceptedMetricPointsKey,
"Number of metric points successfully pushed into the pipeline.",
stats.UnitDimensionless)
mReceiverRefusedMetricPoints = stats.Int64(
receiverPrefix+RefusedMetricPointsKey,
"Number of metric points that could not be pushed into the pipeline.",
stats.UnitDimensionless)
mReceiverAcceptedLogRecords = stats.Int64(
receiverPrefix+AcceptedLogRecordsKey,
"Number of log records successfully pushed into the pipeline.",
stats.UnitDimensionless)
mReceiverRefusedLogRecords = stats.Int64(
receiverPrefix+RefusedLogRecordsKey,
"Number of log records that could not be pushed into the pipeline.",
stats.UnitDimensionless)
)
// StartReceiveOptions has the options related to starting a receive operation.
type StartReceiveOptions struct {
// LongLivedCtx when true indicates that the context passed in the call
// outlives the individual receive operation. See WithLongLivedCtx() for
// more information.
LongLivedCtx bool
}
// StartReceiveOption function applues changes to StartReceiveOptions.
type StartReceiveOption func(*StartReceiveOptions)
// WithLongLivedCtx indicates that the context passed in the call outlives the
// receive operation at hand. Typically the long lived context is associated
// to a connection, eg.: a gRPC stream or a TCP connection, for which many
// batches of data are received in individual operations without a corresponding
// new context per operation.
//
// Example:
//
// func (r *receiver) ClientConnect(ctx context.Context, rcvChan <-chan pdata.Traces) {
// longLivedCtx := obsreport.ReceiverContext(ctx, r.config.Name(), r.transport, "")
// for {
// // Since the context outlives the individual receive operations call obsreport using
// // WithLongLivedCtx().
// ctx := obsreport.StartTraceDataReceiveOp(
// longLivedCtx,
// r.config.Name(),
// r.transport,
// obsreport.WithLongLivedCtx())
//
// td, ok := <-rcvChan
// var err error
// if ok {
// err = r.nextConsumer.ConsumeTraces(ctx, td)
// }
// obsreport.EndTraceDataReceiveOp(
// ctx,
// r.format,
// len(td.Spans),
// err)
// if !ok {
// break
// }
// }
// }
//
func WithLongLivedCtx() StartReceiveOption {
return func(opts *StartReceiveOptions) {
opts.LongLivedCtx = true
}
}
// Receiver is a helper to add obersvability to a component.Receiver.
type Receiver struct {
receiverID config.ComponentID
transport string
}
// ReceiverSettings are settings for creating an Receiver.
type ReceiverSettings struct {
ReceiverID config.ComponentID
Transport string
}
// NewReceiver creates a new Receiver.
func NewReceiver(cfg ReceiverSettings) *Receiver {
return &Receiver{
receiverID: cfg.ReceiverID,
transport: cfg.Transport,
}
}
// StartTraceDataReceiveOp is called when a request is received from a client.
// The returned context should be used in other calls to the obsreport functions
// dealing with the same receive operation.
func (rec *Receiver) StartTraceDataReceiveOp(
operationCtx context.Context,
opt ...StartReceiveOption,
) context.Context {
return rec.traceReceiveOp(
operationCtx,
receiveTraceDataOperationSuffix,
opt...)
}
// EndTraceDataReceiveOp completes the receive operation that was started with
// StartTraceDataReceiveOp.
func (rec *Receiver) EndTraceDataReceiveOp(
receiverCtx context.Context,
format string,
numReceivedSpans int,
err error,
) {
rec.endReceiveOp(
receiverCtx,
format,
numReceivedSpans,
err,
config.TracesDataType,
)
}
// StartLogsReceiveOp is called when a request is received from a client.
// The returned context should be used in other calls to the obsreport functions
// dealing with the same receive operation.
func (rec *Receiver) StartLogsReceiveOp(
operationCtx context.Context,
opt ...StartReceiveOption,
) context.Context {
return rec.traceReceiveOp(
operationCtx,
receiverLogsOperationSuffix,
opt...)
}
// EndLogsReceiveOp completes the receive operation that was started with
// StartLogsReceiveOp.
func (rec *Receiver) EndLogsReceiveOp(
receiverCtx context.Context,
format string,
numReceivedLogRecords int,
err error,
) {
rec.endReceiveOp(
receiverCtx,
format,
numReceivedLogRecords,
err,
config.LogsDataType,
)
}
// StartMetricsReceiveOp is called when a request is received from a client.
// The returned context should be used in other calls to the obsreport functions
// dealing with the same receive operation.
func (rec *Receiver) StartMetricsReceiveOp(
operationCtx context.Context,
opt ...StartReceiveOption,
) context.Context {
return rec.traceReceiveOp(
operationCtx,
receiverMetricsOperationSuffix,
opt...)
}
// EndMetricsReceiveOp completes the receive operation that was started with
// StartMetricsReceiveOp.
func (rec *Receiver) EndMetricsReceiveOp(
receiverCtx context.Context,
format string,
numReceivedPoints int,
err error,
) {
rec.endReceiveOp(
receiverCtx,
format,
numReceivedPoints,
err,
config.MetricsDataType,
)
}
// ReceiverContext adds the keys used when recording observability metrics to
// the given context returning the newly created context. This context should
// be used in related calls to the obsreport functions so metrics are properly
// recorded.
func ReceiverContext(
ctx context.Context,
receiverID config.ComponentID,
transport string,
) context.Context {
ctx, _ = tag.New(ctx,
tag.Upsert(tagKeyReceiver, receiverID.String(), tag.WithTTL(tag.TTLNoPropagation)),
tag.Upsert(tagKeyTransport, transport, tag.WithTTL(tag.TTLNoPropagation)))
return ctx
}
// traceReceiveOp creates the span used to trace the operation. Returning
// the updated context with the created span.
func (rec *Receiver) traceReceiveOp(
receiverCtx context.Context,
operationSuffix string,
opt ...StartReceiveOption,
) context.Context {
var opts StartReceiveOptions
for _, o := range opt {
o(&opts)
}
var ctx context.Context
var span *trace.Span
spanName := receiverPrefix + rec.receiverID.String() + operationSuffix
if !opts.LongLivedCtx {
ctx, span = trace.StartSpan(receiverCtx, spanName)
} else {
// Since the receiverCtx is long lived do not use it to start the span.
// This way this trace ends when the EndTraceDataReceiveOp is called.
// Here is safe to ignore the returned context since it is not used below.
_, span = trace.StartSpan(context.Background(), spanName)
// If the long lived context has a parent span, then add it as a parent link.
setParentLink(receiverCtx, span)
ctx = trace.NewContext(receiverCtx, span)
}
if rec.transport != "" {
span.AddAttributes(trace.StringAttribute(TransportKey, rec.transport))
}
return ctx
}
// endReceiveOp records the observability signals at the end of an operation.
func (rec *Receiver) endReceiveOp(
receiverCtx context.Context,
format string,
numReceivedItems int,
err error,
dataType config.DataType,
) {
numAccepted := numReceivedItems
numRefused := 0
if err != nil {
numAccepted = 0
numRefused = numReceivedItems
}
span := trace.FromContext(receiverCtx)
if gLevel != configtelemetry.LevelNone {
var acceptedMeasure, refusedMeasure *stats.Int64Measure
switch dataType {
case config.TracesDataType:
acceptedMeasure = mReceiverAcceptedSpans
refusedMeasure = mReceiverRefusedSpans
case config.MetricsDataType:
acceptedMeasure = mReceiverAcceptedMetricPoints
refusedMeasure = mReceiverRefusedMetricPoints
case config.LogsDataType:
acceptedMeasure = mReceiverAcceptedLogRecords
refusedMeasure = mReceiverRefusedLogRecords
}
stats.Record(
receiverCtx,
acceptedMeasure.M(int64(numAccepted)),
refusedMeasure.M(int64(numRefused)))
}
// end span according to errors
if span.IsRecordingEvents() {
var acceptedItemsKey, refusedItemsKey string
switch dataType {
case config.TracesDataType:
acceptedItemsKey = AcceptedSpansKey
refusedItemsKey = RefusedSpansKey
case config.MetricsDataType:
acceptedItemsKey = AcceptedMetricPointsKey
refusedItemsKey = RefusedMetricPointsKey
case config.LogsDataType:
acceptedItemsKey = AcceptedLogRecordsKey
refusedItemsKey = RefusedLogRecordsKey
}
span.AddAttributes(
trace.StringAttribute(
FormatKey, format),
trace.Int64Attribute(
acceptedItemsKey, int64(numAccepted)),
trace.Int64Attribute(
refusedItemsKey, int64(numRefused)),
)
span.SetStatus(errToStatus(err))
}
span.End()
}