Skip to content

Commit 1ab9bd3

Browse files
committed
Weirdness with assert not failing tests
1 parent 16c40e6 commit 1ab9bd3

File tree

5 files changed

+229
-57
lines changed

5 files changed

+229
-57
lines changed

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ require (
1212
github.com/spf13/viper v1.6.2
1313
github.com/stretchr/testify v1.5.1
1414
github.com/xeipuuv/gojsonschema v1.2.0
15+
go.uber.org/goleak v1.0.0
1516
go.uber.org/zap v1.14.0
1617
golang.org/x/lint v0.0.0-20200302205851-738671d3881b // indirect
1718
golang.org/x/net v0.0.0-20200301022130-244492dfa37a // indirect

go.sum

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,8 @@ go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU=
169169
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
170170
go.uber.org/atomic v1.5.0 h1:OI5t8sDa1Or+q8AeE+yKeB/SDYioSHAgcVljj9JIETY=
171171
go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
172+
go.uber.org/goleak v1.0.0 h1:qsup4IcBdlmsnGfqyLl4Ntn3C2XCCuKAE7DwHpScyUo=
173+
go.uber.org/goleak v1.0.0/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
172174
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
173175
go.uber.org/multierr v1.3.0 h1:sFPn2GLc3poCkfrpIXGhBD2X0CMIo4Q/zSULXrj/+uc=
174176
go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4=
@@ -262,6 +264,7 @@ golang.org/x/tools v0.0.0-20190621195816-6e04913cbbac/go.mod h1:/rFqwRUd4F7ZHNgw
262264
golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
263265
golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5 h1:hKsoRgsbwY1NafxrwTs+k64bikrLBkAgPir1TNCj3Zs=
264266
golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
267+
golang.org/x/tools v0.0.0-20191108193012-7d206e10da11/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
265268
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
266269
golang.org/x/tools v0.0.0-20191130070609-6e064ea0cf2d/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
267270
golang.org/x/tools v0.0.0-20200130002326-2f3ba24bd6e7/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=

plugin/builtin/fileinput/file_source.go

Lines changed: 29 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,6 @@ import (
1515
"time"
1616

1717
pg "github.com/bluemedora/bplogagent/plugin"
18-
// using Docker's filenotify so we can fall back to polling
19-
// for envs where notify isn't available
2018
)
2119

2220
func init() {
@@ -49,45 +47,52 @@ func (c FileSourceConfig) Build(buildContext pg.BuildContext) (pg.Plugin, error)
4947
return nil, fmt.Errorf("build default outputter: %s", err)
5048
}
5149

50+
// Ensure includes can be parsed as globs
5251
for _, include := range c.Include {
5352
_, err := filepath.Match(include, "")
5453
if err != nil {
5554
return nil, fmt.Errorf("parse include glob: %s", err)
5655
}
5756
}
5857

58+
// Ensure excludes can be parsed as globs
5959
for _, exclude := range c.Exclude {
6060
_, err := filepath.Match(exclude, "")
6161
if err != nil {
6262
return nil, fmt.Errorf("parse exclude glob: %s", err)
6363
}
6464
}
6565

66+
// Determine the split function for log entries
6667
var splitFunc bufio.SplitFunc
6768
if c.Multiline == nil {
6869
splitFunc = bufio.ScanLines
69-
} else if c.Multiline.LineEndPattern != "" && c.Multiline.LineStartPattern != "" {
70-
return nil, fmt.Errorf("cannot configure both line_start_pattern and line_end_pattern")
71-
} else if c.Multiline.LineEndPattern != "" {
72-
re, err := regexp.Compile(c.Multiline.LineEndPattern)
73-
if err != nil {
74-
return nil, fmt.Errorf("compile line end regex: %s", err)
75-
}
76-
splitFunc = NewLineEndSplitFunc(re)
77-
} else if c.Multiline.LineStartPattern != "" {
78-
re, err := regexp.Compile(c.Multiline.LineStartPattern)
79-
if err != nil {
80-
return nil, fmt.Errorf("compile line start regex: %s", err)
70+
} else {
71+
definedLineEndPattern := c.Multiline.LineEndPattern != ""
72+
definedLineStartPattern := c.Multiline.LineStartPattern != ""
73+
74+
switch {
75+
case definedLineEndPattern == definedLineStartPattern:
76+
return nil, fmt.Errorf("if multiline is configured, exactly one of line_start_pattern or line_end_pattern must be set")
77+
case definedLineEndPattern:
78+
re, err := regexp.Compile(c.Multiline.LineEndPattern)
79+
if err != nil {
80+
return nil, fmt.Errorf("compile line end regex: %s", err)
81+
}
82+
splitFunc = NewLineEndSplitFunc(re)
83+
case definedLineStartPattern:
84+
re, err := regexp.Compile(c.Multiline.LineStartPattern)
85+
if err != nil {
86+
return nil, fmt.Errorf("compile line start regex: %s", err)
87+
}
88+
splitFunc = NewLineStartSplitFunc(re)
8189
}
82-
splitFunc = NewLineStartSplitFunc(re)
83-
} else if c.Multiline.LineEndPattern == "" && c.Multiline.LineStartPattern == "" {
84-
return nil, fmt.Errorf("if multiline is configured, either line_start_pattern or line_end_pattern must be configured")
8590
}
8691

92+
// Parse the poll interval
8793
if c.PollInterval < 0 {
8894
return nil, fmt.Errorf("poll_interval must be greater than zero if configured")
8995
}
90-
9196
pollInterval := func() time.Duration {
9297
if c.PollInterval == 0 {
9398
return 5 * time.Second
@@ -106,7 +111,7 @@ func (c FileSourceConfig) Build(buildContext pg.BuildContext) (pg.Plugin, error)
106111
PollInterval: pollInterval,
107112

108113
fileCreated: make(chan string),
109-
fileTouched: make(chan string),
114+
fileTouched: make(chan struct{}),
110115
fileRemoved: make(chan *FileWatcher),
111116
directoryRemoved: make(chan *DirectoryWatcher),
112117
}
@@ -135,7 +140,7 @@ type FileSource struct {
135140

136141
fileCreated chan string
137142
fileRemoved chan *FileWatcher
138-
fileTouched chan string
143+
fileTouched chan struct{}
139144
directoryRemoved chan *DirectoryWatcher
140145
}
141146

@@ -176,6 +181,7 @@ func (f *FileSource) Start() error {
176181
f.Debugw("Received directory removed notification", "path", watcher.path)
177182
f.removeDirectoryWatcher(watcher)
178183
case <-f.fileTouched:
184+
// swallow messages as a notification that it's safe to read?
179185
}
180186
}
181187
}()
@@ -227,7 +233,7 @@ func (f *FileSource) tryAddFile(ctx context.Context, path string, globCheck bool
227233
return
228234
}
229235

230-
watcher, err := NewFileWatcher(path, f, startFromBeginning, f.SplitFunc, f.PollInterval, f.SugaredLogger)
236+
watcher, err := NewFileWatcher(path, f.Output, startFromBeginning, f.SplitFunc, f.PollInterval, f.SugaredLogger)
231237
if err != nil {
232238
if pathError, ok := err.(*os.PathError); ok && pathError.Err.Error() == "no such file or directory" {
233239
f.Debugw("File deleted before it could be read", "path", path)
@@ -283,6 +289,7 @@ func (f *FileSource) checkPath(path string, checkCopy bool) (createWatcher bool,
283289

284290
for _, watcher := range f.fileWatchers {
285291
// TODO what if multiple match? anything?
292+
// TODO how do links (hard and soft) interact with this logic?
286293
if watcher.dev == dev && watcher.inode == inode {
287294
if watcher.path == path {
288295
return false, false, nil
@@ -303,6 +310,7 @@ func (f *FileSource) checkPath(path string, checkCopy bool) (createWatcher bool,
303310
}
304311

305312
func (f *FileSource) fingerprint(file *os.File) string {
313+
// TODO make sure resetting the seek location isn't messing with things
306314
_, err := file.Seek(0, io.SeekStart)
307315
if err != nil {
308316
panic(err)

plugin/builtin/fileinput/file_watcher.go

Lines changed: 62 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -13,23 +13,31 @@ import (
1313
"go.uber.org/zap"
1414
)
1515

16-
// FileWatcher is a wrapper around `fsnotify` that periodically polls
17-
// to mitigate issues with filesystems that don't support notify events
16+
// FileWatcher is a wrapper around `fsnotify` that periodically polls to provide
17+
// a fallback for filesystems and platforms that don't support event notification
1818
type FileWatcher struct {
1919
inode uint64
2020
dev uint64
2121
path string
2222
file *os.File
2323
offset int64
2424
pollInterval time.Duration
25-
fileSource *FileSource
2625
cancel context.CancelFunc
2726
splitFunc bufio.SplitFunc
27+
output func(*entry.Entry) error
2828

2929
*zap.SugaredLogger
3030
}
3131

32-
func NewFileWatcher(path string, fileSource *FileSource, startFromBeginning bool, splitFunc bufio.SplitFunc, pollInterval time.Duration, logger *zap.SugaredLogger) (*FileWatcher, error) {
32+
func NewFileWatcher(
33+
path string,
34+
output func(*entry.Entry) error,
35+
startFromBeginning bool,
36+
splitFunc bufio.SplitFunc,
37+
pollInterval time.Duration,
38+
logger *zap.SugaredLogger,
39+
) (*FileWatcher, error) {
40+
3341
file, err := os.Open(path)
3442
if err != nil {
3543
return nil, err
@@ -66,37 +74,44 @@ func NewFileWatcher(path string, fileSource *FileSource, startFromBeginning bool
6674
path: path,
6775
pollInterval: pollInterval,
6876
offset: offset,
69-
fileSource: fileSource,
7077
splitFunc: splitFunc,
71-
SugaredLogger: logger.With("path", path),
78+
output: output,
79+
SugaredLogger: logger.Named("file_watcher").With("path", path),
7280
}, nil
7381
}
7482

7583
func (w *FileWatcher) Watch(startCtx context.Context) error {
7684
ctx, cancel := context.WithCancel(startCtx)
7785
w.cancel = cancel
7886

87+
// Create the watcher
7988
watcher, err := fsnotify.NewWatcher()
8089
if err != nil {
8190
// TODO if falling back to polling, should we set the default lower?
8291
w.Infow("Failed to create notifying watcher. Falling back to polling only", "error", err)
8392
watcher = &fsnotify.Watcher{} // create an empty watcher whose channels are just nil
8493
} else {
94+
defer watcher.Close()
8595
err = watcher.Add(w.path)
8696
if err != nil {
8797
w.Infow("Failed to add path to watcher. Falling back to polling only", "error", err)
8898
watcher = &fsnotify.Watcher{} // create an empty watcher whose channels are just nil
8999
}
90100
}
91101

102+
// Keep a persistent open file
92103
file, err := os.Open(w.path)
93104
if err != nil {
94105
return err
95106
}
96107
defer file.Close()
97108
w.file = file
98109

99-
w.checkFile(ctx) // Check it once initially for responsive startup
110+
// Check it once initially for responsive startup
111+
err = w.checkReadFile(ctx)
112+
if err != nil {
113+
return err
114+
}
100115

101116
for {
102117
// TODO actually test all these cases
@@ -109,62 +124,72 @@ func (w *FileWatcher) Watch(startCtx context.Context) error {
109124
select {
110125
case <-ctx.Done():
111126
timer.Stop()
112-
err := watcher.Close()
113-
if err != nil {
114-
return err
115-
}
127+
return nil
116128
case event, ok := <-watcher.Events:
117129
timer.Stop()
118130
if !ok {
119131
return nil
120132
}
121133
if event.Op&fsnotify.Remove > 0 {
122-
watcher.Close()
123-
w.fileSource.fileRemoved <- w
124-
continue
134+
return watcher.Close()
125135
}
126136
if event.Op&(fsnotify.Write|fsnotify.Chmod) > 0 {
127-
w.checkFile(ctx)
137+
err := w.checkReadFile(ctx)
138+
if err != nil {
139+
return err
140+
}
128141
}
129-
// ignore chmod and rename (rename is covered by directory create)
142+
// ignore rename (rename is covered by directory create)
130143
case <-timer.C:
131-
w.checkFile(ctx)
144+
err := w.checkReadFile(ctx)
145+
if err != nil {
146+
return err
147+
}
132148
case err := <-watcher.Errors:
133149
timer.Stop()
134150
return err
135151
}
136152
}
137153
}
138154

139-
func (w *FileWatcher) checkFile(ctx context.Context) {
155+
func (w *FileWatcher) checkReadFile(ctx context.Context) error {
156+
// TODO ensure that none of the errors thrown in here are recoverable
157+
// since returning an error triggers a return from the watch function
140158
select {
141-
case w.fileSource.fileTouched <- w.path:
142159
case <-ctx.Done():
143-
return
160+
return nil
161+
default:
144162
}
145163

164+
// TODO check if the file still exists
165+
146166
fileInfo, err := w.file.Stat()
147167
if err != nil {
148-
w.Errorw("Failed to get file info", "error", err) // TODO is this a recoverable error?
149-
return
168+
return err
150169
}
151170

152171
if fileInfo.Size() < w.offset {
153172
w.Debug("Detected file truncation. Starting from beginning")
154173
w.offset, err = w.file.Seek(0, 0)
155174
if err != nil {
156-
w.Errorw("Failed to seek to file start", "error", err)
157-
return
175+
return fmt.Errorf("seek to start: %s", err)
176+
}
177+
err := w.readToEnd(ctx)
178+
if err != nil {
179+
return err
158180
}
159-
w.readToEnd(ctx)
160181
} else if fileInfo.Size() > w.offset {
161-
w.readToEnd(ctx)
182+
err := w.readToEnd(ctx)
183+
if err != nil {
184+
return err
185+
}
162186
}
163187

164188
// do nothing if the file hasn't changed size
189+
return nil
165190
}
166191

167-
func (w *FileWatcher) readToEnd(ctx context.Context) {
192+
func (w *FileWatcher) readToEnd(ctx context.Context) error {
168193
// TODO seek to last offset?
169194
scanner := bufio.NewScanner(w.file)
170195
scanner.Split(w.splitFunc)
@@ -173,16 +198,13 @@ func (w *FileWatcher) readToEnd(ctx context.Context) {
173198
for {
174199
select {
175200
case <-ctx.Done():
176-
return // Stop reading if closed
201+
return nil // Stop reading if closed
177202
default:
178203
}
179204

180205
ok := scanner.Scan()
181206
if !ok {
182-
if err := scanner.Err(); err != nil {
183-
w.Warn("Failed to scan file", "error", err)
184-
}
185-
break
207+
return scanner.Err()
186208
}
187209

188210
message := scanner.Text()
@@ -194,13 +216,17 @@ func (w *FileWatcher) readToEnd(ctx context.Context) {
194216
},
195217
}
196218

197-
w.fileSource.Output(entry)
219+
err := w.output(entry)
220+
if err != nil {
221+
return fmt.Errorf("output entry: %s", err)
222+
}
198223

199-
var err error
224+
// TODO does this actually work how I think it does with the scanner?
225+
// I'm unsure if the scanner peeks ahead, or actually advances the reader
226+
// every time it tries to parse something. This needs to be tested
200227
w.offset, err = w.file.Seek(0, 1) // get current file offset
201228
if err != nil {
202-
w.Errorw("Failed to get current offset", "error", err)
203-
return
229+
return fmt.Errorf("get current offset: %s", err)
204230
}
205231
}
206232
}

0 commit comments

Comments
 (0)