Skip to content

Commit fbe6414

Browse files
Fix Shutdown behavior for batchprocessor
I added a Shutdown() test that performs basic verification of the behavior of the Shutdown() function; more verifications can be added later. The test revealed a bug in the batchprocessor's Shutdown() function: Consume* calls made after shutdown would hang, because it was no longer possible to send to the channel that the batchprocessor uses. I will add tests for more components in subsequent PRs. This work is necessary to ensure that Shutdown() works correctly (future PRs will show that there are other bugs that need to be fixed). The test is written in a generic way so that it can be reused for other components.
1 parent 0ed7a4c commit fbe6414

File tree

3 files changed

+161
-7
lines changed

3 files changed

+161
-7
lines changed
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
// Copyright The OpenTelemetry Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package componenttest
16+
17+
import (
18+
"context"
19+
"testing"
20+
"time"
21+
22+
"github.com/stretchr/testify/assert"
23+
"github.com/stretchr/testify/require"
24+
"go.uber.org/zap"
25+
26+
"go.opentelemetry.io/collector/component"
27+
"go.opentelemetry.io/collector/config/configerror"
28+
"go.opentelemetry.io/collector/config/configmodels"
29+
"go.opentelemetry.io/collector/consumer"
30+
"go.opentelemetry.io/collector/consumer/consumertest"
31+
"go.opentelemetry.io/collector/consumer/pdata"
32+
)
33+
34+
func createSingleSpanTrace() pdata.Traces {
35+
d := pdata.NewTraces()
36+
d.ResourceSpans().Resize(1)
37+
d.ResourceSpans().At(0).InstrumentationLibrarySpans().Resize(1)
38+
d.ResourceSpans().At(0).InstrumentationLibrarySpans().At(0).Spans().Resize(1)
39+
span := d.ResourceSpans().At(0).InstrumentationLibrarySpans().At(0).Spans().At(0)
40+
span.SetName("test span")
41+
return d
42+
}
43+
44+
func verifyTraceProcessorDoesntProduceAfterShutdown(t *testing.T, factory component.ProcessorFactory, cfg configmodels.Processor) {
45+
// Create a processor and output its produce to a sink.
46+
nextSink := &consumertest.TracesSink{}
47+
processor, err := factory.CreateTracesProcessor(
48+
context.Background(),
49+
component.ProcessorCreateParams{Logger: zap.NewNop()},
50+
cfg,
51+
nextSink,
52+
)
53+
if err != nil {
54+
if err == configerror.ErrDataTypeIsNotSupported {
55+
return
56+
}
57+
require.NoError(t, err)
58+
}
59+
err = processor.Start(context.Background(), NewNopHost())
60+
assert.NoError(t, err)
61+
62+
doneSignal := make(chan bool)
63+
64+
// Send traces to the processor until we signal via doneSignal, and then continue
65+
// sending some more traces after that.
66+
go generateTraces(processor, doneSignal)
67+
68+
// Wait until the processor outputs anything to the sink.
69+
assert.Eventually(t, func() bool {
70+
return nextSink.SpansCount() > 0
71+
}, time.Second, 1*time.Millisecond)
72+
73+
// Now shutdown the processor.
74+
err = processor.Shutdown(context.Background())
75+
assert.NoError(t, err)
76+
77+
// Remember how many spans the sink received. This number should not change after this
78+
// point because after Shutdown() returns the component is not allowed to produce
79+
// any more data.
80+
sinkSpanCountAfterShutdown := nextSink.SpansCount()
81+
82+
// Now signal to generateTraces to exit the main generation loop, then send
83+
// a number of follow up traces and stop.
84+
doneSignal <- true
85+
86+
// Wait until all follow up traces are sent.
87+
<-doneSignal
88+
89+
// The follow up traces should not be received by sink, so the number of spans in
90+
// the sink should not change.
91+
assert.EqualValues(t, sinkSpanCountAfterShutdown, nextSink.SpansCount())
92+
93+
// Note that sending the follow up traces also helps catch another bug: component's
94+
// ongoing Consume* function never returning once Shutdown() is called.
95+
}
96+
97+
func generateTraces(consumer consumer.TracesConsumer, doneSignal chan bool) {
98+
// Continuously generate spans until signaled to stop.
99+
loop:
100+
for {
101+
select {
102+
case <-doneSignal:
103+
break loop
104+
default:
105+
}
106+
consumer.ConsumeTraces(context.Background(), createSingleSpanTrace())
107+
}
108+
109+
// After getting the signal to stop generate another 1000 spans and then
110+
// finally stop.
111+
const afterDoneSpanCount = 1000
112+
for i := 0; i < afterDoneSpanCount; i++ {
113+
consumer.ConsumeTraces(context.Background(), createSingleSpanTrace())
114+
}
115+
116+
// Indicate that we are done.
117+
close(doneSignal)
118+
}
119+
120+
func VerifyProcessorShutdown(t *testing.T, factory component.ProcessorFactory, cfg configmodels.Processor) {
121+
verifyTraceProcessorDoesntProduceAfterShutdown(t, factory, cfg)
122+
// TODO: add metrics and logs verification.
123+
// TODO: add other shutdown verifications.
124+
}

