Skip to content
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
fdf8d19
fix: add signature for tail.File so we can handle atomic writes
kalleep Dec 18, 2025
5687a9b
fix: if generated signature is empty we always asume they are not equal
kalleep Dec 18, 2025
3e7c421
smarter signature updates
kalleep Dec 18, 2025
cbb17d8
use 1024 as ideal identity size
kalleep Dec 19, 2025
85ac6ab
we don't have to check offset because we already caches the lastOffset
kalleep Dec 19, 2025
6585c27
Handle edgecase where a file gets parialy truncated
kalleep Dec 19, 2025
b62379d
fix: close file in case of errors.
kalleep Jan 12, 2026
57d8d12
recompute once max size is reached
kalleep Jan 12, 2026
e2e8106
address comments
kalleep Jan 12, 2026
3b2aa76
add tests
kalleep Jan 12, 2026
6fe1a0e
Close file if error occurs in NewFile
kalleep Jan 12, 2026
7bc73f7
Add more test
kalleep Jan 12, 2026
687114f
fix test
kalleep Jan 12, 2026
44a9281
Merge branch 'main' into kalleep/tailer-signature
kalleep Jan 15, 2026
8ad1e28
Change to use ReadAt, this will not affact file position
kalleep Jan 15, 2026
50ec656
close file if we cannot create reader and ignore ErrClosed
kalleep Jan 15, 2026
29e707e
revert ignore
kalleep Jan 15, 2026
fad38c0
fix comment
kalleep Jan 19, 2026
aedbd61
Merge branch 'main' into kalleep/tailer-signature
kalleep Jan 21, 2026
40f72dd
fix: use atomic package to perform writes in test
kalleep Jan 21, 2026
ba8bbcb
Use ReplaceFileW instead of atomic package for windows atomic writes.
kalleep Jan 22, 2026
c8723d9
Update comments
kalleep Jan 22, 2026
da29c49
fix comment
kalleep Jan 22, 2026
f997287
move to test only func
kalleep Jan 22, 2026
77209ea
rename file
kalleep Jan 22, 2026
d395d2e
fix type
kalleep Jan 22, 2026
f680e17
fix assert
kalleep Jan 22, 2026
ce9b8cd
remove remove call, the file is move during FileRenameW
kalleep Jan 22, 2026
7c4e710
Update internal/component/loki/source/file/internal/tail/signature.go
kalleep Jan 22, 2026
18a8f08
fix function name
kalleep Jan 22, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 60 additions & 16 deletions internal/component/loki/source/file/internal/tail/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,28 +35,36 @@ func NewFile(logger log.Logger, cfg *Config) (*File, error) {
cfg.WatcherConfig = defaultWatcherConfig
}

if cfg.Offset != 0 {
// Seek to provided offset
sig, err := newSignatureFromFile(f)
if err != nil {
f.Close()
return nil, err
}

if cfg.Offset > 0 {
if _, err := f.Seek(cfg.Offset, io.SeekStart); err != nil {
f.Close()
return nil, err
}
}

scanner, err := newReader(f, cfg.Offset, cfg.Encoding)
reader, err := newReader(f, cfg.Offset, cfg.Encoding)
if err != nil {
f.Close()
return nil, err
}

cfg.WatcherConfig.MinPollFrequency = min(cfg.WatcherConfig.MinPollFrequency, cfg.WatcherConfig.MaxPollFrequency)
ctx, cancel := context.WithCancel(context.Background())

return &File{
cfg: cfg,
logger: logger,
file: f,
reader: scanner,
ctx: ctx,
cancel: cancel,
cfg: cfg,
logger: logger,
file: f,
reader: reader,
ctx: ctx,
cancel: cancel,
signature: sig,
}, nil
}

