-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathconsumer.go
More file actions
95 lines (84 loc) · 2.44 KB
/
consumer.go
File metadata and controls
95 lines (84 loc) · 2.44 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
package main
import (
"context"
"fmt"
"log/slog"
"time"
"github.com/segmentio/kafka-go"
collectorv1 "go.opentelemetry.io/proto/otlp/collector/trace/v1"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/protobuf/proto"
)
// Consumer reads span batches from Kafka and forwards them to Jaeger via OTLP gRPC.
type Consumer struct {
reader *kafka.Reader
jaeger collectorv1.TraceServiceClient
log *slog.Logger
}
func NewConsumer(brokers []string, topic string, jaegerEndpoint string, log *slog.Logger) (*Consumer, error) {
r := kafka.NewReader(kafka.ReaderConfig{
Brokers: brokers,
Topic: topic,
GroupID: "otel-pipeline-jaeger",
MinBytes: 1,
MaxBytes: 10e6,
CommitInterval: time.Second,
StartOffset: kafka.FirstOffset,
})
conn, err := grpc.NewClient(jaegerEndpoint,
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
if err != nil {
_ = r.Close()
return nil, fmt.Errorf("jaeger grpc dial: %w", err)
}
return &Consumer{
reader: r,
jaeger: collectorv1.NewTraceServiceClient(conn),
log: log,
}, nil
}
// Run reads messages from Kafka until ctx is cancelled.
// Transient fetch errors (broker not ready, group coordinator unavailable) are
// logged and retried; the loop only exits when the context is done.
func (c *Consumer) Run(ctx context.Context) error {
c.log.InfoContext(ctx, "consumer started")
for {
msg, err := c.reader.FetchMessage(ctx)
if err != nil {
if ctx.Err() != nil {
return nil
}
c.log.WarnContext(ctx, "kafka fetch failed, retrying", "error", err)
select {
case <-ctx.Done():
return nil
case <-time.After(2 * time.Second):
}
continue
}
if err := c.forward(ctx, msg.Value); err != nil {
c.log.ErrorContext(ctx, "forward failed", "error", err, "offset", msg.Offset)
continue
}
if err := c.reader.CommitMessages(ctx, msg); err != nil {
c.log.WarnContext(ctx, "commit failed", "error", err)
}
}
}
func (c *Consumer) forward(ctx context.Context, data []byte) error {
var req collectorv1.ExportTraceServiceRequest
if err := proto.Unmarshal(data, &req); err != nil {
return fmt.Errorf("unmarshal: %w", err)
}
_, err := c.jaeger.Export(ctx, &req)
if err != nil {
return fmt.Errorf("jaeger export: %w", err)
}
c.log.DebugContext(ctx, "forwarded to jaeger", "resource_spans", len(req.ResourceSpans))
return nil
}
func (c *Consumer) Close() error {
return c.reader.Close()
}