Skip to content

Commit fde6963

Browse files
authored
fix: pebble shutdown needs to handle async execution (#3114)
* fix issue with pebble not performing full shutdown and silently erroring * remove test that is irrelevant due to otel architecture * add to readme * add asyncDone to compaction job as well * account for pr feedback
1 parent 1339834 commit fde6963

File tree

9 files changed

+127
-74
lines changed

9 files changed

+127
-74
lines changed

collector/collector_test.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ package collector
1717
import (
1818
"context"
1919
"fmt"
20+
"path/filepath"
2021
"testing"
2122
"time"
2223

@@ -259,3 +260,34 @@ func (slowShutdownReceiver) Shutdown(ctx context.Context) error {
259260
<-ctx.Done()
260261
return nil
261262
}
263+
264+
func TestCollectorLifecycleWithConfig(t *testing.T) {
265+
cases := []struct {
266+
desc string
267+
configPath string
268+
}{
269+
{
270+
desc: "config with pebble extension",
271+
configPath: filepath.Join("test", "pebble.yaml"),
272+
},
273+
}
274+
275+
for _, tt := range cases {
276+
t.Run(tt.desc, func(t *testing.T) {
277+
collector, err := New([]string{tt.configPath}, "0.0.0", nil)
278+
require.NoError(t, err)
279+
280+
err = collector.Run(context.Background())
281+
require.NoError(t, err)
282+
283+
status := <-collector.Status()
284+
require.True(t, status.Running)
285+
require.NoError(t, status.Err)
286+
287+
collector.Stop(context.Background())
288+
status = <-collector.Status()
289+
require.False(t, status.Running)
290+
require.False(t, status.Panicked)
291+
})
292+
}
293+
}

collector/test/pebble.yaml

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
receivers:
2+
filelog:
3+
include: ["./var/log/syslog.log"]
4+
storage: pebble
5+
6+
exporters:
7+
nop:
8+
9+
extensions:
10+
pebble:
11+
directory:
12+
path: /tmp/pebble
13+
sync: true
14+
15+
service:
16+
extensions: [pebble]
17+
pipelines:
18+
logs:
19+
receivers: [filelog]
20+
exporters: [nop]

extension/pebbleextension/README.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ The Pebble Extension provides persistent storage for OpenTelemetry Collector com
1212
| sync | bool | true | No | Whether to sync writes to disk after every write. `true` offers safer durability guarantees. Setting `false` may improve performance but increases the chance of data loss in the event of a crash. |
1313
| compaction.interval | duration | 30m | No | How often background compaction runs (e.g., "30m" for 30 minutes). Compaction reclaims space from deleted entries. Set to 0s to disable background compaction. |
1414
| compaction.concurrency | uint64 | 3 | No | Number of concurrent background compaction jobs permitted. Increase for higher compaction throughput or decrease for reduced interference with other workloads. |
15+
| close_timeout | duration | 10s | No | Maximum time to wait for in-flight async operations to complete during shutdown. After this timeout, Close returns an error. |
1516

1617
## Example Configuration
1718

@@ -45,6 +46,7 @@ For advanced use cases, the following configuration options are exposed:
4546
- **sync**: Controls write durability. The default (`true`) provides safer durability guarantees by syncing writes to disk immediately.
4647
- **compaction.interval**: Controls how often background compaction checks run (default: 30 minutes). Set to 0 to disable background compaction.
4748
- **compaction.concurrency**: Controls how many folders compaction will attempt to process at each interval (default: `3`). This limit keeps compaction from consuming too many resources.
49+
- **close_timeout**: Maximum time to wait for in-flight async operations to complete during shutdown (default: 10s). If async operations do not complete within this window, Close returns an error.
4850

4951
Refer to the [Pebble documentation](https://github.com/cockroachdb/pebble) for detailed information about the database.
5052

@@ -61,6 +63,7 @@ extensions:
6163
compaction:
6264
interval: 30m
6365
concurrency: 3
66+
close_timeout: 10s
6467
exporters:
6568
otlp:
6669
endpoint: otelcol:4317

extension/pebbleextension/config.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,12 @@ import (
2222

2323
// Config is the configuration for the pebble extension
2424
type Config struct {
25-
Directory *DirectoryConfig `mapstructure:"directory,omitempty"`
26-
Cache *CacheConfig `mapstructure:"cache,omitempty"`
27-
Sync bool `mapstructure:"sync"`
28-
Compaction *CompactionConfig `mapstructure:"compaction,omitempty"`
29-
_ struct{} // prevent unkeyed literal initialization
25+
Directory *DirectoryConfig `mapstructure:"directory,omitempty"`
26+
Cache *CacheConfig `mapstructure:"cache,omitempty"`
27+
Sync bool `mapstructure:"sync"`
28+
Compaction *CompactionConfig `mapstructure:"compaction,omitempty"`
29+
CloseTimeout time.Duration `mapstructure:"close_timeout,omitempty"`
30+
_ struct{} // prevent unkeyed literal initialization
3031
}
3132

3233
// DirectoryConfig is the configuration for the directory

extension/pebbleextension/extension.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,8 @@ func (p *pebbleExtension) GetClient(_ context.Context, kind component.Kind, ent
8686
func (p *pebbleExtension) createClientForComponent(directory string, fullName string) (client.Client, error) {
8787
path := filepath.Join(directory, fullName)
8888
options := &client.Options{
89-
Sync: p.cfg.Sync,
89+
Sync: p.cfg.Sync,
90+
CloseTimeout: p.cfg.CloseTimeout,
9091
}
9192
if p.cfg.Cache != nil {
9293
options.CacheSize = p.cfg.Cache.Size
@@ -119,7 +120,6 @@ func kindString(k component.Kind) string {
119120
// Start initializes the pebble extension and starts a background task for compaction if configured
120121
func (p *pebbleExtension) Start(_ context.Context, _ component.Host) error {
121122
p.compactionDoneChan = make(chan struct{})
122-
123123
if p.cfg.Compaction != nil && p.cfg.Compaction.Interval > 0 {
124124
ctx, cancel := context.WithCancel(context.Background())
125125
p.compactionCancel = cancel
@@ -190,7 +190,6 @@ func (p *pebbleExtension) Shutdown(ctx context.Context) error {
190190
if p.compactionDoneChan != nil {
191191
close(p.compactionDoneChan)
192192
}
193-
194193
var errs error
195194
p.clientsMutex.Lock()
196195
for _, client := range p.clients {

extension/pebbleextension/factory.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ func createDefaultConfig() component.Config {
4646
Interval: 30 * time.Minute,
4747
Concurrency: 3,
4848
},
49+
CloseTimeout: 10 * time.Second,
4950
}
5051
}
5152

extension/pebbleextension/internal/client/client.go

Lines changed: 60 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@ import (
2020
"errors"
2121
"fmt"
2222
"io"
23+
"sync"
24+
"sync/atomic"
25+
"time"
2326

2427
"github.com/cockroachdb/pebble"
2528
"go.opentelemetry.io/collector/component"
@@ -51,25 +54,36 @@ type client struct {
5154

5255
logger *zap.Logger
5356
path string
54-
doneChan chan struct{}
5557
writeOptions *pebble.WriteOptions
5658
readOnly bool
59+
closeTimeout time.Duration
60+
61+
// closed is used to track if the client has been closed
62+
closed atomic.Bool
63+
// asyncDone is used to wait for any async operations to complete on close
64+
asyncDone *sync.WaitGroup
5765
}
5866

5967
// Options are the options for opening the pebble database
6068
type Options struct {
61-
Sync bool
62-
CacheSize int64
69+
Sync bool
70+
CacheSize int64
71+
CloseTimeout time.Duration
6372
}
6473

65-
// ErrClientClosing is supposed to be returned when the client is closing and not allowing new operations to be performed
66-
var ErrClientClosing = errors.New("client is closing")
74+
const defaultCloseTimeout = 10 * time.Second
6775

6876
// NewClient creates a new client for the pebble database
6977
func NewClient(path string, logger *zap.Logger, options *Options) (Client, error) {
7078
c := &client{
71-
path: path,
72-
logger: logger,
79+
path: path,
80+
logger: logger,
81+
asyncDone: &sync.WaitGroup{},
82+
closeTimeout: defaultCloseTimeout,
83+
}
84+
85+
if options.CloseTimeout > 0 {
86+
c.closeTimeout = options.CloseTimeout
7387
}
7488

7589
writeOptions := &pebble.WriteOptions{}
@@ -94,9 +108,8 @@ func NewClient(path string, logger *zap.Logger, options *Options) (Client, error
94108

95109
// Get gets a key from the pebble database
96110
func (c *client) Get(_ context.Context, key string) ([]byte, error) {
97-
if c.isDone() {
98-
return nil, ErrClientClosing
99-
}
111+
c.asyncDone.Add(1)
112+
defer c.asyncDone.Done()
100113

101114
val, closer, err := c.db.Get([]byte(key))
102115
if err != nil {
@@ -114,21 +127,10 @@ func (c *client) Get(_ context.Context, key string) ([]byte, error) {
114127
return val, nil
115128
}
116129

117-
// isDone checks if the client has its done channel closed
118-
func (c *client) isDone() bool {
119-
select {
120-
case <-c.doneChan:
121-
return true
122-
default:
123-
return false
124-
}
125-
}
126-
127130
// Set sets a key in the pebble database
128131
func (c *client) Set(_ context.Context, key string, value []byte) error {
129-
if c.isDone() {
130-
return ErrClientClosing
131-
}
132+
c.asyncDone.Add(1)
133+
defer c.asyncDone.Done()
132134
err := c.db.Set([]byte(key), value, c.writeOptions)
133135
if err != nil {
134136
return fmt.Errorf("error setting key %s: %w", key, err)
@@ -138,26 +140,23 @@ func (c *client) Set(_ context.Context, key string, value []byte) error {
138140

139141
// Delete deletes a key from the pebble database
140142
func (c *client) Delete(_ context.Context, key string) error {
141-
if c.isDone() {
142-
return ErrClientClosing
143-
}
144-
143+
c.asyncDone.Add(1)
144+
defer c.asyncDone.Done()
145145
err := c.db.Delete([]byte(key), c.writeOptions)
146146
if err != nil {
147147
return fmt.Errorf("error deleting key %s: %w", key, err)
148148
}
149149
return nil
150150
}
151151

152+
// Batch performs a batch of operations on the pebble database
152153
func (c *client) Batch(ctx context.Context, ops ...*storage.Operation) error {
154+
c.asyncDone.Add(1)
155+
defer c.asyncDone.Done()
156+
153157
var wb *pebble.Batch
154-
var err error
155-
for _, op := range ops {
156-
if c.isDone() {
157-
err = ErrClientClosing
158-
break
159-
}
160158

159+
for _, op := range ops {
161160
var writes bool
162161
switch op.Type {
163162
case storage.Set, storage.Delete:
@@ -196,9 +195,6 @@ func (c *client) Batch(ctx context.Context, ops ...*storage.Operation) error {
196195
return wb.Commit(c.writeOptions)
197196
}
198197

199-
if err != nil {
200-
return err
201-
}
202198
return nil
203199
}
204200

@@ -215,18 +211,36 @@ func typeString(t storage.OpType) string {
215211
}
216212

217213
func (c *client) Start(_ context.Context, _ component.Host) error {
218-
c.doneChan = make(chan struct{})
219214
return nil
220215
}
221216

217+
// Close closes the client and waits for any async operations to complete.
218+
// Note that since extensions are shut down after the other components, we can safely assume that no new operations will be started once this is called.
222219
func (c *client) Close(_ context.Context) error {
223-
if c.doneChan != nil {
224-
close(c.doneChan)
220+
if c.closed.Load() {
221+
c.logger.Info("pebble instance is already closed, skipping")
222+
return nil
225223
}
226224

227-
err := c.db.Flush()
228-
if err != nil {
229-
return fmt.Errorf("flush db: %w", err)
225+
c.closed.Store(true)
226+
227+
shutdownChan := make(chan struct{}, 1)
228+
waitTimeout, cancel := context.WithTimeout(context.Background(), c.closeTimeout)
229+
defer cancel()
230+
231+
go func() {
232+
c.asyncDone.Wait()
233+
err := c.db.Flush()
234+
if err != nil {
235+
c.logger.Error("failed to flush database", zap.Error(err))
236+
}
237+
close(shutdownChan)
238+
}()
239+
240+
select {
241+
case <-waitTimeout.Done():
242+
return fmt.Errorf("failed to wait for async operations to complete: %w", waitTimeout.Err())
243+
case <-shutdownChan:
230244
}
231245

232246
return c.db.Close()
@@ -236,8 +250,11 @@ func (c *client) Close(_ context.Context) error {
236250
// Note: Compaction is I/O intensive and may impact performance during operation so we should only sparsely run it if necessary.
237251
// Note: in v2 of Pebble we will use the context
238252
func (c *client) Compact(parallel bool) error {
253+
c.asyncDone.Add(1)
254+
defer c.asyncDone.Done()
255+
239256
c.logger.Debug("compacting database to reclaim space", zap.String("path", c.path))
240257
// just compacting the entire database for now, there may be a better way of doing this but this is a starting point.
241-
// tried looking into how cockroachdb does it but they have different lifeclycle
258+
// tried looking into how cockroachdb does it but they have different lifecycles
242259
return c.db.Compact([]byte{}, []byte{0xff, 0xff, 0xff, 0xff}, parallel)
243260
}

extension/pebbleextension/internal/client/client_test.go

Lines changed: 2 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -84,8 +84,8 @@ func TestBatch(t *testing.T) {
8484
require.NoError(t, err)
8585
require.Equal(t, []byte("test0"), v)
8686

87-
// batch deleting the items
88-
err = client.Batch(t.Context(), storage.DeleteOperation("test0"), storage.DeleteOperation("test1"))
87+
// batch deleting the items and getting a non-existent key
88+
err = client.Batch(t.Context(), storage.DeleteOperation("test0"), storage.DeleteOperation("test1"), storage.GetOperation("test2"))
8989
require.NoError(t, err)
9090

9191
v, err = client.Get(t.Context(), "test0")
@@ -109,26 +109,6 @@ func TestClose(t *testing.T) {
109109
require.NoError(t, err)
110110
}
111111

112-
func TestDoneProcessing(t *testing.T) {
113-
c, err := NewClient(t.TempDir(), zap.NewNop(), &Options{
114-
Sync: true,
115-
})
116-
require.NoError(t, err)
117-
118-
internalClient, ok := c.(*client)
119-
require.True(t, ok)
120-
require.NoError(t, internalClient.Start(t.Context(), nil))
121-
122-
err = internalClient.Set(t.Context(), "test", []byte("test"))
123-
require.NoError(t, err)
124-
125-
require.NoError(t, internalClient.Close(t.Context()))
126-
127-
val, err := internalClient.Get(t.Context(), "test")
128-
require.ErrorContains(t, err, "client is closing")
129-
require.Nil(t, val)
130-
}
131-
132112
func TestCompaction(t *testing.T) {
133113
testCases := []struct {
134114
name string

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,7 @@ require (
236236
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/otelarrowreceiver v0.144.0
237237
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/otlpjsonfilereceiver v0.144.0
238238
go.opentelemetry.io/collector/extension/extensiontest v0.144.0
239+
go.opentelemetry.io/collector/extension/xextension v0.144.0
239240
go.opentelemetry.io/collector/processor/processorhelper v0.144.0
240241
go.opentelemetry.io/collector/processor/processortest v0.144.0
241242
go.opentelemetry.io/collector/service v0.144.0
@@ -622,7 +623,6 @@ require (
622623
go.opentelemetry.io/collector/extension/extensionauth v1.50.0 // indirect
623624
go.opentelemetry.io/collector/extension/extensioncapabilities v0.144.0 // indirect
624625
go.opentelemetry.io/collector/extension/extensionmiddleware v0.144.0 // indirect
625-
go.opentelemetry.io/collector/extension/xextension v0.144.0 // indirect
626626
go.opentelemetry.io/collector/filter v0.144.0 // indirect
627627
go.opentelemetry.io/collector/internal/componentalias v0.144.0 // indirect
628628
go.opentelemetry.io/collector/internal/fanoutconsumer v0.144.0 // indirect

0 commit comments

Comments
 (0)