Expand All @@ -67,10 +75,10 @@ type File struct {
cfg *Config
logger log.Logger

// protects file, reader, and lastOffset.
mu sync.Mutex
file *os.File
reader *reader
mu sync.Mutex
file *os.File
reader *reader
signature *signature

// bufferedLines stores lines that were read from an old file handle before
// it was closed during file rotation.
Expand Down Expand Up @@ -114,9 +122,22 @@ read:
return nil, err
}

offset := f.reader.position()

// Recompute the signature once the read offset has crossed a threshold that the current signature has not reached yet.
// This progressively builds a more complete signature as the file grows.
if f.signature.shouldRecompute(offset) {
sig, err := newSignatureFromFile(f.file)
if err != nil {
return nil, err
}

f.signature = sig
}

return &Line{
Text: text,
Offset: f.reader.position(),
Offset: offset,
Time: time.Now(),
}, nil
}
Expand Down Expand Up @@ -205,7 +226,6 @@ func (f *File) drain() {
}
return
}

f.bufferedLines = append(f.bufferedLines, Line{
Text: text,
Offset: f.reader.position(),
Expand Down Expand Up @@ -268,8 +288,32 @@ func (f *File) reopen(truncated bool) error {
continue
}

// Compute a new signature and compare it with the previous one to detect atomic writes.
// When a file is replaced atomically, the file handle changes but the
// initial content may be the same. If signatures match, it's the same file content,
// so we continue from the previous offset. If they differ, it's a different
// file, so we start from the beginning.
sig, err := newSignatureFromFile(file)
if err != nil {
file.Close()
return err
}

var offset int64
if !f.signature.equal(sig) {
offset = 0
} else {
offset = min(f.reader.position(), nf.Size())
}

f.file = file
f.reader.reset(f.file)
f.signature = sig
if _, err := f.file.Seek(offset, io.SeekStart); err != nil {
file.Close()
return err
}
f.reader.reset(f.file, offset)

break
}

Expand Down
53 changes: 53 additions & 0 deletions internal/component/loki/source/file/internal/tail/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"github.com/go-kit/log"
"github.com/stretchr/testify/require"
"golang.org/x/text/encoding/unicode"

"github.com/grafana/alloy/internal/component/loki/source/file/internal/tail/fileext"
)

func TestFile(t *testing.T) {
Expand Down Expand Up @@ -371,6 +373,53 @@ func TestFile(t *testing.T) {

verify(t, file, &Line{Text: "Hello, 世界", Offset: 18}, nil)
})

// The file is atomically replaced by content that starts with the same bytes
// as before and has one extra line appended. The expectations below show the
// remaining lines being reported with offsets that continue from where reading
// stopped, instead of starting over at the beginning of the file.
t.Run("should handle atomic writes", func(t *testing.T) {
	name := createFile(t, "atomicwrite", "line1\nline2\nline3\nline4\n")
	defer removeFile(t, name)

	// Poll at a fixed 50ms interval (min and max are equal).
	file, err := NewFile(log.NewNopLogger(), &Config{
		Filename: name,
		WatcherConfig: WatcherConfig{
			MinPollFrequency: 50 * time.Millisecond,
			MaxPollFrequency: 50 * time.Millisecond,
		},
	})
	require.NoError(t, err)
	defer file.Stop()

	// Read first two lines
	verify(t, file, &Line{Text: "line1", Offset: 6}, nil)
	verify(t, file, &Line{Text: "line2", Offset: 12}, nil)
	// Replace the file: same leading content, plus "newline1" at the end.
	atomicwrite(t, name, "line1\nline2\nline3\nline4\nnewline1\n")
	// Offsets keep increasing from 12; the appended line ends at byte 33 of the new content.
	verify(t, file, &Line{Text: "line3", Offset: 18}, nil)
	verify(t, file, &Line{Text: "line4", Offset: 24}, nil)
	verify(t, file, &Line{Text: "newline1", Offset: 33}, nil)
})

