Skip to content

Commit 55a82f0

Browse files
fix(loki.source.docker): Parse timestamp correctly when log line only contains newline [backport] (#5496)
## Backport of #5489 This PR backports #5489 to release/v1.13. ### Original PR Author @kalleep ### Description ### Pull Request Details Regression from #4713. Before that pr we parsed log lines that only included newlines. In this pr I fixed the issue so we no longer report an error but we keep the behavior of not forwarding empty lines and add a debug log for these cases. ### Issue(s) fixed by this Pull Request Fixes: #5476 ### Notes to the Reviewer <!-- Add any relevant notes for the reviewers and testers of this PR. --> ### PR Checklist <!-- Remove items that do not apply. For completed items, change [ ] to [x]. --> - [ ] Documentation added - [x] Tests updated - [ ] Config converters updated --- *This backport was created automatically.* Co-authored-by: Karl Persson <23356117+kalleep@users.noreply.github.com>
1 parent 2bbc37e commit 55a82f0

File tree

2 files changed

+87
-10
lines changed

2 files changed

+87
-10
lines changed

internal/component/loki/source/docker/tailer.go

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -242,19 +242,15 @@ func (t *tailer) processLoop(ctx context.Context, tty bool, reader io.ReadCloser
242242

243243
// extractTsFromBytes parses an RFC3339Nano timestamp from the byte slice.
244244
func extractTsFromBytes(line []byte) (time.Time, []byte, error) {
245-
const timestampLayout = "2006-01-02T15:04:05.999999999Z07:00"
246-
247245
spaceIdx := bytes.IndexByte(line, ' ')
248-
if spaceIdx == -1 || spaceIdx >= len(line)-1 {
246+
if spaceIdx == -1 || spaceIdx >= len(line) {
249247
return time.Time{}, nil, fmt.Errorf("could not find timestamp in bytes")
250248
}
251249

252-
// The unsafe.String is used here to avoid allocation and string conversion when parsing the timestamp
253-
// This is safe because:
254-
// 1. spaceIdx > 0 and spaceIdx < len(line)-1 is guaranteed by the check above
255-
// 2. time.Parse doesn't retain the string after returning
256-
// 3. The underlying bytes aren't modified during parsing
257-
ts, err := time.Parse(timestampLayout, unsafe.String(&line[0], spaceIdx))
250+
// The unsafe.String is used here to avoid allocation and string conversion when parsing the timestamp.
251+
// This is safe because time.Parse doesn't retain the string after returning and
252+
// the underlying bytes aren't modified during parsing.
253+
ts, err := time.Parse(time.RFC3339Nano, unsafe.String(&line[0], spaceIdx))
258254
if err != nil {
259255
return time.Time{}, nil, fmt.Errorf("could not parse timestamp: %w", err)
260256
}
@@ -264,8 +260,9 @@ func extractTsFromBytes(line []byte) (time.Time, []byte, error) {
264260
func (t *tailer) process(r io.Reader, logStreamLset model.LabelSet) {
265261
defer t.wg.Done()
266262

267-
scanner := bufio.NewScanner(r)
268263
const maxCapacity = dockerMaxChunkSize * 64
264+
265+
scanner := bufio.NewScanner(r)
269266
buf := make([]byte, 0, maxCapacity)
270267
scanner.Buffer(buf, maxCapacity)
271268
for scanner.Scan() {
@@ -278,6 +275,11 @@ func (t *tailer) process(r io.Reader, logStreamLset model.LabelSet) {
278275
continue
279276
}
280277

278+
if len(content) == 0 {
279+
level.Debug(t.logger).Log("msg", "empty log, skipping line")
280+
continue
281+
}
282+
281283
t.recv.Chan() <- loki.Entry{
282284
Labels: logStreamLset,
283285
Entry: push.Entry{

internal/component/loki/source/docker/tailer_test.go

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919

2020
"github.com/docker/docker/api/types/container"
2121
"github.com/docker/docker/client"
22+
"github.com/docker/docker/pkg/stdcopy"
2223
"github.com/go-kit/log"
2324
"github.com/prometheus/client_golang/prometheus"
2425
"github.com/prometheus/common/model"
@@ -216,6 +217,61 @@ func TestTailerNeverStarted(t *testing.T) {
216217
require.NotPanics(t, func() { cancel() })
217218
}
218219

220+
var _ io.ReadCloser = (*stringReader)(nil)
221+
222+
func newStringReader(s string) *stringReader {
223+
return &stringReader{Reader: strings.NewReader(s)}
224+
}
225+
226+
type stringReader struct {
227+
*strings.Reader
228+
}
229+
230+
func (s *stringReader) Close() error {
231+
return nil
232+
}
233+
234+
func TestTailerConsumeLines(t *testing.T) {
235+
t.Run("skip empty line", func(t *testing.T) {
236+
collector := loki.NewCollectingHandler()
237+
tailer := &tailer{
238+
logger: log.NewNopLogger(),
239+
recv: collector.Receiver(),
240+
positions: positions.NewNop(),
241+
containerID: "test",
242+
metrics: newMetrics(prometheus.DefaultRegisterer),
243+
running: true,
244+
wg: sync.WaitGroup{},
245+
last: atomic.NewInt64(0),
246+
since: atomic.NewInt64(0),
247+
componentStopping: func() bool { return false },
248+
}
249+
250+
bb := &bytes.Buffer{}
251+
writer := stdcopy.NewStdWriter(bb, stdcopy.Stdout)
252+
_, err := writer.Write([]byte("2023-12-09T12:00:00.000000000Z \n2023-12-09T12:00:00.000000000Z line\n"))
253+
require.NoError(t, err)
254+
255+
tailer.wg.Add(3)
256+
go func() {
257+
tailer.processLoop(t.Context(), false, newStringReader(bb.String()))
258+
}()
259+
260+
require.Eventually(t, func() bool {
261+
return len(collector.Received()) == 1
262+
}, 2*time.Second, 50*time.Millisecond)
263+
264+
entry := collector.Received()[0]
265+
266+
expectedLine := "line"
267+
expectedTimestamp, err := time.Parse(time.RFC3339Nano, "2023-12-09T12:00:00.000000000Z")
268+
require.NoError(t, err)
269+
270+
require.Equal(t, expectedLine, entry.Line)
271+
require.Equal(t, expectedTimestamp, entry.Timestamp)
272+
})
273+
}
274+
219275
func TestChunkWriter(t *testing.T) {
220276
logger := log.NewNopLogger()
221277
var buf bytes.Buffer
@@ -254,6 +310,25 @@ func TestChunkWriter(t *testing.T) {
254310
assert.Equal(t, expected, buf.Bytes())
255311
}
256312

313+
func TestExtractTsFromBytes(t *testing.T) {
314+
t.Run("invalid timestamp", func(t *testing.T) {
315+
_, _, err := extractTsFromBytes([]byte("123 test\n"))
316+
require.Error(t, err)
317+
})
318+
319+
t.Run("valid timestamp empty line", func(t *testing.T) {
320+
ts, _, err := extractTsFromBytes([]byte("2024-05-02T13:11:55.879889Z \n"))
321+
require.NoError(t, err)
322+
expectedTs, err := time.Parse(time.RFC3339Nano, "2024-05-02T13:11:55.879889Z")
323+
require.NoError(t, err)
324+
require.Equal(t, expectedTs, ts)
325+
})
326+
t.Run("valid timestamp no space", func(t *testing.T) {
327+
_, _, err := extractTsFromBytes([]byte("2024-05-02T13:11:55.879889Z\n"))
328+
require.Error(t, err)
329+
})
330+
}
331+
257332
func newDockerServer(t *testing.T) *httptest.Server {
258333
h := func(w http.ResponseWriter, r *http.Request) {
259334
path := r.URL.Path

0 commit comments

Comments
 (0)