processor/batchprocessor/batch_processor.go

Lines changed: 32 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,9 @@ package batchprocessor
1616

1717
import (
1818
"context"
19+
"fmt"
1920
"runtime"
21+
"sync/atomic"
2022
"time"
2123

2224
"go.opencensus.io/stats"
@@ -47,10 +49,11 @@ type batchProcessor struct {
4749
timeout time.Duration
4850
sendBatchMaxSize uint32
4951

50-
timer *time.Timer
51-
done chan struct{}
52-
newItem chan interface{}
53-
batch batch
52+
timer *time.Timer
53+
doneCh chan struct{}
54+
shutdownFlag int64 // 0=false, 1=true
55+
newItem chan interface{}
56+
batch batch
5457

5558
ctx context.Context
5659
cancel context.CancelFunc
@@ -87,7 +90,7 @@ func newBatchProcessor(params component.ProcessorCreateParams, cfg *Config, batc
8790
sendBatchSize: cfg.SendBatchSize,
8891
sendBatchMaxSize: cfg.SendBatchMaxSize,
8992
timeout: cfg.Timeout,
90-
done: make(chan struct{}, 1),
93+
doneCh: make(chan struct{}, 1),
9194
newItem: make(chan interface{}, runtime.NumCPU()),
9295
batch: batch,
9396
ctx: ctx,
@@ -108,10 +111,19 @@ func (bp *batchProcessor) Start(context.Context, component.Host) error {
108111
// Shutdown is invoked during service shutdown.
109112
func (bp *batchProcessor) Shutdown(context.Context) error {
110113
bp.cancel()
111-
<-bp.done
114+
115+
// Stop accepting new data.
116+
atomic.StoreInt64(&bp.shutdownFlag, 1)
117+
118+
// Wait until current batch is drained.
119+
<-bp.doneCh
112120
return nil
113121
}
114122

123+
func (bp *batchProcessor) isShutdown() bool {
124+
return atomic.LoadInt64(&bp.shutdownFlag) != 0
125+
}
126+
115127
func (bp *batchProcessor) startProcessingCycle() {
116128
bp.timer = time.NewTimer(bp.timeout)
117129
for {
@@ -132,7 +144,8 @@ func (bp *batchProcessor) startProcessingCycle() {
132144
// make it cancellable using the context that Shutdown gets as a parameter
133145
bp.sendItems(statTimeoutTriggerSend)
134146
}
135-
close(bp.done)
147+
// Indicate that we finished draining.
148+
close(bp.doneCh)
136149
return
137150
case item := <-bp.newItem:
138151
if item == nil {
@@ -201,19 +214,31 @@ func (bp *batchProcessor) sendItems(measure *stats.Int64Measure) {
201214

202215
// ConsumeTraces implements TracesProcessor
203216
func (bp *batchProcessor) ConsumeTraces(_ context.Context, td pdata.Traces) error {
217+
if bp.isShutdown() {
218+
return fmt.Errorf("ConsumeTraces called after Shutdown")
219+
}
220+
204221
bp.newItem <- td
205222
return nil
206223
}
207224

208225
// ConsumeTraces implements MetricsProcessor
209226
func (bp *batchProcessor) ConsumeMetrics(_ context.Context, md pdata.Metrics) error {
227+
if bp.isShutdown() {
228+
return fmt.Errorf("ConsumeMetrics called after Shutdown")
229+
}
230+
210231
// First thing is convert into a different internal format
211232
bp.newItem <- md
212233
return nil
213234
}
214235

215236
// ConsumeLogs implements LogsProcessor
216237
func (bp *batchProcessor) ConsumeLogs(_ context.Context, ld pdata.Logs) error {
238+
if bp.isShutdown() {
239+
return fmt.Errorf("ConsumeLogs called after Shutdown")
240+
}
241+
217242
bp.newItem <- ld
218243
return nil
219244
}

processor/batchprocessor/batch_processor_test.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -689,3 +689,8 @@ func logsReceivedByName(lds []pdata.Logs) map[string]pdata.LogRecord {
689689
}
690690
return logsReceivedByName
691691
}
692+
693+
func TestShutdown(t *testing.T) {
694+
factory := NewFactory()
695+
componenttest.VerifyProcessorShutdown(t, factory, factory.CreateDefaultConfig())
696+
}

0 commit comments

Comments
 (0)