Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion internal/component/loki/source/file/internal/tail/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,14 @@ func (f *File) wait() error {
return f.reopen(true)
case eventDeleted:
level.Debug(f.logger).Log("msg", "file deleted")
// if a file is deleted we want to make sure we drain what's remaining in the open file.
// If a file is deleted we want to make sure we drain what's remaining in the open file.
f.drain()
// If we have any buffered lines after drain we can return here to make sure they are consumed and
// we are not blocking on reopening the new file.
if len(f.bufferedLines) > 0 {
level.Debug(f.logger).Log("msg", "finish reading deleted file before reopen")
return nil
}
// In polling mode we could miss events when a file is deleted, so before we give up
// we try to reopen the file.
return f.reopen(false)
Expand Down
60 changes: 42 additions & 18 deletions internal/component/loki/source/file/internal/tail/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,24 +199,6 @@ func TestFile(t *testing.T) {
require.ErrorIs(t, err, context.Canceled)
})

t.Run("UTF-16LE", func(t *testing.T) {
file, err := NewFile(log.NewNopLogger(), &Config{
Filename: "testdata/mssql.log",
Encoding: unicode.UTF16(unicode.LittleEndian, unicode.IgnoreBOM),
})
require.NoError(t, err)
defer file.Stop()

verify(t, file, &Line{Text: "2025-03-11 11:11:02.58 Server Microsoft SQL Server 2019 (RTM) - 15.0.2000.5 (X64) ", Offset: 180}, nil)
verify(t, file, &Line{Text: " Sep 24 2019 13:48:23 ", Offset: 228}, nil)
verify(t, file, &Line{Text: " Copyright (C) 2019 Microsoft Corporation", Offset: 314}, nil)
verify(t, file, &Line{Text: " Enterprise Edition (64-bit) on Windows Server 2022 Standard 10.0 <X64> (Build 20348: ) (Hypervisor)", Offset: 518}, nil)
verify(t, file, &Line{Text: "", Offset: 522}, nil)
verify(t, file, &Line{Text: "2025-03-11 11:11:02.71 Server UTC adjustment: 1:00", Offset: 636}, nil)
verify(t, file, &Line{Text: "2025-03-11 11:11:02.71 Server (c) Microsoft Corporation.", Offset: 762}, nil)
verify(t, file, &Line{Text: "2025-03-11 11:11:02.72 Server All rights reserved.", Offset: 876}, nil)
})

t.Run("calls to next after stop", func(t *testing.T) {
name := createFile(t, "stopped", "hello\n")
defer removeFile(t, name)
Expand Down Expand Up @@ -261,6 +243,48 @@ func TestFile(t *testing.T) {
verify(t, file, &Line{Text: "newline2", Offset: 18}, nil)
})

t.Run("deleted while reading", func(t *testing.T) {
name := createFile(t, "removed", "1\n2\n")
file, err := NewFile(log.NewNopLogger(), &Config{
Offset: 0,
Filename: name,
WatcherConfig: WatcherConfig{
MinPollFrequency: 50 * time.Millisecond,
MaxPollFrequency: 50 * time.Millisecond,
},
})
require.NoError(t, err)

go func() {
time.Sleep(50 * time.Millisecond)
appendToFile(t, name, "3\n4\n")
removeFile(t, name)
}()

verify(t, file, &Line{Text: "1", Offset: 2}, nil)
verify(t, file, &Line{Text: "2", Offset: 4}, nil)
verify(t, file, &Line{Text: "3", Offset: 6}, nil)
verify(t, file, &Line{Text: "4", Offset: 8}, nil)
})

t.Run("UTF-16LE", func(t *testing.T) {
file, err := NewFile(log.NewNopLogger(), &Config{
Filename: "testdata/mssql.log",
Encoding: unicode.UTF16(unicode.LittleEndian, unicode.IgnoreBOM),
})
require.NoError(t, err)
defer file.Stop()

verify(t, file, &Line{Text: "2025-03-11 11:11:02.58 Server Microsoft SQL Server 2019 (RTM) - 15.0.2000.5 (X64) ", Offset: 180}, nil)
verify(t, file, &Line{Text: " Sep 24 2019 13:48:23 ", Offset: 228}, nil)
verify(t, file, &Line{Text: " Copyright (C) 2019 Microsoft Corporation", Offset: 314}, nil)
verify(t, file, &Line{Text: " Enterprise Edition (64-bit) on Windows Server 2022 Standard 10.0 <X64> (Build 20348: ) (Hypervisor)", Offset: 518}, nil)
verify(t, file, &Line{Text: "", Offset: 522}, nil)
verify(t, file, &Line{Text: "2025-03-11 11:11:02.71 Server UTC adjustment: 1:00", Offset: 636}, nil)
verify(t, file, &Line{Text: "2025-03-11 11:11:02.71 Server (c) Microsoft Corporation.", Offset: 762}, nil)
verify(t, file, &Line{Text: "2025-03-11 11:11:02.72 Server All rights reserved.", Offset: 876}, nil)
})

t.Run("should detect UTF-16LE encoding from BOM", func(t *testing.T) {
enc := unicode.UTF16(unicode.LittleEndian, unicode.UseBOM).NewEncoder()
encoded, err := enc.String("Hello, 世界\r\n")
Expand Down
Loading