Skip to content

Commit f123dd2

Browse files
Fix Shutdown behavior for batchprocessor
I added a Shutdown() test that does basic verification of the behavior of the Shutdown() function. More verifications can be added later. The test revealed a bug in batchprocessor Shutdown() function which would not wait until all pending data was drained.
1 parent 0ed7a4c commit f123dd2

File tree

3 files changed

+77
-0
lines changed

3 files changed

+77
-0
lines changed
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
// Copyright The OpenTelemetry Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package componenttest
16+
17+
import (
18+
"context"
19+
"testing"
20+
21+
"github.com/stretchr/testify/assert"
22+
"github.com/stretchr/testify/require"
23+
"go.uber.org/zap"
24+
25+
"go.opentelemetry.io/collector/component"
26+
"go.opentelemetry.io/collector/config/configerror"
27+
"go.opentelemetry.io/collector/config/configmodels"
28+
"go.opentelemetry.io/collector/consumer/consumertest"
29+
"go.opentelemetry.io/collector/internal/testdata"
30+
)
31+
32+
func verifyTraceProcessorDoesntProduceAfterShutdown(t *testing.T, factory component.ProcessorFactory, cfg configmodels.Processor) {
33+
// Create a processor and output its produce to a sink.
34+
nextSink := new(consumertest.TracesSink)
35+
processor, err := factory.CreateTracesProcessor(
36+
context.Background(),
37+
component.ProcessorCreateParams{Logger: zap.NewNop()},
38+
cfg,
39+
nextSink,
40+
)
41+
if err != nil {
42+
if err == configerror.ErrDataTypeIsNotSupported {
43+
return
44+
}
45+
require.NoError(t, err)
46+
}
47+
err = processor.Start(context.Background(), NewNopHost())
48+
assert.NoError(t, err)
49+
50+
// Send some traces to the processor.
51+
const generatedCount = 10
52+
for i := 0; i < generatedCount; i++ {
53+
processor.ConsumeTraces(context.Background(), testdata.GenerateTraceDataOneSpan())
54+
}
55+
56+
// Now shutdown the processor.
57+
err = processor.Shutdown(context.Background())
58+
assert.NoError(t, err)
59+
60+
// The Shutdown() is done. It means the processor must have sent everything we
61+
// gave it to the next sink.
62+
assert.EqualValues(t, generatedCount, nextSink.SpansCount())
63+
}
64+
65+
func VerifyProcessorShutdown(t *testing.T, factory component.ProcessorFactory, cfg configmodels.Processor) {
66+
verifyTraceProcessorDoesntProduceAfterShutdown(t, factory, cfg)
67+
// TODO: add metrics and logs verification.
68+
// TODO: add other shutdown verifications.
69+
}

processor/batchprocessor/batch_processor.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,8 @@ func (bp *batchProcessor) Start(context.Context, component.Host) error {
108108
// Shutdown is invoked during service shutdown.
109109
func (bp *batchProcessor) Shutdown(context.Context) error {
110110
bp.cancel()
111+
112+
// Wait until current batch is drained.
111113
<-bp.done
112114
return nil
113115
}
@@ -132,6 +134,7 @@ func (bp *batchProcessor) startProcessingCycle() {
132134
// make it cancellable using the context that Shutdown gets as a parameter
133135
bp.sendItems(statTimeoutTriggerSend)
134136
}
137+
// Indicate that we finished draining.
135138
close(bp.done)
136139
return
137140
case item := <-bp.newItem:

processor/batchprocessor/batch_processor_test.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -689,3 +689,8 @@ func logsReceivedByName(lds []pdata.Logs) map[string]pdata.LogRecord {
689689
}
690690
return logsReceivedByName
691691
}
692+
693+
func TestShutdown(t *testing.T) {
694+
factory := NewFactory()
695+
componenttest.VerifyProcessorShutdown(t, factory, factory.CreateDefaultConfig())
696+
}

0 commit comments

Comments
 (0)