Skip to content

Commit 6d29289

Browse files
author
Joseph Sirianni
authored
Track rotated (#346)
* reorg test files only, ported from open-telemetry/opentelemetry-log-collection#165 * port otel enhanced file input benchmarking open-telemetry/opentelemetry-log-collection#166 * skip closing files that are already closed * port otel file rotation tracking open-telemetry/opentelemetry-log-collection#182 * fix poll()
1 parent b77fa58 commit 6d29289

File tree

8 files changed

+128
-37
lines changed

8 files changed

+128
-37
lines changed

docs/operators/file_input.md

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ The `file_input` operator reads logs from files. It will place the lines read in
1919
| `start_at` | `end` | At startup, where to start reading logs from the file. Options are `beginning` or `end` |
2020
| `fingerprint_size` | `1kb` | The number of bytes with which to identify a file. The first bytes in the file are used as the fingerprint. Decreasing this value at any point will cause existing fingerprints to be forgotten, meaning that all files will be read from the beginning (one time). |
2121
| `max_log_size` | `1MiB` | The maximum size of a log entry to read before failing. Protects against reading large amounts of data into memory |
22-
| `max_concurrent_files` | 1024 | The maximum number of log files from which logs will be read concurrently. If the number of files matched in the `include` pattern exceeds this number, then files will be processed in batches. One batch will be processed per `poll_interval`. |
22+
| `max_concurrent_files` | 1024 | The maximum number of log files from which logs will be read concurrently (minimum = 2). If the number of files matched in the `include` pattern exceeds half of this number, then files will be processed in batches. One batch will be processed per `poll_interval`. |
2323
| `labels` | {} | A map of `key: value` labels to add to the entry's labels |
2424
| `resource` | {} | A map of `key: value` labels to add to the entry's resource |
2525

@@ -35,6 +35,13 @@ If set, the `multiline` configuration block instructs the `file_input` operator
3535
The `multiline` configuration block must contain exactly one of `line_start_pattern` or `line_end_pattern`. These are regex patterns that
3636
match either the beginning of a new log entry, or the end of a log entry.
3737

38+
Also refer to [recombine](/docs/operators/recombine.md) operator for merging events with greater control.
39+
40+
### File rotation
41+
42+
When files are rotated and their new names are no longer captured by the `include` pattern (e.g. when tailing symlinked files), data loss can result.
43+
To avoid data loss, choose the move/create rotation method and set `max_concurrent_files` higher than twice the number of files to tail.
44+
3845
### Supported encodings
3946

4047
| Key | Description

operator/builtin/input/file/benchmark_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ func BenchmarkFileInput(b *testing.B) {
9090
cfg.Include = []string{
9191
"file*.log",
9292
}
93-
cfg.MaxConcurrentFiles = 1
93+
cfg.MaxConcurrentFiles = 2
9494
return cfg
9595
},
9696
},

operator/builtin/input/file/config.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,8 +83,8 @@ func (c InputConfig) Build(context operator.BuildContext) ([]operator.Operator,
8383
return nil, fmt.Errorf("`max_log_size` must be positive")
8484
}
8585

