Skip to content
Merged
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
- Add an internal sharedcomponent to be shared by receivers with shared resources (#3198)
- Allow users to configure the Prometheus remote write queue (#3046)
- Mark internaldata traces translation as deprecated for external usage (#3176)
- Change receiver obsreport helpers pattern to match the Processor/Exporter (#3227)

## 🧰 Bug fixes 🧰

Expand Down
68 changes: 37 additions & 31 deletions obsreport/obsreport_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,32 +143,48 @@ func WithLongLivedCtx() StartReceiveOption {
}
}

// Receiver is a helper to add obersvability to a component.Receiver.
type Receiver struct {
receiverID config.ComponentID
transport string
}

// ReceiverSettings are settings for creating an Receiver.
type ReceiverSettings struct {
ReceiverID config.ComponentID
Transport string
}

// NewReceiver creates a new Receiver.
func NewReceiver(cfg ReceiverSettings) *Receiver {
return &Receiver{
receiverID: cfg.ReceiverID,
transport: cfg.Transport,
}
}

// StartTraceDataReceiveOp is called when a request is received from a client.
// The returned context should be used in other calls to the obsreport functions
// dealing with the same receive operation.
func StartTraceDataReceiveOp(
func (rec *Receiver) StartTraceDataReceiveOp(
operationCtx context.Context,
receiverID config.ComponentID,
transport string,
opt ...StartReceiveOption,
) context.Context {
return traceReceiveOp(
return rec.traceReceiveOp(
operationCtx,
receiverID,
transport,
receiveTraceDataOperationSuffix,
opt...)
}

// EndTraceDataReceiveOp completes the receive operation that was started with
// StartTraceDataReceiveOp.
func EndTraceDataReceiveOp(
func (rec *Receiver) EndTraceDataReceiveOp(
receiverCtx context.Context,
format string,
numReceivedSpans int,
err error,
) {
endReceiveOp(
rec.endReceiveOp(
receiverCtx,
format,
numReceivedSpans,
Expand All @@ -180,29 +196,25 @@ func EndTraceDataReceiveOp(
// StartLogsReceiveOp is called when a request is received from a client.
// The returned context should be used in other calls to the obsreport functions
// dealing with the same receive operation.
func StartLogsReceiveOp(
func (rec *Receiver) StartLogsReceiveOp(
operationCtx context.Context,
receiverID config.ComponentID,
transport string,
opt ...StartReceiveOption,
) context.Context {
return traceReceiveOp(
return rec.traceReceiveOp(
operationCtx,
receiverID,
transport,
receiverLogsOperationSuffix,
opt...)
}

// EndLogsReceiveOp completes the receive operation that was started with
// StartLogsReceiveOp.
func EndLogsReceiveOp(
func (rec *Receiver) EndLogsReceiveOp(
receiverCtx context.Context,
format string,
numReceivedLogRecords int,
err error,
) {
endReceiveOp(
rec.endReceiveOp(
receiverCtx,
format,
numReceivedLogRecords,
Expand All @@ -214,29 +226,25 @@ func EndLogsReceiveOp(
// StartMetricsReceiveOp is called when a request is received from a client.
// The returned context should be used in other calls to the obsreport functions
// dealing with the same receive operation.
func StartMetricsReceiveOp(
func (rec *Receiver) StartMetricsReceiveOp(
operationCtx context.Context,
receiverID config.ComponentID,
transport string,
opt ...StartReceiveOption,
) context.Context {
return traceReceiveOp(
return rec.traceReceiveOp(
operationCtx,
receiverID,
transport,
receiverMetricsOperationSuffix,
opt...)
}

// EndMetricsReceiveOp completes the receive operation that was started with
// StartMetricsReceiveOp.
func EndMetricsReceiveOp(
func (rec *Receiver) EndMetricsReceiveOp(
receiverCtx context.Context,
format string,
numReceivedPoints int,
err error,
) {
endReceiveOp(
rec.endReceiveOp(
receiverCtx,
format,
numReceivedPoints,
Expand All @@ -263,10 +271,8 @@ func ReceiverContext(

// traceReceiveOp creates the span used to trace the operation. Returning
// the updated context with the created span.
func traceReceiveOp(
func (rec *Receiver) traceReceiveOp(
receiverCtx context.Context,
receiverID config.ComponentID,
transport string,
operationSuffix string,
opt ...StartReceiveOption,
) context.Context {
Expand All @@ -277,7 +283,7 @@ func traceReceiveOp(

var ctx context.Context
var span *trace.Span
spanName := receiverPrefix + receiverID.String() + operationSuffix
spanName := receiverPrefix + rec.receiverID.String() + operationSuffix
if !opts.LongLivedCtx {
ctx, span = trace.StartSpan(receiverCtx, spanName)
} else {
Expand All @@ -292,14 +298,14 @@ func traceReceiveOp(
ctx = trace.NewContext(receiverCtx, span)
}

if transport != "" {
span.AddAttributes(trace.StringAttribute(TransportKey, transport))
if rec.transport != "" {
span.AddAttributes(trace.StringAttribute(TransportKey, rec.transport))
}
return ctx
}

// endReceiveOp records the observability signals at the end of an operation.
func endReceiveOp(
func (rec *Receiver) endReceiveOp(
receiverCtx context.Context,
format string,
numReceivedItems int,
Expand Down
22 changes: 12 additions & 10 deletions obsreport/obsreport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,11 @@ func TestReceiveTraceDataOp(t *testing.T) {
}
rcvdSpans := []int{13, 42}
for i, param := range params {
ctx := obsreport.StartTraceDataReceiveOp(receiverCtx, receiver, param.transport)
rec := obsreport.NewReceiver(obsreport.ReceiverSettings{ReceiverID: receiver, Transport: param.transport})
ctx := rec.StartTraceDataReceiveOp(receiverCtx)
assert.NotNil(t, ctx)

obsreport.EndTraceDataReceiveOp(
rec.EndTraceDataReceiveOp(
ctx,
format,
rcvdSpans[i],
Expand Down Expand Up @@ -169,10 +170,11 @@ func TestReceiveLogsOp(t *testing.T) {
}
rcvdLogRecords := []int{13, 42}
for i, param := range params {
ctx := obsreport.StartLogsReceiveOp(receiverCtx, receiver, param.transport)
rec := obsreport.NewReceiver(obsreport.ReceiverSettings{ReceiverID: receiver, Transport: param.transport})
ctx := rec.StartLogsReceiveOp(receiverCtx)
assert.NotNil(t, ctx)

obsreport.EndLogsReceiveOp(
rec.EndLogsReceiveOp(
ctx,
format,
rcvdLogRecords[i],
Expand Down Expand Up @@ -229,10 +231,11 @@ func TestReceiveMetricsOp(t *testing.T) {
}
rcvdMetricPts := []int{23, 29}
for i, param := range params {
ctx := obsreport.StartMetricsReceiveOp(receiverCtx, receiver, param.transport)
rec := obsreport.NewReceiver(obsreport.ReceiverSettings{ReceiverID: receiver, Transport: param.transport})
ctx := rec.StartMetricsReceiveOp(receiverCtx)
assert.NotNil(t, ctx)

obsreport.EndMetricsReceiveOp(
rec.EndMetricsReceiveOp(
ctx,
format,
rcvdMetricPts[i],
Expand Down Expand Up @@ -499,14 +502,13 @@ func TestReceiveWithLongLivedCtx(t *testing.T) {
for _, op := range ops {
// Use a new context on each operation to simulate distinct operations
// under the same long lived context.
ctx := obsreport.StartTraceDataReceiveOp(
rec := obsreport.NewReceiver(obsreport.ReceiverSettings{ReceiverID: receiver, Transport: transport})
ctx := rec.StartTraceDataReceiveOp(
longLivedCtx,
receiver,
transport,
obsreport.WithLongLivedCtx())
assert.NotNil(t, ctx)

obsreport.EndTraceDataReceiveOp(
rec.EndTraceDataReceiveOp(
ctx,
format,
op.numSpans,
Expand Down
15 changes: 9 additions & 6 deletions obsreport/obsreporttest/obsreporttest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,10 @@ func TestCheckReceiverTracesViews(t *testing.T) {
defer doneFn()

receiverCtx := obsreport.ReceiverContext(context.Background(), receiver, transport)
ctx := obsreport.StartTraceDataReceiveOp(receiverCtx, receiver, transport)
rec := obsreport.NewReceiver(obsreport.ReceiverSettings{ReceiverID: receiver, Transport: transport})
ctx := rec.StartTraceDataReceiveOp(receiverCtx)
assert.NotNil(t, ctx)
obsreport.EndTraceDataReceiveOp(
rec.EndTraceDataReceiveOp(
ctx,
format,
7,
Expand All @@ -60,9 +61,10 @@ func TestCheckReceiverMetricsViews(t *testing.T) {
defer doneFn()

receiverCtx := obsreport.ReceiverContext(context.Background(), receiver, transport)
ctx := obsreport.StartMetricsReceiveOp(receiverCtx, receiver, transport)
rec := obsreport.NewReceiver(obsreport.ReceiverSettings{ReceiverID: receiver, Transport: transport})
ctx := rec.StartMetricsReceiveOp(receiverCtx)
assert.NotNil(t, ctx)
obsreport.EndMetricsReceiveOp(ctx, format, 7, nil)
rec.EndMetricsReceiveOp(ctx, format, 7, nil)

obsreporttest.CheckReceiverMetrics(t, receiver, transport, 7, 0)
}
Expand All @@ -73,9 +75,10 @@ func TestCheckReceiverLogsViews(t *testing.T) {
defer doneFn()

receiverCtx := obsreport.ReceiverContext(context.Background(), receiver, transport)
ctx := obsreport.StartLogsReceiveOp(receiverCtx, receiver, transport)
rec := obsreport.NewReceiver(obsreport.ReceiverSettings{ReceiverID: receiver, Transport: transport})
ctx := rec.StartLogsReceiveOp(receiverCtx)
assert.NotNil(t, ctx)
obsreport.EndLogsReceiveOp(ctx, format, 7, nil)
rec.EndLogsReceiveOp(ctx, format, 7, nil)

obsreporttest.CheckReceiverLogs(t, receiver, transport, 7, 0)
}
Expand Down
20 changes: 13 additions & 7 deletions receiver/jaegerreceiver/trace_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ type jReceiver struct {
goroutines sync.WaitGroup

logger *zap.Logger

receiver *obsreport.Receiver
}

const (
Expand Down Expand Up @@ -228,6 +230,7 @@ type agentHandler struct {
id config.ComponentID
transport string
nextConsumer consumer.Traces
receiver *obsreport.Receiver
}

// EmitZipkinBatch is unsupported agent's
Expand All @@ -239,10 +242,11 @@ func (h *agentHandler) EmitZipkinBatch(context.Context, []*zipkincore.Span) (err
// Jaeger spans received by the Jaeger agent processor.
func (h *agentHandler) EmitBatch(ctx context.Context, batch *jaeger.Batch) error {
ctx = obsreport.ReceiverContext(ctx, h.id, h.transport)
ctx = obsreport.StartTraceDataReceiveOp(ctx, h.id, h.transport)
h.receiver = obsreport.NewReceiver(obsreport.ReceiverSettings{ReceiverID: h.id, Transport: h.transport})
ctx = h.receiver.StartTraceDataReceiveOp(ctx)

numSpans, err := consumeTraces(ctx, batch, h.nextConsumer)
obsreport.EndTraceDataReceiveOp(ctx, thriftFormat, numSpans, err)
h.receiver.EndTraceDataReceiveOp(ctx, thriftFormat, numSpans, err)
return err
}

Expand All @@ -267,12 +271,13 @@ func (jr *jReceiver) PostSpans(ctx context.Context, r *api_v2.PostSpansRequest)
}

ctx = obsreport.ReceiverContext(ctx, jr.id, grpcTransport)
ctx = obsreport.StartTraceDataReceiveOp(ctx, jr.id, grpcTransport)
jr.receiver = obsreport.NewReceiver(obsreport.ReceiverSettings{ReceiverID: jr.id, Transport: grpcTransport})
ctx = jr.receiver.StartTraceDataReceiveOp(ctx)

td := jaegertranslator.ProtoBatchToInternalTraces(r.GetBatch())

err := jr.nextConsumer.ConsumeTraces(ctx, td)
obsreport.EndTraceDataReceiveOp(ctx, protobufFormat, len(r.GetBatch().Spans), err)
jr.receiver.EndTraceDataReceiveOp(ctx, protobufFormat, len(r.GetBatch().Spans), err)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -415,12 +420,13 @@ func (jr *jReceiver) HandleThriftHTTPBatch(w http.ResponseWriter, r *http.Reques
}

ctx = obsreport.ReceiverContext(ctx, jr.id, collectorHTTPTransport)
ctx = obsreport.StartTraceDataReceiveOp(ctx, jr.id, collectorHTTPTransport)
jr.receiver = obsreport.NewReceiver(obsreport.ReceiverSettings{ReceiverID: jr.id, Transport: collectorHTTPTransport})
ctx = jr.receiver.StartTraceDataReceiveOp(ctx)

batch, hErr := jr.decodeThriftHTTPBody(r)
if hErr != nil {
http.Error(w, html.EscapeString(hErr.msg), hErr.statusCode)
obsreport.EndTraceDataReceiveOp(ctx, thriftFormat, 0, hErr)
jr.receiver.EndTraceDataReceiveOp(ctx, thriftFormat, 0, hErr)
return
}

Expand All @@ -430,7 +436,7 @@ func (jr *jReceiver) HandleThriftHTTPBatch(w http.ResponseWriter, r *http.Reques
} else {
w.WriteHeader(http.StatusAccepted)
}
obsreport.EndTraceDataReceiveOp(ctx, thriftFormat, numSpans, err)
jr.receiver.EndTraceDataReceiveOp(ctx, thriftFormat, numSpans, err)
}

func (jr *jReceiver) startCollector(host component.Host) error {
Expand Down
14 changes: 10 additions & 4 deletions receiver/kafkareceiver/kafka_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,8 @@ type tracesConsumerGroupHandler struct {
readyCloser sync.Once

logger *zap.Logger

receiver *obsreport.Receiver
}

type logsConsumerGroupHandler struct {
Expand All @@ -224,6 +226,8 @@ type logsConsumerGroupHandler struct {
readyCloser sync.Once

logger *zap.Logger

receiver *obsreport.Receiver
}

var _ sarama.ConsumerGroupHandler = (*tracesConsumerGroupHandler)(nil)
Expand Down Expand Up @@ -254,7 +258,8 @@ func (c *tracesConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSe
session.MarkMessage(message, "")

ctx := obsreport.ReceiverContext(session.Context(), c.id, transport)
ctx = obsreport.StartTraceDataReceiveOp(ctx, c.id, transport)
c.receiver = obsreport.NewReceiver(obsreport.ReceiverSettings{ReceiverID: c.id, Transport: transport})
ctx = c.receiver.StartTraceDataReceiveOp(ctx)
statsTags := []tag.Mutator{tag.Insert(tagInstanceName, c.id.String())}
_ = stats.RecordWithTags(ctx, statsTags,
statMessageCount.M(1),
Expand All @@ -269,7 +274,7 @@ func (c *tracesConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSe

spanCount := traces.SpanCount()
err = c.nextConsumer.ConsumeTraces(session.Context(), traces)
obsreport.EndTraceDataReceiveOp(ctx, c.unmarshaler.Encoding(), spanCount, err)
c.receiver.EndTraceDataReceiveOp(ctx, c.unmarshaler.Encoding(), spanCount, err)
if err != nil {
return err
}
Expand Down Expand Up @@ -306,7 +311,8 @@ func (c *logsConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSess
session.MarkMessage(message, "")

ctx := obsreport.ReceiverContext(session.Context(), c.id, transport)
ctx = obsreport.StartTraceDataReceiveOp(ctx, c.id, transport)
c.receiver = obsreport.NewReceiver(obsreport.ReceiverSettings{ReceiverID: c.id, Transport: transport})
ctx = c.receiver.StartTraceDataReceiveOp(ctx)
_ = stats.RecordWithTags(
ctx,
[]tag.Mutator{tag.Insert(tagInstanceName, c.id.String())},
Expand All @@ -322,7 +328,7 @@ func (c *logsConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSess

err = c.nextConsumer.ConsumeLogs(session.Context(), logs)
// TODO
obsreport.EndTraceDataReceiveOp(ctx, c.unmarshaler.Encoding(), logs.LogRecordCount(), err)
c.receiver.EndTraceDataReceiveOp(ctx, c.unmarshaler.Encoding(), logs.LogRecordCount(), err)
if err != nil {
return err
}
Expand Down
Loading