Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 36 additions & 5 deletions internal/component/loki/source/file/internal/tail/file_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package tail

import (
"bytes"
"compress/gzip"
"compress/zlib"
"context"
Expand Down Expand Up @@ -472,9 +473,9 @@ func compressionTest(t *testing.T, name, compression string, enc *encoding.Encod
})
}

func createFile(t *testing.T, name, content string) string {
path := t.TempDir() + "/" + name
require.NoError(t, os.WriteFile(path, []byte(content), 0600))
func createFile(tb testing.TB, name, content string) string {
path := tb.TempDir() + "/" + name
require.NoError(tb, os.WriteFile(path, []byte(content), 0600))
return path
}

Expand All @@ -494,8 +495,8 @@ func truncateFile(t *testing.T, name, content string) {
require.NoError(t, err)
}

func removeFile(t *testing.T, name string) {
require.NoError(t, os.Remove(name))
func removeFile(tb testing.TB, name string) {
require.NoError(tb, os.Remove(name))
}

func rotateFile(t *testing.T, name, newContent string) {
Expand Down Expand Up @@ -548,3 +549,33 @@ func verifyResult(t *testing.T, f *File, expectedLine *Line, expectedErr error)
require.Equal(t, expectedLine.Offset, line.Offset)
}
}

var benchLine *Line

func BenchmarkFile(b *testing.B) {
// we create a file with 1000 lines and each line is 500 bytes
line := bytes.Repeat([]byte{'a'}, 500)
lines := strings.Repeat(string(line)+"\n", 1000)
name := createFile(b, "benchfile", lines)
defer removeFile(b, name)

b.ReportAllocs()

for b.Loop() {
file, err := NewFile(log.NewNopLogger(), &Config{
Filename: name,
WatcherConfig: WatcherConfig{},
})
require.NoError(b, err)
// we set EOF here so tailer will stop after we have consumed the whole file
file.waitAtEOF = false

for {
var err error
benchLine, err = file.Next()
if errors.Is(err, io.EOF) {
break
}
}
}
}
27 changes: 11 additions & 16 deletions internal/component/loki/source/file/internal/tail/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"compress/bzip2"
"compress/gzip"
"compress/zlib"
"errors"
"io"
"os"
"unsafe"
Expand Down Expand Up @@ -79,14 +80,9 @@ type reader struct {
// next reads and returns the next complete line from the file.
// It will return EOF if there is no more data to read.
func (r *reader) next() (string, error) {
// First we check if we already have a full line buffered.
if line, ok := r.consumeLine(); ok {
return r.decode(line)
}

for {
// Read more data up until the last byte of nl.
chunk, err := r.br.ReadBytes(r.lastNl)
chunk, err := r.br.ReadSlice(r.lastNl)
if len(chunk) > 0 {
r.pending = append(r.pending, chunk...)

Expand All @@ -95,13 +91,12 @@ func (r *reader) next() (string, error) {
}
}

// If we did not get an error and did not find a full line we
// need to read more data.
if err == nil {
continue
}
// ReadSlice does not allocate; it returns a slice into bufio's buffer and advances
// the read position. If we did not find a full line or got ErrBufferFull, loop and call again.
if err != nil && !errors.Is(err, bufio.ErrBufferFull) {
return "", err

return "", err
}
}
}

Expand All @@ -115,7 +110,7 @@ func (r *reader) flush() (string, error) {

line := r.pending[:]
r.pos += int64(len(line))
r.pending = make([]byte, 0, defaultBufSize)
r.pending = r.pending[:0]
return r.decode(bytes.TrimSuffix(line, r.nl))
}

Expand All @@ -142,9 +137,9 @@ func (r *reader) consumeLine() ([]byte, bool) {

// Extract everything up until newline.
line := r.pending[:i]
// Keep everything except the line we extracted and newline.
rem := r.pending[i+len(r.nl):]
r.pending = append(make([]byte, 0, defaultBufSize), rem...)

// Reset pending. We never buffer beyond newline so it is safe to reset.
r.pending = r.pending[:0]

// Advance the position on bytes we have consumed as a full line.
r.pos += int64(len(line) + len(r.nl))
Expand Down
Loading