Skip to content

Commit 66c4908

Browse files
Refactored Jaeger exporter to move connection into start for upcoming Auth changes. (#3299)
* Refactored Jaeger exporter to move connection into start * added more thorough cleznup * fixed linting
1 parent 76d1c70 commit 66c4908

File tree

2 files changed

+73
-40
lines changed

2 files changed

+73
-40
lines changed

exporter/jaegerexporter/exporter.go

Lines changed: 30 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
"google.golang.org/grpc/metadata"
3030

3131
"go.opentelemetry.io/collector/component"
32+
"go.opentelemetry.io/collector/config/configgrpc"
3233
"go.opentelemetry.io/collector/consumer"
3334
"go.opentelemetry.io/collector/consumer/consumererror"
3435
"go.opentelemetry.io/collector/consumer/pdata"
@@ -40,24 +41,7 @@ import (
4041
// The exporter name is the name to be used in the observability of the exporter.
4142
// The collectorEndpoint should be of the form "hostname:14250" (a gRPC target).
4243
func newTracesExporter(cfg *Config, logger *zap.Logger) (component.TracesExporter, error) {
43-
opts, err := cfg.GRPCClientSettings.ToDialOptions()
44-
if err != nil {
45-
return nil, err
46-
}
47-
48-
conn, err := grpc.Dial(cfg.GRPCClientSettings.Endpoint, opts...)
49-
if err != nil {
50-
return nil, err
51-
}
52-
53-
collectorServiceClient := jaegerproto.NewCollectorServiceClient(conn)
54-
s := newProtoGRPCSender(logger,
55-
cfg.ID().String(),
56-
collectorServiceClient,
57-
metadata.New(cfg.GRPCClientSettings.Headers),
58-
cfg.WaitForReady,
59-
conn,
60-
)
44+
s := newProtoGRPCSender(cfg, logger)
6145
return exporterhelper.NewTracesExporter(
6246
cfg, logger, s.pushTraceData,
6347
exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}),
@@ -82,23 +66,21 @@ type protoGRPCSender struct {
8266
connStateReporterInterval time.Duration
8367
stateChangeCallbacks []func(connectivity.State)
8468

85-
stopCh chan struct{}
86-
stopped bool
87-
stopLock sync.Mutex
69+
stopCh chan struct{}
70+
stopped bool
71+
stopLock sync.Mutex
72+
clientSettings *configgrpc.GRPCClientSettings
8873
}
8974

90-
func newProtoGRPCSender(logger *zap.Logger, name string, cl jaegerproto.CollectorServiceClient, md metadata.MD, waitForReady bool, conn stateReporter) *protoGRPCSender {
75+
func newProtoGRPCSender(cfg *Config, logger *zap.Logger) *protoGRPCSender {
9176
s := &protoGRPCSender{
92-
name: name,
93-
logger: logger,
94-
client: cl,
95-
metadata: md,
96-
waitForReady: waitForReady,
97-
98-
conn: conn,
77+
name: cfg.ID().String(),
78+
logger: logger,
79+
metadata: metadata.New(cfg.GRPCClientSettings.Headers),
80+
waitForReady: cfg.WaitForReady,
9981
connStateReporterInterval: time.Second,
100-
101-
stopCh: make(chan struct{}),
82+
stopCh: make(chan struct{}),
83+
clientSettings: &cfg.GRPCClientSettings,
10284
}
10385
s.AddStateChangeCallback(s.onStateChange)
10486
return s
@@ -144,7 +126,23 @@ func (s *protoGRPCSender) shutdown(context.Context) error {
144126
return nil
145127
}
146128

147-
func (s *protoGRPCSender) start(context.Context, component.Host) error {
129+
func (s *protoGRPCSender) start(_ context.Context, host component.Host) error {
130+
if s.clientSettings == nil {
131+
return fmt.Errorf("client settings not found")
132+
}
133+
opts, err := s.clientSettings.ToDialOptions()
134+
if err != nil {
135+
return err
136+
}
137+
138+
conn, err := grpc.Dial(s.clientSettings.Endpoint, opts...)
139+
if err != nil {
140+
return err
141+
}
142+
143+
s.client = jaegerproto.NewCollectorServiceClient(conn)
144+
s.conn = conn
145+
148146
go s.startConnectionStatusReporter()
149147
return nil
150148
}

exporter/jaegerexporter/exporter_test.go

Lines changed: 43 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -149,14 +149,19 @@ func TestNew(t *testing.T) {
149149
for _, tt := range tests {
150150
t.Run(tt.name, func(t *testing.T) {
151151
got, err := newTracesExporter(&tt.config, zap.NewNop())
152-
if (err != nil) != tt.wantErr {
153-
t.Errorf("newTracesExporter() error = %v, wantErr %v", err, tt.wantErr)
154-
return
155-
}
156-
if got == nil {
152+
assert.NoError(t, err)
153+
assert.NotNil(t, got)
154+
t.Cleanup(func() {
155+
require.NoError(t, got.Shutdown(context.Background()))
156+
})
157+
158+
err = got.Start(context.Background(), componenttest.NewNopHost())
159+
if tt.wantErr {
160+
assert.Error(t, err)
157161
return
158162
}
159163

164+
require.NoError(t, err)
160165
// This is expected to fail.
161166
err = got.ConsumeTraces(context.Background(), testdata.GenerateTracesNoLibraries())
162167
assert.Error(t, err)
@@ -220,7 +225,7 @@ func TestMutualTLS(t *testing.T) {
220225
}
221226
exporter, err := factory.CreateTracesExporter(context.Background(), component.ExporterCreateParams{Logger: zap.NewNop()}, cfg)
222227
require.NoError(t, err)
223-
err = exporter.Start(context.Background(), nil)
228+
err = exporter.Start(context.Background(), componenttest.NewNopHost())
224229
require.NoError(t, err)
225230
t.Cleanup(func() { require.NoError(t, exporter.Shutdown(context.Background())) })
226231

@@ -256,6 +261,15 @@ func TestConnectionStateChange(t *testing.T) {
256261
stopCh: make(chan struct{}),
257262
conn: sr,
258263
connStateReporterInterval: 10 * time.Millisecond,
264+
clientSettings: &configgrpc.GRPCClientSettings{
265+
Headers: nil,
266+
Endpoint: "foo.bar",
267+
Compression: "",
268+
TLSSetting: configtls.TLSClientSetting{
269+
Insecure: true,
270+
},
271+
Keepalive: nil,
272+
},
259273
}
260274

261275
wg.Add(1)
@@ -264,8 +278,20 @@ func TestConnectionStateChange(t *testing.T) {
264278
wg.Done()
265279
})
266280

267-
require.NoError(t, sender.start(context.Background(), componenttest.NewNopHost()))
268-
t.Cleanup(func() { require.NoError(t, sender.shutdown(context.Background())) })
281+
done := make(chan struct{})
282+
go func() {
283+
sender.startConnectionStatusReporter()
284+
done <- struct{}{}
285+
}()
286+
287+
t.Cleanup(func() {
288+
// set the stopped flag, and wait for statusReporter to finish and signal back
289+
sender.stopLock.Lock()
290+
sender.stopped = true
291+
sender.stopLock.Unlock()
292+
<-done
293+
})
294+
269295
wg.Wait() // wait for the initial state to be propagated
270296

271297
// test
@@ -287,6 +313,15 @@ func TestConnectionReporterEndsOnStopped(t *testing.T) {
287313
stopCh: make(chan struct{}),
288314
conn: sr,
289315
connStateReporterInterval: 10 * time.Millisecond,
316+
clientSettings: &configgrpc.GRPCClientSettings{
317+
Headers: nil,
318+
Endpoint: "foo.bar",
319+
Compression: "",
320+
TLSSetting: configtls.TLSClientSetting{
321+
Insecure: true,
322+
},
323+
Keepalive: nil,
324+
},
290325
}
291326

292327
wg := sync.WaitGroup{}

0 commit comments

Comments
 (0)