Skip to content

Commit c980f54

Browse files
Fix Shutdown behavior for batchprocessor
I added a Shutdown() test that does basic verification of the behavior of the Shutdown() function. More verifications can be added later. The test revealed a bug in batchprocessor Shutdown() function which would not wait until all pending data was drained.
1 parent 0ed7a4c commit c980f54

File tree

3 files changed

+91
-4
lines changed

3 files changed

+91
-4
lines changed
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
// Copyright The OpenTelemetry Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package componenttest
16+
17+
import (
18+
"context"
19+
"testing"
20+
21+
"github.com/stretchr/testify/assert"
22+
"github.com/stretchr/testify/require"
23+
"go.uber.org/zap"
24+
25+
"go.opentelemetry.io/collector/component"
26+
"go.opentelemetry.io/collector/config/configerror"
27+
"go.opentelemetry.io/collector/config/configmodels"
28+
"go.opentelemetry.io/collector/consumer/consumertest"
29+
"go.opentelemetry.io/collector/consumer/pdata"
30+
)
31+
32+
func createSingleSpanTrace() pdata.Traces {
33+
d := pdata.NewTraces()
34+
d.ResourceSpans().Resize(1)
35+
d.ResourceSpans().At(0).InstrumentationLibrarySpans().Resize(1)
36+
d.ResourceSpans().At(0).InstrumentationLibrarySpans().At(0).Spans().Resize(1)
37+
span := d.ResourceSpans().At(0).InstrumentationLibrarySpans().At(0).Spans().At(0)
38+
span.SetName("test span")
39+
return d
40+
}
41+
42+
func verifyTraceProcessorDoesntProduceAfterShutdown(t *testing.T, factory component.ProcessorFactory, cfg configmodels.Processor) {
43+
// Create a processor and output its produce to a sink.
44+
nextSink := new(consumertest.TracesSink)
45+
processor, err := factory.CreateTracesProcessor(
46+
context.Background(),
47+
component.ProcessorCreateParams{Logger: zap.NewNop()},
48+
cfg,
49+
nextSink,
50+
)
51+
if err != nil {
52+
if err == configerror.ErrDataTypeIsNotSupported {
53+
return
54+
}
55+
require.NoError(t, err)
56+
}
57+
err = processor.Start(context.Background(), NewNopHost())
58+
assert.NoError(t, err)
59+
60+
// Send some traces to the processor.
61+
const generatedCount = 10
62+
for i := 0; i < generatedCount; i++ {
63+
processor.ConsumeTraces(context.Background(), createSingleSpanTrace())
64+
}
65+
66+
// Now shutdown the processor.
67+
err = processor.Shutdown(context.Background())
68+
assert.NoError(t, err)
69+
70+
// The Shutdown() is done. It means the processor must have sent everything we
71+
// gave it to the next sink.
72+
assert.EqualValues(t, generatedCount, nextSink.SpansCount())
73+
}
74+
75+
func VerifyProcessorShutdown(t *testing.T, factory component.ProcessorFactory, cfg configmodels.Processor) {
76+
verifyTraceProcessorDoesntProduceAfterShutdown(t, factory, cfg)
77+
// TODO: add metrics and logs verification.
78+
// TODO: add other shutdown verifications.
79+
}

processor/batchprocessor/batch_processor.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ type batchProcessor struct {
4848
sendBatchMaxSize uint32
4949

5050
timer *time.Timer
51-
done chan struct{}
51+
doneCh chan struct{}
5252
newItem chan interface{}
5353
batch batch
5454

@@ -87,7 +87,7 @@ func newBatchProcessor(params component.ProcessorCreateParams, cfg *Config, batc
8787
sendBatchSize: cfg.SendBatchSize,
8888
sendBatchMaxSize: cfg.SendBatchMaxSize,
8989
timeout: cfg.Timeout,
90-
done: make(chan struct{}, 1),
90+
doneCh: make(chan struct{}, 1),
9191
newItem: make(chan interface{}, runtime.NumCPU()),
9292
batch: batch,
9393
ctx: ctx,
@@ -108,7 +108,9 @@ func (bp *batchProcessor) Start(context.Context, component.Host) error {
108108
// Shutdown is invoked during service shutdown.
109109
func (bp *batchProcessor) Shutdown(context.Context) error {
110110
bp.cancel()
111-
<-bp.done
111+
112+
// Wait until current batch is drained.
113+
<-bp.doneCh
112114
return nil
113115
}
114116

@@ -132,7 +134,8 @@ func (bp *batchProcessor) startProcessingCycle() {
132134
// make it cancellable using the context that Shutdown gets as a parameter
133135
bp.sendItems(statTimeoutTriggerSend)
134136
}
135-
close(bp.done)
137+
// Indicate that we finished draining.
138+
close(bp.doneCh)
136139
return
137140
case item := <-bp.newItem:
138141
if item == nil {

processor/batchprocessor/batch_processor_test.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -689,3 +689,8 @@ func logsReceivedByName(lds []pdata.Logs) map[string]pdata.LogRecord {
689689
}
690690
return logsReceivedByName
691691
}
692+
693+
func TestShutdown(t *testing.T) {
694+
factory := NewFactory()
695+
componenttest.VerifyProcessorShutdown(t, factory, factory.CreateDefaultConfig())
696+
}

0 commit comments

Comments
 (0)