// The file is atomically replaced by content that shares nothing with the
// original. The expectations below show the not-yet-read lines of the old
// content still being delivered, followed by the new content reported with an
// offset counted from the start of the replacement file.
t.Run("should handle atomic writes with new content", func(t *testing.T) {
	name := createFile(t, "atomicwrite", "line1\nline2\nline3\nline4\n")
	defer removeFile(t, name)

	// Poll at a fixed 50ms interval (min and max are equal).
	file, err := NewFile(log.NewNopLogger(), &Config{
		Filename: name,
		WatcherConfig: WatcherConfig{
			MinPollFrequency: 50 * time.Millisecond,
			MaxPollFrequency: 50 * time.Millisecond,
		},
	})
	require.NoError(t, err)
	defer file.Stop()

	// Read first two lines
	verify(t, file, &Line{Text: "line1", Offset: 6}, nil)
	verify(t, file, &Line{Text: "line2", Offset: 12}, nil)
	// Replace the file with entirely different content.
	atomicwrite(t, name, "newline1\n")
	// Because we buffer lines when file is deleted we still get line3 and line4.
	verify(t, file, &Line{Text: "line3", Offset: 18}, nil)
	verify(t, file, &Line{Text: "line4", Offset: 24}, nil)
	// "newline1\n" is 9 bytes, so an offset of 9 means it was read from byte 0 of the new file.
	verify(t, file, &Line{Text: "newline1", Offset: 9}, nil)
})
}

