Skip to content

Commit eca2294

Browse files
perf(loki.source.file): Reduce allocations during file reading [backport] (#5416)
## Backport of #5405 This PR backports #5405 to release/v1.13. ### Original PR Author @kalleep ### Description ### Pull Request Details There are a lot of allocations being performed while reading files. I can clearly see this being a hot spot in our internal clusters. Two of the biggest issues are: 1. We allocate a new pending buffer as soon as we have completed a line 2. We use the `ReadBytes` API, causing a new allocation for every call that we just move to pending. Instead, we can use `ReadSlice`; this API will not allocate but returns a slice pointing to the internal buffer while still advancing the read position. We copy it into `pending`, so this is fine, and we remove the intermediate allocation done by `ReadBytes`. We can also reset the length of `pending` and reuse this buffer. This is fine because we never read past the newline boundary. ### Issue(s) fixed by this Pull Request <!-- Fixes #issue_id --> ### Notes to the Reviewer <!-- Add any relevant notes for the reviewers and testers of this PR. --> ### PR Checklist <!-- Remove items that do not apply. For completed items, change [ ] to [x]. --> - [ ] Documentation added - [x] Tests updated - [ ] Config converters updated --- *This backport was created automatically.* Co-authored-by: Karl Persson <23356117+kalleep@users.noreply.github.com>
1 parent 311662f commit eca2294

File tree

2 files changed

+50
-21
lines changed

2 files changed

+50
-21
lines changed

internal/component/loki/source/file/internal/tail/file_test.go

Lines changed: 39 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package tail
22

33
import (
4+
"bytes"
45
"compress/gzip"
56
"compress/zlib"
67
"context"
@@ -472,9 +473,9 @@ func compressionTest(t *testing.T, name, compression string, enc *encoding.Encod
472473
})
473474
}
474475

475-
func createFile(t *testing.T, name, content string) string {
476-
path := t.TempDir() + "/" + name
477-
require.NoError(t, os.WriteFile(path, []byte(content), 0600))
476+
func createFile(tb testing.TB, name, content string) string {
477+
path := tb.TempDir() + "/" + name
478+
require.NoError(tb, os.WriteFile(path, []byte(content), 0600))
478479
return path
479480
}
480481

@@ -494,8 +495,8 @@ func truncateFile(t *testing.T, name, content string) {
494495
require.NoError(t, err)
495496
}
496497

497-
func removeFile(t *testing.T, name string) {
498-
require.NoError(t, os.Remove(name))
498+
func removeFile(tb testing.TB, name string) {
499+
require.NoError(tb, os.Remove(name))
499500
}
500501

501502
func rotateFile(t *testing.T, name, newContent string) {
@@ -548,3 +549,36 @@ func verifyResult(t *testing.T, f *File, expectedLine *Line, expectedErr error)
548549
require.Equal(t, expectedLine.Offset, line.Offset)
549550
}
550551
}
552+
553+
var benchLine *Line
554+
555+
func BenchmarkFile(b *testing.B) {
556+
// we create a file with 1000 lines and each line is 500 bytes.
557+
line := bytes.Repeat([]byte{'a'}, 500)
558+
lines := strings.Repeat(string(line)+"\n", 1000)
559+
name := createFile(b, "benchfile", lines)
560+
defer removeFile(b, name)
561+
562+
b.ReportAllocs()
563+
564+
for b.Loop() {
565+
file, err := NewFile(log.NewNopLogger(), &Config{
566+
Filename: name,
567+
WatcherConfig: WatcherConfig{},
568+
})
569+
require.NoError(b, err)
570+
// Disable waiting at EOF so Next returns io.EOF after the file is fully consumed.
571+
file.waitAtEOF = false
572+
573+
for {
574+
var err error
575+
benchLine, err = file.Next()
576+
if errors.Is(err, io.EOF) {
577+
break
578+
}
579+
require.NoError(b, err)
580+
}
581+
582+
file.Stop()
583+
}
584+
}

internal/component/loki/source/file/internal/tail/reader.go

Lines changed: 11 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"compress/bzip2"
77
"compress/gzip"
88
"compress/zlib"
9+
"errors"
910
"io"
1011
"os"
1112
"unsafe"
@@ -79,14 +80,9 @@ type reader struct {
7980
// next reads and returns the next complete line from the file.
8081
// It will return EOF if there is no more data to read.
8182
func (r *reader) next() (string, error) {
82-
// First we check if we already have a full line buffered.
83-
if line, ok := r.consumeLine(); ok {
84-
return r.decode(line)
85-
}
86-
8783
for {
8884
// Read more data up until the last byte of nl.
89-
chunk, err := r.br.ReadBytes(r.lastNl)
85+
chunk, err := r.br.ReadSlice(r.lastNl)
9086
if len(chunk) > 0 {
9187
r.pending = append(r.pending, chunk...)
9288

@@ -95,13 +91,12 @@ func (r *reader) next() (string, error) {
9591
}
9692
}
9793

98-
// If we did not get an error and did not find a full line we
99-
// need to read more data.
100-
if err == nil {
101-
continue
102-
}
94+
// ReadSlice does not allocate; it returns a slice into bufio's buffer and advances
95+
// the read position. If we did not find a full line or got ErrBufferFull, loop and call again.
96+
if err != nil && !errors.Is(err, bufio.ErrBufferFull) {
97+
return "", err
10398

104-
return "", err
99+
}
105100
}
106101
}
107102

@@ -115,7 +110,7 @@ func (r *reader) flush() (string, error) {
115110

116111
line := r.pending[:]
117112
r.pos += int64(len(line))
118-
r.pending = make([]byte, 0, defaultBufSize)
113+
r.pending = r.pending[:0]
119114
return r.decode(bytes.TrimSuffix(line, r.nl))
120115
}
121116

@@ -142,9 +137,9 @@ func (r *reader) consumeLine() ([]byte, bool) {
142137

143138
// Extract everything up until newline.
144139
line := r.pending[:i]
145-
// Keep everything except the line we extracted and newline.
146-
rem := r.pending[i+len(r.nl):]
147-
r.pending = append(make([]byte, 0, defaultBufSize), rem...)
140+
141+
// Reset pending. We never buffer beyond newline so it is safe to reset.
142+
r.pending = r.pending[:0]
148143

149144
// Advance the position on bytes we have consumed as a full line.
150145
r.pos += int64(len(line) + len(r.nl))

0 commit comments

Comments
 (0)