Skip to content

Commit 688da66

Browse files
authored
loki.source.file: refactor tail package (#5003)
Refactor the internal tail package to have a simpler API with lower resource usage.
1 parent 01c4a62 commit 688da66

File tree

22 files changed

+979
-1266
lines changed

22 files changed

+979
-1266
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@ Main (unreleased)
5151

5252
- Updated Loki dependencies to v3.6.2. (@thampiotr)
5353

54+
- Refactor tailer used in `loki.source.file` to reduce resource usage. (@kalleep)
55+
5456
### Bugfixes
5557

5658
- (_Public Preview_) Additions to `database_observability.postgres` component:

go.mod

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -308,7 +308,6 @@ require (
308308
google.golang.org/api v0.254.0
309309
google.golang.org/grpc v1.76.0
310310
google.golang.org/protobuf v1.36.10
311-
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7
312311
gopkg.in/yaml.v2 v2.4.0
313312
gopkg.in/yaml.v3 v3.0.1
314313
gotest.tools v2.2.0+incompatible

internal/component/loki/source/file/internal/tail/README.md

Lines changed: 0 additions & 32 deletions
This file was deleted.
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
package tail
2+
3+
import (
4+
"context"
5+
"os"
6+
"runtime"
7+
8+
"github.com/grafana/dskit/backoff"
9+
10+
"github.com/grafana/alloy/internal/component/loki/source/file/internal/tail/fileext"
11+
)
12+
13+
// blockUntilExists blocks until the file specified in cfg exists or the context is canceled.
14+
// It polls the file system at intervals defined by WatcherConfig polling frequencies.
15+
// Returns an error if the context is canceled or an unrecoverable error occurs.
16+
func blockUntilExists(ctx context.Context, cfg *Config) error {
17+
backoff := backoff.New(ctx, backoff.Config{
18+
MinBackoff: cfg.WatcherConfig.MinPollFrequency,
19+
MaxBackoff: cfg.WatcherConfig.MaxPollFrequency,
20+
})
21+
22+
for backoff.Ongoing() {
23+
if _, err := os.Stat(cfg.Filename); err == nil {
24+
return nil
25+
} else if !os.IsNotExist(err) {
26+
return err
27+
}
28+
backoff.Wait()
29+
}
30+
31+
return backoff.Err()
32+
}
33+
34+
// event is the kind of change observed on a tailed file while polling.
type event int

const (
	// eventNone means no change was detected.
	eventNone event = iota
	// eventTruncated means the file shrank.
	eventTruncated
	// eventModified means the file grew or its modification time changed.
	eventModified
	// eventDeleted means the file was deleted, moved, or renamed.
	eventDeleted
)
43+
44+
// blockUntilEvent blocks until it detects a file system event for the given file or the context is canceled.
45+
// It polls the file system to detect modifications, truncations, deletions, or renames.
46+
// The pos parameter is the current file position and is used to detect truncation events.
47+
// Returns the detected event type and any error encountered. Returns eventNone if the context is canceled.
48+
func blockUntilEvent(ctx context.Context, f *os.File, prevSize int64, cfg *Config) (event, error) {
49+
// NOTE: it is important that we stat the open file here. Later we do os.Stat(cfg.Filename)
50+
// and use os.IsSameFile to detect if file was rotated.
51+
origFi, err := f.Stat()
52+
if err != nil {
53+
// If file no longer exists we treat it as a delete event.
54+
if os.IsNotExist(err) {
55+
return eventDeleted, nil
56+
}
57+
return eventNone, err
58+
}
59+
60+
backoff := backoff.New(ctx, backoff.Config{
61+
MinBackoff: cfg.WatcherConfig.MinPollFrequency,
62+
MaxBackoff: cfg.WatcherConfig.MaxPollFrequency,
63+
})
64+
65+
prevModTime := origFi.ModTime()
66+
67+
for backoff.Ongoing() {
68+
deletePending, err := fileext.IsDeletePending(f)
69+
70+
// DeletePending is a windows state where the file has been queued
71+
// for delete but won't actually get deleted until all handles are
72+
// closed. It's a variation on the NotifyDeleted call below.
73+
//
74+
// IsDeletePending may fail in cases where the file handle becomes
75+
// invalid, so we treat a failed call the same as a pending delete.
76+
if err != nil || deletePending {
77+
return eventDeleted, nil
78+
}
79+
80+
fi, err := os.Stat(cfg.Filename)
81+
if err != nil {
82+
// Windows cannot delete a file if a handle is still open (tail keeps one open)
83+
// so it gives access denied to anything trying to read it until all handles are released.
84+
if os.IsNotExist(err) || (runtime.GOOS == "windows" && os.IsPermission(err)) {
85+
// File does not exist (has been deleted).
86+
return eventDeleted, nil
87+
}
88+
return eventNone, err
89+
}
90+
91+
// File got moved/renamed?
92+
if !os.SameFile(origFi, fi) {
93+
return eventDeleted, nil
94+
}
95+
96+
// File got truncated?
97+
currentSize := fi.Size()
98+
if prevSize > 0 && prevSize > currentSize {
99+
return eventTruncated, nil
100+
}
101+
102+
// File got bigger?
103+
if prevSize < currentSize {
104+
return eventModified, nil
105+
}
106+
107+
// File was appended to (changed)?
108+
if fi.ModTime() != prevModTime {
109+
return eventModified, nil
110+
}
111+
112+
// File hasn't changed so wait until next retry.
113+
backoff.Wait()
114+
}
115+
116+
return eventNone, backoff.Err()
117+
}
Lines changed: 189 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,189 @@
1+
package tail
2+
3+
import (
4+
"context"
5+
"io"
6+
"os"
7+
"path/filepath"
8+
"testing"
9+
"time"
10+
11+
"github.com/stretchr/testify/require"
12+
13+
"github.com/grafana/alloy/internal/component/loki/source/file/internal/tail/fileext"
14+
)
15+
16+
func TestBlockUntilExists(t *testing.T) {
17+
watcherConfig := WatcherConfig{
18+
MinPollFrequency: 5 * time.Millisecond,
19+
MaxPollFrequency: 5 * time.Millisecond,
20+
}
21+
22+
t.Run("should block until file exists", func(t *testing.T) {
23+
filename := filepath.Join(t.TempDir(), "eventually")
24+
25+
go func() {
26+
time.Sleep(10 * time.Millisecond)
27+
createFileWithPath(t, filename, "")
28+
}()
29+
30+
err := blockUntilExists(context.Background(), &Config{
31+
Filename: filename,
32+
WatcherConfig: watcherConfig,
33+
})
34+
require.NoError(t, err)
35+
})
36+
37+
t.Run("should exit when context is canceled", func(t *testing.T) {
38+
ctx, cancel := context.WithCancel(context.Background())
39+
go func() {
40+
time.Sleep(10 * time.Millisecond)
41+
cancel()
42+
}()
43+
44+
err := blockUntilExists(ctx, &Config{
45+
Filename: filepath.Join(t.TempDir(), "never"),
46+
WatcherConfig: watcherConfig,
47+
})
48+
require.ErrorIs(t, err, context.Canceled)
49+
})
50+
}
51+
52+
func TestBlockUntilEvent(t *testing.T) {
53+
watcherConfig := WatcherConfig{
54+
MinPollFrequency: 5 * time.Millisecond,
55+
MaxPollFrequency: 5 * time.Millisecond,
56+
}
57+
58+
t.Run("should return modified event when file is written to", func(t *testing.T) {
59+
f := createEmptyFile(t, "startempty")
60+
defer f.Close()
61+
62+
go func() {
63+
time.Sleep(10 * time.Millisecond)
64+
_, err := f.WriteString("updated")
65+
require.NoError(t, err)
66+
}()
67+
68+
event, err := blockUntilEvent(context.Background(), f, 0, &Config{
69+
Filename: f.Name(),
70+
WatcherConfig: watcherConfig,
71+
})
72+
require.NoError(t, err)
73+
require.Equal(t, eventModified, event)
74+
})
75+
76+
t.Run("should return modified event if mod time is updated", func(t *testing.T) {
77+
f := createEmptyFile(t, "startempty")
78+
defer f.Close()
79+
80+
go func() {
81+
time.Sleep(10 * time.Millisecond)
82+
require.NoError(t, os.Chtimes(f.Name(), time.Now(), time.Now()))
83+
}()
84+
85+
event, err := blockUntilEvent(context.Background(), f, 0, &Config{
86+
Filename: f.Name(),
87+
WatcherConfig: watcherConfig,
88+
})
89+
require.NoError(t, err)
90+
require.Equal(t, eventModified, event)
91+
})
92+
93+
t.Run("should return deleted event if file is deleted", func(t *testing.T) {
94+
f := createEmptyFile(t, "startempty")
95+
require.NoError(t, f.Close())
96+
97+
// NOTE: important for windows that we open with correct flags.
98+
f, err := fileext.OpenFile(f.Name())
99+
require.NoError(t, err)
100+
defer f.Close()
101+
102+
go func() {
103+
time.Sleep(10 * time.Millisecond)
104+
removeFile(t, f.Name())
105+
}()
106+
107+
event, err := blockUntilEvent(context.Background(), f, 0, &Config{
108+
Filename: f.Name(),
109+
WatcherConfig: watcherConfig,
110+
})
111+
require.NoError(t, err)
112+
require.Equal(t, eventDeleted, event)
113+
})
114+
115+
t.Run("should return deleted event if file is deleted before", func(t *testing.T) {
116+
f := createEmptyFile(t, "startempty")
117+
require.NoError(t, f.Close())
118+
119+
// NOTE: important for windows that we open with correct flags.
120+
f, err := fileext.OpenFile(f.Name())
121+
require.NoError(t, err)
122+
defer f.Close()
123+
124+
removeFile(t, f.Name())
125+
126+
event, err := blockUntilEvent(context.Background(), f, 0, &Config{
127+
Filename: f.Name(),
128+
WatcherConfig: watcherConfig,
129+
})
130+
require.NoError(t, err)
131+
require.Equal(t, eventDeleted, event)
132+
})
133+
134+
t.Run("should return truncated event", func(t *testing.T) {
135+
f := createFileWithContent(t, "truncate", "content")
136+
defer f.Close()
137+
138+
offset, err := f.Seek(0, io.SeekEnd)
139+
require.NoError(t, err)
140+
141+
go func() {
142+
time.Sleep(10 * time.Millisecond)
143+
require.NoError(t, f.Truncate(0))
144+
}()
145+
146+
event, err := blockUntilEvent(context.Background(), f, offset, &Config{
147+
Filename: f.Name(),
148+
WatcherConfig: watcherConfig,
149+
})
150+
require.NoError(t, err)
151+
require.Equal(t, eventTruncated, event)
152+
})
153+
154+
t.Run("should exit when context is canceled", func(t *testing.T) {
155+
f := createEmptyFile(t, "startempty")
156+
defer f.Close()
157+
158+
ctx, cancel := context.WithCancel(context.Background())
159+
go func() {
160+
time.Sleep(10 * time.Millisecond)
161+
cancel()
162+
}()
163+
164+
event, err := blockUntilEvent(ctx, f, 0, &Config{
165+
Filename: f.Name(),
166+
WatcherConfig: watcherConfig,
167+
})
168+
require.ErrorIs(t, err, context.Canceled)
169+
require.Equal(t, eventNone, event)
170+
})
171+
}
172+
173+
func createEmptyFile(t *testing.T, name string) *os.File {
174+
path := filepath.Join(t.TempDir(), name)
175+
f, err := os.Create(path)
176+
require.NoError(t, err)
177+
return f
178+
}
179+
180+
func createFileWithContent(t *testing.T, name, content string) *os.File {
181+
path := createFile(t, name, content)
182+
f, err := os.OpenFile(path, os.O_RDWR, 0)
183+
require.NoError(t, err)
184+
return f
185+
}
186+
187+
func createFileWithPath(t *testing.T, path, content string) {
188+
require.NoError(t, os.WriteFile(path, []byte(content), 0600))
189+
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
package tail
2+
3+
import (
4+
"time"
5+
6+
"golang.org/x/text/encoding"
7+
)
8+
9+
// Config holds configuration for tailing a file.
10+
type Config struct {
11+
// Filename is the path to the file to tail.
12+
Filename string
13+
// Offset is the byte offset in the file where tailing should start.
14+
// If 0, tailing starts from the beginning of the file.
15+
Offset int64
16+
17+
// Decoder is an optional text decoder for non-UTF-8 encoded files.
18+
// If the file is not UTF-8, the tailer must use the correct decoder
19+
// or the output text may be corrupted. For example, if the file is
20+
// "UTF-16 LE" encoded, the tailer would not separate new lines properly
21+
// and the output could appear as garbled characters.
22+
Decoder *encoding.Decoder
23+
24+
// WatcherConfig controls how the file system is polled for changes.
25+
WatcherConfig WatcherConfig
26+
}
27+
28+
// WatcherConfig sets the bounds of the polling interval used to detect file
// system events. The interval actually used varies between the two bounds
// according to backoff behavior.
type WatcherConfig struct {
	// MinPollFrequency is the lower bound of the polling interval.
	MinPollFrequency time.Duration
	// MaxPollFrequency is the upper bound of the polling interval.
	MaxPollFrequency time.Duration
}
35+
36+
// defaultWatcherConfig holds the default polling configuration used when
37+
// WatcherConfig is not explicitly provided in Config.
38+
var defaultWatcherConfig = WatcherConfig{
39+
MinPollFrequency: 250 * time.Millisecond,
40+
MaxPollFrequency: 250 * time.Millisecond,
41+
}

0 commit comments

Comments
 (0)