Skip to content
Merged
68 changes: 37 additions & 31 deletions obsreport/obsreport_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,32 +143,48 @@ func WithLongLivedCtx() StartReceiveOption {
}
}

// Receiver is a helper to add obersvability to a component.Receiver.
type Receiver struct {
receiverID config.ComponentID
transport string
}

// ReceiverSettings are settings for creating an Receiver.
type ReceiverSettings struct {
ReceiverID config.ComponentID
Transport string
}

// NewReceiver creates a new Receiver.
func NewReceiver(cfg ReceiverSettings) *Receiver {
return &Receiver{
receiverID: cfg.ReceiverID,
transport: cfg.Transport,
}
}

// StartTraceDataReceiveOp is called when a request is received from a client.
// The returned context should be used in other calls to the obsreport functions
// dealing with the same receive operation.
func StartTraceDataReceiveOp(
func (rec *Receiver) StartTraceDataReceiveOp(
operationCtx context.Context,
receiverID config.ComponentID,
transport string,
opt ...StartReceiveOption,
) context.Context {
return traceReceiveOp(
return rec.traceReceiveOp(
operationCtx,
receiverID,
transport,
receiveTraceDataOperationSuffix,
opt...)
}

// EndTraceDataReceiveOp completes the receive operation that was started with
// StartTraceDataReceiveOp.
func EndTraceDataReceiveOp(
func (rec *Receiver) EndTraceDataReceiveOp(
receiverCtx context.Context,
format string,
numReceivedSpans int,
err error,
) {
endReceiveOp(
rec.endReceiveOp(
receiverCtx,
format,
numReceivedSpans,
Expand All @@ -180,29 +196,25 @@ func EndTraceDataReceiveOp(
// StartLogsReceiveOp is called when a request is received from a client.
// The returned context should be used in other calls to the obsreport functions
// dealing with the same receive operation.
func StartLogsReceiveOp(
func (rec *Receiver) StartLogsReceiveOp(
operationCtx context.Context,
receiverID config.ComponentID,
transport string,
opt ...StartReceiveOption,
) context.Context {
return traceReceiveOp(
return rec.traceReceiveOp(
operationCtx,
receiverID,
transport,
receiverLogsOperationSuffix,
opt...)
}

// EndLogsReceiveOp completes the receive operation that was started with
// StartLogsReceiveOp.
func EndLogsReceiveOp(
func (rec *Receiver) EndLogsReceiveOp(
receiverCtx context.Context,
format string,
numReceivedLogRecords int,
err error,
) {
endReceiveOp(
rec.endReceiveOp(
receiverCtx,
format,
numReceivedLogRecords,
Expand All @@ -214,29 +226,25 @@ func EndLogsReceiveOp(
// StartMetricsReceiveOp is called when a request is received from a client.
// The returned context should be used in other calls to the obsreport functions
// dealing with the same receive operation.
func StartMetricsReceiveOp(
func (rec *Receiver) StartMetricsReceiveOp(
operationCtx context.Context,
receiverID config.ComponentID,
transport string,
opt ...StartReceiveOption,
) context.Context {
return traceReceiveOp(
return rec.traceReceiveOp(
operationCtx,
receiverID,
transport,
receiverMetricsOperationSuffix,
opt...)
}

// EndMetricsReceiveOp completes the receive operation that was started with
// StartMetricsReceiveOp.
func EndMetricsReceiveOp(
func (rec *Receiver) EndMetricsReceiveOp(
receiverCtx context.Context,
format string,
numReceivedPoints int,
err error,
) {
endReceiveOp(
rec.endReceiveOp(
receiverCtx,
format,
numReceivedPoints,
Expand All @@ -263,10 +271,8 @@ func ReceiverContext(

// traceReceiveOp creates the span used to trace the operation. Returning
// the updated context with the created span.
func traceReceiveOp(
func (rec *Receiver) traceReceiveOp(
receiverCtx context.Context,
receiverID config.ComponentID,
transport string,
operationSuffix string,
opt ...StartReceiveOption,
) context.Context {
Expand All @@ -277,7 +283,7 @@ func traceReceiveOp(

var ctx context.Context
var span *trace.Span
spanName := receiverPrefix + receiverID.String() + operationSuffix
spanName := receiverPrefix + rec.receiverID.String() + operationSuffix
if !opts.LongLivedCtx {
ctx, span = trace.StartSpan(receiverCtx, spanName)
} else {
Expand All @@ -292,14 +298,14 @@ func traceReceiveOp(
ctx = trace.NewContext(receiverCtx, span)
}

if transport != "" {
span.AddAttributes(trace.StringAttribute(TransportKey, transport))
if rec.transport != "" {
span.AddAttributes(trace.StringAttribute(TransportKey, rec.transport))
}
return ctx
}

// endReceiveOp records the observability signals at the end of an operation.
func endReceiveOp(
func (rec *Receiver) endReceiveOp(
receiverCtx context.Context,
format string,
numReceivedItems int,
Expand Down
22 changes: 12 additions & 10 deletions obsreport/obsreport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,11 @@ func TestReceiveTraceDataOp(t *testing.T) {
}
rcvdSpans := []int{13, 42}
for i, param := range params {
ctx := obsreport.StartTraceDataReceiveOp(receiverCtx, receiver, param.transport)
rec := obsreport.NewReceiver(obsreport.ReceiverSettings{ReceiverID: receiver, Transport: param.transport})
ctx := rec.StartTraceDataReceiveOp(receiverCtx)
assert.NotNil(t, ctx)

obsreport.EndTraceDataReceiveOp(
rec.EndTraceDataReceiveOp(
ctx,
format,
rcvdSpans[i],
Expand Down Expand Up @@ -169,10 +170,11 @@ func TestReceiveLogsOp(t *testing.T) {
}
rcvdLogRecords := []int{13, 42}
for i, param := range params {
ctx := obsreport.StartLogsReceiveOp(receiverCtx, receiver, param.transport)
rec := obsreport.NewReceiver(obsreport.ReceiverSettings{ReceiverID: receiver, Transport: param.transport})
ctx := rec.StartLogsReceiveOp(receiverCtx)
assert.NotNil(t, ctx)

obsreport.EndLogsReceiveOp(
rec.EndLogsReceiveOp(
ctx,
format,
rcvdLogRecords[i],
Expand Down Expand Up @@ -229,10 +231,11 @@ func TestReceiveMetricsOp(t *testing.T) {
}
rcvdMetricPts := []int{23, 29}
for i, param := range params {
ctx := obsreport.StartMetricsReceiveOp(receiverCtx, receiver, param.transport)
rec := obsreport.NewReceiver(obsreport.ReceiverSettings{ReceiverID: receiver, Transport: param.transport})
ctx := rec.StartMetricsReceiveOp(receiverCtx)
assert.NotNil(t, ctx)

obsreport.EndMetricsReceiveOp(
rec.EndMetricsReceiveOp(
ctx,
format,
rcvdMetricPts[i],
Expand Down Expand Up @@ -499,14 +502,13 @@ func TestReceiveWithLongLivedCtx(t *testing.T) {
for _, op := range ops {
// Use a new context on each operation to simulate distinct operations
// under the same long lived context.
ctx := obsreport.StartTraceDataReceiveOp(
rec := obsreport.NewReceiver(obsreport.ReceiverSettings{ReceiverID: receiver, Transport: transport})
ctx := rec.StartTraceDataReceiveOp(
longLivedCtx,
receiver,
transport,
obsreport.WithLongLivedCtx())
assert.NotNil(t, ctx)

obsreport.EndTraceDataReceiveOp(
rec.EndTraceDataReceiveOp(
ctx,
format,
op.numSpans,
Expand Down
15 changes: 9 additions & 6 deletions obsreport/obsreporttest/obsreporttest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,10 @@ func TestCheckReceiverTracesViews(t *testing.T) {
defer doneFn()

receiverCtx := obsreport.ReceiverContext(context.Background(), receiver, transport)
ctx := obsreport.StartTraceDataReceiveOp(receiverCtx, receiver, transport)
rec := obsreport.NewReceiver(obsreport.ReceiverSettings{ReceiverID: receiver, Transport: transport})
ctx := rec.StartTraceDataReceiveOp(receiverCtx)
assert.NotNil(t, ctx)
obsreport.EndTraceDataReceiveOp(
rec.EndTraceDataReceiveOp(
ctx,
format,
7,
Expand All @@ -60,9 +61,10 @@ func TestCheckReceiverMetricsViews(t *testing.T) {
defer doneFn()

receiverCtx := obsreport.ReceiverContext(context.Background(), receiver, transport)
ctx := obsreport.StartMetricsReceiveOp(receiverCtx, receiver, transport)
rec := obsreport.NewReceiver(obsreport.ReceiverSettings{ReceiverID: receiver, Transport: transport})
ctx := rec.StartMetricsReceiveOp(receiverCtx)
assert.NotNil(t, ctx)
obsreport.EndMetricsReceiveOp(ctx, format, 7, nil)
rec.EndMetricsReceiveOp(ctx, format, 7, nil)

obsreporttest.CheckReceiverMetrics(t, receiver, transport, 7, 0)
}
Expand All @@ -73,9 +75,10 @@ func TestCheckReceiverLogsViews(t *testing.T) {
defer doneFn()

receiverCtx := obsreport.ReceiverContext(context.Background(), receiver, transport)
ctx := obsreport.StartLogsReceiveOp(receiverCtx, receiver, transport)
rec := obsreport.NewReceiver(obsreport.ReceiverSettings{ReceiverID: receiver, Transport: transport})
ctx := rec.StartLogsReceiveOp(receiverCtx)
assert.NotNil(t, ctx)
obsreport.EndLogsReceiveOp(ctx, format, 7, nil)
rec.EndLogsReceiveOp(ctx, format, 7, nil)

obsreporttest.CheckReceiverLogs(t, receiver, transport, 7, 0)
}
Expand Down
17 changes: 10 additions & 7 deletions receiver/jaegerreceiver/trace_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,10 +239,11 @@ func (h *agentHandler) EmitZipkinBatch(context.Context, []*zipkincore.Span) (err
// Jaeger spans received by the Jaeger agent processor.
func (h *agentHandler) EmitBatch(ctx context.Context, batch *jaeger.Batch) error {
ctx = obsreport.ReceiverContext(ctx, h.id, h.transport)
ctx = obsreport.StartTraceDataReceiveOp(ctx, h.id, h.transport)
rec := obsreport.NewReceiver(obsreport.ReceiverSettings{ReceiverID: h.id, Transport: h.transport})
ctx = rec.StartTraceDataReceiveOp(ctx)

numSpans, err := consumeTraces(ctx, batch, h.nextConsumer)
obsreport.EndTraceDataReceiveOp(ctx, thriftFormat, numSpans, err)
rec.EndTraceDataReceiveOp(ctx, thriftFormat, numSpans, err)
return err
}

Expand All @@ -267,12 +268,13 @@ func (jr *jReceiver) PostSpans(ctx context.Context, r *api_v2.PostSpansRequest)
}

ctx = obsreport.ReceiverContext(ctx, jr.id, grpcTransport)
ctx = obsreport.StartTraceDataReceiveOp(ctx, jr.id, grpcTransport)
rec := obsreport.NewReceiver(obsreport.ReceiverSettings{ReceiverID: jr.id, Transport: grpcTransport})
ctx = rec.StartTraceDataReceiveOp(ctx)

td := jaegertranslator.ProtoBatchToInternalTraces(r.GetBatch())

err := jr.nextConsumer.ConsumeTraces(ctx, td)
obsreport.EndTraceDataReceiveOp(ctx, protobufFormat, len(r.GetBatch().Spans), err)
rec.EndTraceDataReceiveOp(ctx, protobufFormat, len(r.GetBatch().Spans), err)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -415,12 +417,13 @@ func (jr *jReceiver) HandleThriftHTTPBatch(w http.ResponseWriter, r *http.Reques
}

ctx = obsreport.ReceiverContext(ctx, jr.id, collectorHTTPTransport)
ctx = obsreport.StartTraceDataReceiveOp(ctx, jr.id, collectorHTTPTransport)
rec := obsreport.NewReceiver(obsreport.ReceiverSettings{ReceiverID: jr.id, Transport: collectorHTTPTransport})
ctx = rec.StartTraceDataReceiveOp(ctx)

batch, hErr := jr.decodeThriftHTTPBody(r)
if hErr != nil {
http.Error(w, html.EscapeString(hErr.msg), hErr.statusCode)
obsreport.EndTraceDataReceiveOp(ctx, thriftFormat, 0, hErr)
rec.EndTraceDataReceiveOp(ctx, thriftFormat, 0, hErr)
return
}

Expand All @@ -430,7 +433,7 @@ func (jr *jReceiver) HandleThriftHTTPBatch(w http.ResponseWriter, r *http.Reques
} else {
w.WriteHeader(http.StatusAccepted)
}
obsreport.EndTraceDataReceiveOp(ctx, thriftFormat, numSpans, err)
rec.EndTraceDataReceiveOp(ctx, thriftFormat, numSpans, err)
}

func (jr *jReceiver) startCollector(host component.Host) error {
Expand Down
10 changes: 6 additions & 4 deletions receiver/kafkareceiver/kafka_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,8 @@ func (c *tracesConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSe
session.MarkMessage(message, "")

ctx := obsreport.ReceiverContext(session.Context(), c.id, transport)
ctx = obsreport.StartTraceDataReceiveOp(ctx, c.id, transport)
rec := obsreport.NewReceiver(obsreport.ReceiverSettings{ReceiverID: c.id, Transport: transport})
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here, you can cache to object. Please apply the caching to all the non-test code where you can.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok I applied to caching to all non-test code in my latest commit

ctx = rec.StartTraceDataReceiveOp(ctx)
statsTags := []tag.Mutator{tag.Insert(tagInstanceName, c.id.String())}
_ = stats.RecordWithTags(ctx, statsTags,
statMessageCount.M(1),
Expand All @@ -269,7 +270,7 @@ func (c *tracesConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSe

spanCount := traces.SpanCount()
err = c.nextConsumer.ConsumeTraces(session.Context(), traces)
obsreport.EndTraceDataReceiveOp(ctx, c.unmarshaler.Encoding(), spanCount, err)
rec.EndTraceDataReceiveOp(ctx, c.unmarshaler.Encoding(), spanCount, err)
if err != nil {
return err
}
Expand Down Expand Up @@ -306,7 +307,8 @@ func (c *logsConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSess
session.MarkMessage(message, "")

ctx := obsreport.ReceiverContext(session.Context(), c.id, transport)
ctx = obsreport.StartTraceDataReceiveOp(ctx, c.id, transport)
rec := obsreport.NewReceiver(obsreport.ReceiverSettings{ReceiverID: c.id, Transport: transport})
ctx = rec.StartTraceDataReceiveOp(ctx)
_ = stats.RecordWithTags(
ctx,
[]tag.Mutator{tag.Insert(tagInstanceName, c.id.String())},
Expand All @@ -322,7 +324,7 @@ func (c *logsConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSess

err = c.nextConsumer.ConsumeLogs(session.Context(), logs)
// TODO
obsreport.EndTraceDataReceiveOp(ctx, c.unmarshaler.Encoding(), logs.LogRecordCount(), err)
rec.EndTraceDataReceiveOp(ctx, c.unmarshaler.Encoding(), logs.LogRecordCount(), err)
if err != nil {
return err
}
Expand Down
7 changes: 3 additions & 4 deletions receiver/opencensusreceiver/ocmetrics/opencensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,10 +122,9 @@ func (ocr *Receiver) processReceivedMsg(
}

func (ocr *Receiver) sendToNextConsumer(longLivedRPCCtx context.Context, node *commonpb.Node, resource *resourcepb.Resource, metrics []*ocmetrics.Metric) error {
ctx := obsreport.StartMetricsReceiveOp(
rec := obsreport.NewReceiver(obsreport.ReceiverSettings{ReceiverID: ocr.id, Transport: receiverTransport})
ctx := rec.StartMetricsReceiveOp(
longLivedRPCCtx,
ocr.id,
receiverTransport,
obsreport.WithLongLivedCtx())

numPoints := 0
Expand All @@ -141,7 +140,7 @@ func (ocr *Receiver) sendToNextConsumer(longLivedRPCCtx context.Context, node *c
consumerErr = ocr.nextConsumer.ConsumeMetrics(ctx, internaldata.OCToMetrics(node, resource, metrics))
}

obsreport.EndMetricsReceiveOp(
rec.EndMetricsReceiveOp(
ctx,
receiverDataFormat,
numPoints,
Expand Down
7 changes: 3 additions & 4 deletions receiver/opencensusreceiver/octrace/opencensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,14 +142,13 @@ func (ocr *Receiver) processReceivedMsg(
}

func (ocr *Receiver) sendToNextConsumer(longLivedRPCCtx context.Context, td pdata.Traces) error {
ctx := obsreport.StartTraceDataReceiveOp(
rec := obsreport.NewReceiver(obsreport.ReceiverSettings{ReceiverID: ocr.id, Transport: receiverTransport})
ctx := rec.StartTraceDataReceiveOp(
longLivedRPCCtx,
ocr.id,
receiverTransport,
obsreport.WithLongLivedCtx())

err := ocr.nextConsumer.ConsumeTraces(ctx, td)
obsreport.EndTraceDataReceiveOp(ctx, receiverDataFormat, td.SpanCount(), err)
rec.EndTraceDataReceiveOp(ctx, receiverDataFormat, td.SpanCount(), err)

return err
}
Loading