Skip to content

Commit b013c1b

Browse files
committed
use ReadSlice api and do not re-allocate pending
1 parent caf0f09 commit b013c1b

File tree

1 file changed

+12
-17
lines changed
  • internal/component/loki/source/file/internal/tail

1 file changed

+12
-17
lines changed

internal/component/loki/source/file/internal/tail/reader.go

Lines changed: 12 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"compress/bzip2"
77
"compress/gzip"
88
"compress/zlib"
9+
"errors"
910
"io"
1011
"os"
1112
"unsafe"
@@ -79,14 +80,9 @@ type reader struct {
7980
// next reads and returns the next complete line from the file.
8081
// It will return EOF if there is no more data to read.
8182
func (r *reader) next() (string, error) {
82-
// First we check if we already have a full line buffered.
83-
if line, ok := r.consumeLine(); ok {
84-
return r.decode(line)
85-
}
86-
8783
for {
8884
// Read more data up until the last byte of nl.
89-
chunk, err := r.br.ReadBytes(r.lastNl)
85+
chunk, err := r.br.ReadSlice(r.lastNl)
9086
if len(chunk) > 0 {
9187
r.pending = append(r.pending, chunk...)
9288

@@ -95,13 +91,12 @@ func (r *reader) next() (string, error) {
9591
}
9692
}
9793

98-
// If we did not get an error and did not find a full line we
99-
// need to read more data.
100-
if err == nil {
101-
continue
102-
}
94+
// ReadSlice does not allocate; it returns a slice into bufio's buffer and advances
95+
// the read position. If we did not find a full line or got ErrBufferFull, loop and call again.
96+
if err != nil && !errors.Is(err, bufio.ErrBufferFull) {
97+
return "", err
10398

104-
return "", err
99+
}
105100
}
106101
}
107102

@@ -115,7 +110,7 @@ func (r *reader) flush() (string, error) {
115110

116111
line := r.pending[:]
117112
r.pos += int64(len(line))
118-
r.pending = make([]byte, 0, defaultBufSize)
113+
r.pending = r.pending[:0]
119114
return r.decode(bytes.TrimSuffix(line, r.nl))
120115
}
121116

@@ -142,9 +137,9 @@ func (r *reader) consumeLine() ([]byte, bool) {
142137

143138
// Extract everything up until newline.
144139
line := r.pending[:i]
145-
// Keep everything except the line we extracted and newline.
146-
rem := r.pending[i+len(r.nl):]
147-
r.pending = append(make([]byte, 0, defaultBufSize), rem...)
140+
141+
// Reset pending. We never buffer beyond newline so it is safe to reset.
142+
r.pending = r.pending[:0]
148143

149144
// Advance the position on bytes we have consumed as a full line.
150145
r.pos += int64(len(line) + len(r.nl))
@@ -170,7 +165,7 @@ func (r *reader) reset(f *os.File, offset int64) error {
170165
if offset != 0 {
171166
rr, err = newReaderAt(f, r.compression, offset)
172167
if err != nil {
173-
return nil
168+
return err
174169
}
175170
r.br.Reset(rr)
176171
}

0 commit comments

Comments
 (0)