func createFile(t *testing.T, name, content string) string {
Expand Down Expand Up @@ -404,3 +453,7 @@ func rotateFile(t *testing.T, name, newContent string) {
// Create new file with same name
require.NoError(t, os.WriteFile(name, []byte(newContent), 0600))
}

func atomicwrite(t *testing.T, name, newContent string) {
require.NoError(t, fileext.AtomicWrite(name, []byte(newContent)))
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@
package fileext

import (
"bytes"
"os"
"path/filepath"

"github.com/natefinch/atomic"
)

func OpenFile(name string) (file *os.File, err error) {
Expand All @@ -26,3 +29,8 @@ func OpenFile(name string) (file *os.File, err error) {
func IsDeletePending(_ *os.File) (bool, error) {
	// The file argument is ignored: this implementation always reports that
	// no delete is pending and never returns an error.
	return false, nil
}

// AtomicWrite atomically replaces the file called name with content on POSIX
// systems. The work is delegated to atomic.WriteFile, which is fed the bytes
// through an in-memory reader; its error is returned unchanged.
func AtomicWrite(name string, content []byte) error {
	src := bytes.NewReader(content)
	return atomic.WriteFile(name, src)
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package fileext

import (
"os"
"path/filepath"
"runtime"
"syscall"
"unsafe"
Expand Down Expand Up @@ -113,3 +114,75 @@ func getFileStandardInfo(f *os.File) (*fileStandardInfo, error) {
runtime.KeepAlive(f)
return si, nil
}

// AtomicWrite performs an atomic write on Windows using ReplaceFileW.
// This allows replacing a file when it's open with FILE_SHARE_DELETE.
// This function is only used in tests.
func AtomicWrite(name string, content []byte) error {
var (
kernel32 = windows.NewLazySystemDLL("kernel32.dll")
replaceFileW = kernel32.NewProc("ReplaceFileW")
)

dir, filename := filepath.Dir(name), filepath.Base(name)

// Create temp file in the same directory
tmp, err := os.CreateTemp(dir, filename+".tmp")
if err != nil {
return err
}
tmpName := tmp.Name()
defer os.Remove(tmpName)

// Write content to temp file
_, err = tmp.Write(content)
if err != nil {
tmp.Close()
return err
}

// Sync to ensure data is written
if err := tmp.Sync(); err != nil {
tmp.Close()
return err
}

// Close the temp file before replacing
if err := tmp.Close(); err != nil {
return err
}

// Convert paths to UTF-16 for ReplaceFileW
replacedPath, err := syscall.UTF16PtrFromString(name)
if err != nil {
return err
}

replacementPath, err := syscall.UTF16PtrFromString(tmpName)
if err != nil {
return err
}

// Use ReplaceFileW to atomically replace the target file with the temp file
// This works when target file is open with FILE_SHARE_DELETE.
// https://learn.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-replacefilew
ret, _, errno := replaceFileW.Call(
uintptr(unsafe.Pointer(replacedPath)),
uintptr(unsafe.Pointer(replacementPath)),
0, // NULL backup file
0, // No flags
0, // NULL exclude
0, // NULL reserved
)

// errno is always non-nil so we should only sett err if ret is 0.
if ret == 0 {
err = errno
}

if err != nil {
return &os.PathError{Op: "AtomicWrite", Path: name, Err: err}
}

return nil
}
4 changes: 2 additions & 2 deletions internal/component/loki/source/file/internal/tail/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,9 @@ func (r *reader) position() int64 {

// reset prepares the reader for a new file handle, assuming the same encoding.
// It skips the BOM, resets the buffered reader and decoder, and clears pending data.
func (r *reader) reset(f *os.File) {
func (r *reader) reset(f *os.File, offset int64) {
// Skip BOM if needed, we assume that the rotated file has the same encoding.
offset, _ := skipBOM(f, 0)
offset, _ = skipBOM(f, offset)
r.pos = offset
r.br.Reset(f)
r.decoder.Reset()
Expand Down
78 changes: 78 additions & 0 deletions internal/component/loki/source/file/internal/tail/signature.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package tail

import (
"bytes"
"errors"
"fmt"
"io"
"os"
)

// signature holds the leading bytes of a file. It is used to tell whether a
// file that was replaced atomically still starts with the same content as the
// file it replaced.
type signature struct {
	// d is the captured prefix; it holds at most signatureSize bytes when
	// built by newSignatureFromFile.
	d []byte
}

// signatureSize is the number of bytes a signature needs to count as complete.
const signatureSize = 1024

// signatureThresholds lists the read offsets at which an incomplete signature
// is worth recomputing, so that it grows step by step together with the file.
var signatureThresholds = []int{64, 128, 256, 512, signatureSize}

// newSignatureFromFile builds a signature from up to signatureSize bytes read
// at offset 0 of f. It uses ReadAt, so the file's current read position is not
// moved. A file shorter than signatureSize yields an incomplete signature;
// hitting end of file is not treated as an error. Any other read error is
// returned wrapped.
func newSignatureFromFile(f *os.File) (*signature, error) {
	var head [signatureSize]byte

	read, err := f.ReadAt(head[:], 0)
	switch {
	case err == nil, errors.Is(err, io.EOF):
		return &signature{d: head[:read]}, nil
	default:
		return nil, fmt.Errorf("failed to compute signature for file: %w", err)
	}
}

// completed reports whether the signature holds exactly signatureSize bytes.
func (s *signature) completed() bool {
	return signatureSize == len(s.d)
}

// equal reports whether other matches s.
//
//   - An empty s never matches anything.
//   - A completed s matches only when other has identical length and content.
//   - An incomplete s matches when other starts with all of the bytes of s.
func (s *signature) equal(other *signature) bool {
	switch {
	case len(s.d) == 0:
		return false
	case s.completed():
		return bytes.Equal(s.d, other.d)
	default:
		return bytes.HasPrefix(other.d, s.d)
	}
}

// shouldRecompute reports whether the read offset at has reached a threshold
// that the signature is still shorter than. A completed signature never needs
// recomputing.
func (s *signature) shouldRecompute(at int64) bool {
	have := len(s.d)
	if have == signatureSize {
		return false
	}

	for _, limit := range signatureThresholds {
		if have < limit && int64(limit) <= at {
			return true
		}
	}

	return false
}
Loading
Loading