Skip to content

Commit cccbda7

Browse files
authored
Fix handling of multiplexed long lines in Docker logs (#4713)
* Improve docker's multiplexed long lines handling * Fix `dockerChunkWriter` buffer flushing and add test for it * Update `CHANGELOG.md` * Fix potential panic in `extractTsFromBytes` and `dockerChunkWriter` buffer allocation
1 parent b89f510 commit cccbda7

File tree

3 files changed

+142
-41
lines changed

3 files changed

+142
-41
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,8 @@ Main (unreleased)
5858

5959
- Remove extraneous `output` stage from the `cri` stage pipeline in `loki.process`. (@kalleep)
6060

61+
- Fix Docker log corruption for multiplexed long lines. (@axd1x8a)
62+
6163
v1.12.0
6264
-----------------
6365

internal/component/loki/source/docker/internal/dockertarget/target.go

Lines changed: 101 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,15 @@ package dockertarget
66

77
import (
88
"bufio"
9+
"bytes"
910
"context"
1011
"fmt"
1112
"io"
1213
"strconv"
1314
"strings"
1415
"sync"
1516
"time"
17+
"unsafe"
1618

1719
"github.com/docker/docker/api/types/container"
1820
"github.com/docker/docker/client"
@@ -34,6 +36,7 @@ const (
3436
dockerLabel = model.MetaLabelPrefix + "docker_"
3537
dockerLabelContainerPrefix = dockerLabel + "container_"
3638
dockerLabelLogStream = dockerLabelContainerPrefix + "log_stream"
39+
dockerMaxChunkSize = 16384
3740
)
3841

3942
// Target enables reading Docker container logs.
@@ -110,7 +113,12 @@ func (t *Target) processLoop(ctx context.Context, tty bool, reader io.ReadCloser
110113
if tty {
111114
written, err = io.Copy(wstdout, reader)
112115
} else {
113-
written, err = stdcopy.StdCopy(wstdout, wstderr, reader)
116+
// For non-TTY, wrap the pipe writers with our chunk writer to reassemble frames.
117+
wcstdout := newChunkWriter(wstdout, t.logger)
118+
defer wcstdout.Close()
119+
wcstderr := newChunkWriter(wstderr, t.logger)
120+
defer wcstderr.Close()
121+
written, err = stdcopy.StdCopy(wcstdout, wcstderr, reader)
114122
}
115123
if err != nil {
116124
level.Warn(t.logger).Log("msg", "could not transfer logs", "written", written, "container", t.containerName, "err", err)
@@ -128,53 +136,38 @@ func (t *Target) processLoop(ctx context.Context, tty bool, reader io.ReadCloser
128136
level.Debug(t.logger).Log("msg", "done processing Docker logs", "container", t.containerName)
129137
}
130138

131-
// extractTs tries for read the timestamp from the beginning of the log line.
132-
// It's expected to follow the format 2006-01-02T15:04:05.999999999Z07:00.
133-
func extractTs(line string) (time.Time, string, error) {
134-
pair := strings.SplitN(line, " ", 2)
135-
if len(pair) != 2 {
136-
return time.Now(), line, fmt.Errorf("could not find timestamp in '%s'", line)
137-
}
138-
ts, err := time.Parse("2006-01-02T15:04:05.999999999Z07:00", pair[0])
139-
if err != nil {
140-
return time.Now(), line, fmt.Errorf("could not parse timestamp from '%s': %w", pair[0], err)
141-
}
142-
return ts, pair[1], nil
143-
}
139+
// extractTsFromBytes splits a Docker log line of the form
// "<timestamp> <content>" at the first space and parses the timestamp using
// the RFC3339Nano layout.
//
// On success it returns the parsed time and the bytes that follow the
// separating space. The returned content is a sub-slice of line (no copy is
// made) and is empty, but non-nil, when nothing follows the timestamp.
//
// An error, together with a zero time and nil content, is returned when line
// contains no space or when the text before the first space is not a valid
// timestamp.
func extractTsFromBytes(line []byte) (time.Time, []byte, error) {
	const timestampLayout = "2006-01-02T15:04:05.999999999Z07:00"

	// A missing separator is the only structural failure. A space in the last
	// position is a valid timestamp followed by empty content (a blank log
	// line); slicing past it below is in range, so it must not be rejected.
	spaceIdx := bytes.IndexByte(line, ' ')
	if spaceIdx == -1 {
		return time.Time{}, nil, fmt.Errorf("could not find timestamp in bytes")
	}

	// unsafe.String avoids allocating a string just to parse the timestamp.
	// line is non-empty here because IndexByte found a space, so &line[0] is
	// valid. spaceIdx may be 0, which yields an empty string that time.Parse
	// rejects. The string aliases line, so line must not be modified while
	// the timestamp is being parsed.
	ts, err := time.Parse(timestampLayout, unsafe.String(&line[0], spaceIdx))
	if err != nil {
		return time.Time{}, nil, fmt.Errorf("could not parse timestamp: %w", err)
	}
	return ts, line[spaceIdx+1:], nil
}
160159

161160
func (t *Target) process(r io.Reader, logStreamLset model.LabelSet) {
162-
defer func() {
163-
t.wg.Done()
164-
}()
161+
defer t.wg.Done()
165162

166-
reader := bufio.NewReader(r)
167-
for {
168-
line, err := readLine(reader)
169-
if err != nil {
170-
if err == io.EOF {
171-
break
172-
}
173-
level.Error(t.logger).Log("msg", "error reading docker log line, skipping line", "err", err)
174-
t.metrics.dockerErrors.Inc()
175-
}
163+
scanner := bufio.NewScanner(r)
164+
const maxCapacity = dockerMaxChunkSize * 64
165+
buf := make([]byte, 0, maxCapacity)
166+
scanner.Buffer(buf, maxCapacity)
167+
for scanner.Scan() {
168+
line := scanner.Bytes()
176169

177-
ts, line, err := extractTs(line)
170+
ts, content, err := extractTsFromBytes(line)
178171
if err != nil {
179172
level.Error(t.logger).Log("msg", "could not extract timestamp, skipping line", "err", err)
180173
t.metrics.dockerErrors.Inc()
@@ -185,7 +178,7 @@ func (t *Target) process(r io.Reader, logStreamLset model.LabelSet) {
185178
Labels: logStreamLset,
186179
Entry: push.Entry{
187180
Timestamp: ts,
188-
Line: line,
181+
Line: string(content),
189182
},
190183
}
191184
t.metrics.dockerEntries.Inc()
@@ -200,6 +193,10 @@ func (t *Target) process(r io.Reader, logStreamLset model.LabelSet) {
200193
t.since.Store(ts.Unix())
201194
t.last.Store(time.Now().Unix())
202195
}
196+
if err := scanner.Err(); err != nil {
197+
level.Error(t.logger).Log("msg", "error reading docker log line", "err", err)
198+
t.metrics.dockerErrors.Inc()
199+
}
203200
}
204201

205202
// StartIfNotRunning starts processing container logs. The operation is idempotent, i.e. the processing cannot be started twice.
@@ -321,3 +318,66 @@ func (t *Target) getStreamLabels(logStream string) model.LabelSet {
321318

322319
return filtered
323320
}
321+
322+
// dockerChunkWriter implements io.Writer to preprocess and reassemble Docker log frames.
323+
type dockerChunkWriter struct {
324+
writer io.Writer
325+
logger log.Logger
326+
buffer *bytes.Buffer
327+
isBuffering bool
328+
}
329+
330+
var bufferPool = sync.Pool{
331+
New: func() interface{} {
332+
return bytes.NewBuffer(make([]byte, 0, dockerMaxChunkSize*2))
333+
},
334+
}
335+
336+
func newChunkWriter(writer io.Writer, logger log.Logger) *dockerChunkWriter {
337+
return &dockerChunkWriter{
338+
writer: writer,
339+
logger: logger,
340+
buffer: bufferPool.Get().(*bytes.Buffer),
341+
}
342+
}
343+
344+
func (fw *dockerChunkWriter) Close() error {
345+
if fw.buffer != nil {
346+
fw.buffer.Reset()
347+
bufferPool.Put(fw.buffer)
348+
fw.buffer = nil
349+
}
350+
return nil
351+
}
352+
353+
func (fw *dockerChunkWriter) Write(p []byte) (int, error) {
354+
if !fw.isBuffering {
355+
if len(p) < dockerMaxChunkSize || p[len(p)-1] == 0x0A {
356+
// Short or complete frame: write directly without buffering.
357+
return fw.writer.Write(p)
358+
}
359+
// Long frame start: buffer the first chunk.
360+
fw.buffer.Write(p)
361+
fw.isBuffering = true
362+
return len(p), nil
363+
}
364+
365+
// Continuation chunk: strip redundant timestamp and append content.
366+
_, content, err := extractTsFromBytes(p)
367+
if err != nil {
368+
// Should not normally happen, but flog.log has entries like this for some reason.
369+
level.Warn(fw.logger).Log("msg", "could not strip timestamp from continuation chunk", "err", err)
370+
fw.buffer.Write(p)
371+
} else {
372+
fw.buffer.Write(content)
373+
}
374+
// If this is the last continuation chunk (ends with newline), flush the buffer
375+
if len(p) > 0 && p[len(p)-1] == 0x0A {
376+
if _, err := fw.writer.Write(fw.buffer.Bytes()); err != nil {
377+
return 0, err
378+
}
379+
fw.buffer.Reset()
380+
fw.isBuffering = false
381+
}
382+
return len(p), nil
383+
}

internal/component/loki/source/docker/internal/dockertarget/target_test.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ package dockertarget
55
// read logs from Docker containers and forward them to other loki components.
66

77
import (
8+
"bytes"
89
"encoding/json"
910
"net/http"
1011
"net/http/httptest"
@@ -136,6 +137,44 @@ func TestStartStopStressTest(t *testing.T) {
136137
wg.Wait()
137138
}
138139

140+
func TestDockerChunkWriter(t *testing.T) {
141+
logger := log.NewNopLogger()
142+
var buf bytes.Buffer
143+
writer := newChunkWriter(&buf, logger)
144+
145+
timestamp := []byte("2023-12-09T12:00:00.000000000Z ")
146+
shortLine := []byte("short log line\n")
147+
148+
var longContent []byte
149+
for range 50 * 1024 {
150+
longContent = append(longContent, 'a')
151+
}
152+
longContent = append(longContent, '\n')
153+
154+
// First part of long line
155+
chunk1 := append(timestamp, longContent[:32*1024]...)
156+
_, err := writer.Write(chunk1)
157+
require.NoError(t, err)
158+
159+
// Second part of long line
160+
chunk2 := append(timestamp, longContent[32*1024:]...)
161+
_, err = writer.Write(chunk2)
162+
require.NoError(t, err)
163+
164+
// Start a new short line
165+
chunk3 := append(timestamp, shortLine...)
166+
_, err = writer.Write(chunk3)
167+
require.NoError(t, err)
168+
169+
err = writer.Close()
170+
require.NoError(t, err)
171+
172+
expected := append(timestamp, longContent...)
173+
expected = append(expected, chunk3...)
174+
175+
assert.Equal(t, expected, buf.Bytes())
176+
}
177+
139178
func newDockerServer(t *testing.T) *httptest.Server {
140179
h := func(w http.ResponseWriter, r *http.Request) {
141180
path := r.URL.Path

0 commit comments

Comments
 (0)