86-
if c.MaxConcurrentFiles <= 0 {
87-
return nil, fmt.Errorf("`max_concurrent_files` must be positive")
86+
if c.MaxConcurrentFiles <= 1 {
87+
return nil, fmt.Errorf("`max_concurrent_files` must be greater than 1")
8888
}
8989

9090
if c.FingerprintSize == 0 {

operator/builtin/input/file/file.go

Lines changed: 44 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,10 @@ type InputOperator struct {
3333

3434
persist helper.Persister
3535

36-
knownFiles []*Reader
37-
queuedMatches []string
36+
knownFiles []*Reader
37+
queuedMatches []string
38+
maxBatchFiles int
39+
lastPollReaders []*Reader
3840

3941
startAtBeginning bool
4042

@@ -69,6 +71,12 @@ func (f *InputOperator) Start() error {
6971
func (f *InputOperator) Stop() error {
7072
f.cancel()
7173
f.wg.Wait()
74+
for _, reader := range f.lastPollReaders {
75+
reader.Close()
76+
}
77+
for _, reader := range f.knownFiles {
78+
reader.Close()
79+
}
7280
f.knownFiles = nil
7381
f.cancel = nil
7482
return nil
@@ -97,10 +105,10 @@ func (f *InputOperator) startPoller(ctx context.Context) {
97105

98106
// poll checks all the watched paths for new entries
99107
func (f *InputOperator) poll(ctx context.Context) {
100-
108+
f.maxBatchFiles = f.MaxConcurrentFiles / 2
101109
var matches []string
102-
if len(f.queuedMatches) > f.MaxConcurrentFiles {
103-
matches, f.queuedMatches = f.queuedMatches[:f.MaxConcurrentFiles], f.queuedMatches[f.MaxConcurrentFiles:]
110+
if len(f.queuedMatches) > f.maxBatchFiles {
111+
matches, f.queuedMatches = f.queuedMatches[:f.maxBatchFiles], f.queuedMatches[f.maxBatchFiles:]
104112
} else if len(f.queuedMatches) > 0 {
105113
matches, f.queuedMatches = f.queuedMatches, make([]string, 0)
106114
} else {
@@ -114,15 +122,35 @@ func (f *InputOperator) poll(ctx context.Context) {
114122
matches = getMatches(f.Include, f.Exclude)
115123
if f.firstCheck && len(matches) == 0 {
116124
f.Warnw("no files match the configured include patterns", "include", f.Include)
117-
} else if len(matches) > f.MaxConcurrentFiles {
118-
matches, f.queuedMatches = matches[:f.MaxConcurrentFiles], matches[f.MaxConcurrentFiles:]
125+
} else if len(matches) > f.maxBatchFiles {
126+
matches, f.queuedMatches = matches[:f.maxBatchFiles], matches[f.maxBatchFiles:]
119127
}
120128
}
121129

122130
readers := f.makeReaders(matches)
123131
f.firstCheck = false
124132

133+
// Detect files that have been rotated out of matching pattern
134+
lostReaders := make([]*Reader, 0, len(f.lastPollReaders))
135+
OUTER:
136+
for _, oldReader := range f.lastPollReaders {
137+
for _, reader := range readers {
138+
if reader.Fingerprint.StartsWith(oldReader.Fingerprint) {
139+
continue OUTER
140+
}
141+
}
142+
lostReaders = append(lostReaders, oldReader)
143+
}
144+
125145
var wg sync.WaitGroup
146+
for _, reader := range lostReaders {
147+
wg.Add(1)
148+
go func(r *Reader) {
149+
defer wg.Done()
150+
r.ReadToEnd(ctx)
151+
}(reader)
152+
}
153+
126154
for _, reader := range readers {
127155
wg.Add(1)
128156
go func(r *Reader) {
@@ -134,6 +162,13 @@ func (f *InputOperator) poll(ctx context.Context) {
134162
// Wait until all the reader goroutines are finished
135163
wg.Wait()
136164

165+
// Close all files
166+
for _, reader := range f.lastPollReaders {
167+
reader.Close()
168+
}
169+
170+
f.lastPollReaders = readers
171+
137172
f.saveCurrent(readers)
138173
f.syncLastPollFiles()
139174
}
@@ -200,7 +235,7 @@ func (f *InputOperator) makeReaders(filePaths []string) []*Reader {
200235

201236
// Exclude any empty fingerprints or duplicate fingerprints to avoid doubling up on copy-truncate files
202237
OUTER:
203-
for i := 0; i < len(fps); {
238+
for i := 0; i < len(fps); i++ {
204239
fp := fps[i]
205240
if len(fp.FirstBytes) == 0 {
206241
files[i].Close()
@@ -210,12 +245,7 @@ OUTER:
210245

211246
}
212247

213-
for j := 0; j < len(fps); j++ {
214-
if i == j {
215-
// Skip checking itself
216-
continue
217-
}
218-
248+
for j := i + 1; j < len(fps); j++ {
219249
fp2 := fps[j]
220250
if fp.StartsWith(fp2) || fp2.StartsWith(fp) {
221251
// Exclude
@@ -224,7 +254,6 @@ OUTER:
224254
continue OUTER
225255
}
226256
}
227-
i++
228257
}
229258

230259
readers := make([]*Reader, 0, len(fps))

operator/builtin/input/file/file_test.go

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ func TestReadNewLogs(t *testing.T) {
9494
func TestReadExistingAndNewLogs(t *testing.T) {
9595
t.Parallel()
9696
operator, logReceived, tempDir := newTestFileOperator(t, nil, nil)
97+
defer operator.Stop()
9798

9899
// Start with a file with an entry in it, and expect that entry
99100
// to come through when we poll for the first time
@@ -116,6 +117,7 @@ func TestStartAtEnd(t *testing.T) {
116117
operator, logReceived, tempDir := newTestFileOperator(t, func(cfg *InputConfig) {
117118
cfg.StartAt = "end"
118119
}, nil)
120+
defer operator.Stop()
119121

120122
temp := openTemp(t, tempDir)
121123
writeString(t, temp, "testlog1\n")
@@ -137,6 +139,7 @@ func TestStartAtEndNewFile(t *testing.T) {
137139
t.Parallel()
138140
operator, logReceived, tempDir := newTestFileOperator(t, nil, nil)
139141
operator.startAtBeginning = false
142+
defer operator.Stop()
140143

141144
operator.poll(context.Background())
142145

@@ -185,6 +188,7 @@ func TestSkipEmpty(t *testing.T) {
185188
func TestSplitWrite(t *testing.T) {
186189
t.Parallel()
187190
operator, logReceived, tempDir := newTestFileOperator(t, nil, nil)
191+
defer operator.Stop()
188192

189193
temp := openTemp(t, tempDir)
190194
writeString(t, temp, "testlog1")
@@ -406,10 +410,11 @@ func TestFileBatching(t *testing.T) {
406410

407411
files := 100
408412
linesPerFile := 10
409-
maxConcurrentFiles := 10
413+
maxConcurrentFiles := 20
414+
maxBatchFiles := maxConcurrentFiles / 2
410415

411-
expectedBatches := files / maxConcurrentFiles // assumes no remainder
412-
expectedLinesPerBatch := maxConcurrentFiles * linesPerFile
416+
expectedBatches := files / maxBatchFiles // assumes no remainder
417+
expectedLinesPerBatch := maxBatchFiles * linesPerFile
413418

414419
expectedMessages := make([]string, 0, files*linesPerFile)
415420
actualMessages := make([]string, 0, files*linesPerFile)
@@ -419,9 +424,10 @@ func TestFileBatching(t *testing.T) {
419424
cfg.MaxConcurrentFiles = maxConcurrentFiles
420425
},
421426
func(out *testutil.FakeOutput) {
422-
out.Received = make(chan *entry.Entry, expectedLinesPerBatch)
427+
out.Received = make(chan *entry.Entry, expectedLinesPerBatch*2)
423428
},
424429
)
430+
defer operator.Stop()
425431

426432
temps := make([]*os.File, 0, files)
427433
for i := 0; i < files; i++ {
@@ -441,7 +447,6 @@ func TestFileBatching(t *testing.T) {
441447
// poll once so we can validate that files were batched
442448
operator.poll(context.Background())
443449
actualMessages = append(actualMessages, waitForN(t, logReceived, expectedLinesPerBatch)...)
444-
expectNoMessagesUntil(t, logReceived, 10*time.Millisecond)
445450
}
446451

447452
require.ElementsMatch(t, expectedMessages, actualMessages)
@@ -459,7 +464,6 @@ func TestFileBatching(t *testing.T) {
459464
// poll once so we can validate that files were batched
460465
operator.poll(context.Background())
461466
actualMessages = append(actualMessages, waitForN(t, logReceived, expectedLinesPerBatch)...)
462-
expectNoMessagesUntil(t, logReceived, 10*time.Millisecond)
463467
}
464468

465469
require.ElementsMatch(t, expectedMessages, actualMessages)
@@ -469,13 +473,15 @@ func TestFileReader_FingerprintUpdated(t *testing.T) {
469473
t.Parallel()
470474

471475
operator, logReceived, tempDir := newTestFileOperator(t, nil, nil)
476+
defer operator.Stop()
472477

473478
temp := openTemp(t, tempDir)
474479
tempCopy := openFile(t, temp.Name())
475480
fp, err := operator.NewFingerprint(temp)
476481
require.NoError(t, err)
477482
reader, err := operator.NewReader(temp.Name(), tempCopy, fp)
478483
require.NoError(t, err)
484+
defer reader.Close()
479485

480486
writeString(t, temp, "testlog1\n")
481487
reader.ReadToEnd(context.Background())

operator/builtin/input/file/reader.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -69,8 +69,6 @@ func (f *Reader) InitializeOffset(startAtBeginning bool) error {
6969

7070
// ReadToEnd will read until the end of the file
7171
func (f *Reader) ReadToEnd(ctx context.Context) {
72-
defer f.file.Close()
73-
7472
if _, err := f.file.Seek(f.Offset, 0); err != nil {
7573
f.Errorw("Failed to seek", zap.Error(err))
7674
return
@@ -103,8 +101,12 @@ func (f *Reader) ReadToEnd(ctx context.Context) {
103101
}
104102

105103
// Close will close the file
106-
func (f *Reader) Close() error {
107-
return f.file.Close()
104+
func (f *Reader) Close() {
105+
if f.file != nil {
106+
if err := f.file.Close(); err != nil {
107+
f.Debugf("Problem closing reader", "Error", err.Error())
108+
}
109+
}
108110
}
109111

110112
// Emit creates an entry with the decoded message and sends it to the next

operator/builtin/input/file/rotation_test.go

Lines changed: 51 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,15 @@ import (
1717
"github.com/stretchr/testify/require"
1818
)
1919

20+
const WINDOWS_OS = "windows"
21+
2022
func TestMultiFileRotate(t *testing.T) {
23+
if runtime.GOOS == WINDOWS_OS {
24+
// Windows has very poor support for moving active files, so rotation is less commonly used
25+
// This may possibly be handled better in Go 1.16: https://github.com/golang/go/issues/35358
26+
t.Skip()
27+
}
28+
2129
t.Parallel()
2230

2331
getMessage := func(f, k, m int) string { return fmt.Sprintf("file %d-%d, message %d", f, k, m) }
@@ -67,7 +75,7 @@ func TestMultiFileRotate(t *testing.T) {
6775
}
6876

6977
func TestMultiFileRotateSlow(t *testing.T) {
70-
if runtime.GOOS == "windows" {
78+
if runtime.GOOS == WINDOWS_OS {
7179
// Windows has very poor support for moving active files, so rotation is less commonly used
7280
// This may possibly be handled better in Go 1.16: https://github.com/golang/go/issues/35358
7381
t.Skip()
@@ -340,15 +348,19 @@ func TestRotation(t *testing.T) {
340348
}
341349

342350
for _, tc := range cases {
343-
t.Run(fmt.Sprintf("%s/MoveCreateTimestamped", tc.name), tc.run(tc, false, false))
344-
t.Run(fmt.Sprintf("%s/MoveCreateSequential", tc.name), tc.run(tc, false, true))
351+
if runtime.GOOS != WINDOWS_OS {
352+
// Windows has very poor support for moving active files, so rotation is less commonly used
353+
// This may possibly be handled better in Go 1.16: https://github.com/golang/go/issues/35358
354+
t.Run(fmt.Sprintf("%s/MoveCreateTimestamped", tc.name), tc.run(tc, false, false))
355+
t.Run(fmt.Sprintf("%s/MoveCreateSequential", tc.name), tc.run(tc, false, true))
356+
}
345357
t.Run(fmt.Sprintf("%s/CopyTruncateTimestamped", tc.name), tc.run(tc, true, false))
346358
t.Run(fmt.Sprintf("%s/CopyTruncateSequential", tc.name), tc.run(tc, true, true))
347359
}
348360
}
349361

350362
func TestMoveFile(t *testing.T) {
351-
if runtime.GOOS == "windows" {
363+
if runtime.GOOS == WINDOWS_OS {
352364
t.Skip("Moving files while open is unsupported on Windows")
353365
}
354366
t.Parallel()
@@ -372,6 +384,41 @@ func TestMoveFile(t *testing.T) {
372384
expectNoMessages(t, logReceived)
373385
}
374386

387+
func TestTrackMovedAwayFiles(t *testing.T) {
388+
if runtime.GOOS == WINDOWS_OS {
389+
t.Skip("Moving files while open is unsupported on Windows")
390+
}
391+
t.Parallel()
392+
operator, logReceived, tempDir := newTestFileOperator(t, nil, nil)
393+
394+
temp1 := openTemp(t, tempDir)
395+
writeString(t, temp1, "testlog1\n")
396+
temp1.Close()
397+
398+
operator.poll(context.Background())
399+
defer operator.Stop()
400+
401+
waitForMessage(t, logReceived, "testlog1")
402+
403+
// Wait until all goroutines are finished before renaming
404+
operator.wg.Wait()
405+
406+
newDir := fmt.Sprintf("%s%s", tempDir[:len(tempDir)-1], "_new/")
407+
err := os.Mkdir(newDir, 0777)
408+
require.NoError(t, err)
409+
newFileName := fmt.Sprintf("%s%s", newDir, "newfile.log")
410+
411+
err = os.Rename(temp1.Name(), newFileName)
412+
require.NoError(t, err)
413+
414+
movedFile, err := os.OpenFile(newFileName, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
415+
require.NoError(t, err)
416+
writeString(t, movedFile, "testlog2\n")
417+
operator.poll(context.Background())
418+
419+
waitForMessage(t, logReceived, "testlog2")
420+
}
421+
375422
// TruncateThenWrite tests that, after a file has been truncated,
376423
// any new writes are picked up
377424
func TestTruncateThenWrite(t *testing.T) {

0 commit comments

Comments
 (0)