Skip to content

Commit f8b1de8

Browse files
authored
fix: Allow loki.source.file to read renaming lines of a deleted file before it tries to re open a new one (#5270)
1 parent 1c97c2d commit f8b1de8

File tree

2 files changed

+49
-19
lines changed

2 files changed

+49
-19
lines changed

internal/component/loki/source/file/internal/tail/file.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,8 +163,14 @@ func (f *File) wait() error {
163163
return f.reopen(true)
164164
case eventDeleted:
165165
level.Debug(f.logger).Log("msg", "file deleted")
166-
// if a file is deleted we want to make sure we drain what's remaining in the open file.
166+
// If a file is deleted we want to make sure we drain what's remaining in the open file.
167167
f.drain()
168+
// If we have any buffered lines after drain we can return here to make sure they are consumed and
169+
// we are not blocking on reopening the new file.
170+
if len(f.bufferedLines) > 0 {
171+
level.Debug(f.logger).Log("msg", "finish reading deleted file before reopen")
172+
return nil
173+
}
168174
// In polling mode we could miss events when a file is deleted, so before we give up
169175
// we try to reopen the file.
170176
return f.reopen(false)

internal/component/loki/source/file/internal/tail/file_test.go

Lines changed: 42 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -199,24 +199,6 @@ func TestFile(t *testing.T) {
199199
require.ErrorIs(t, err, context.Canceled)
200200
})
201201

202-
t.Run("UTF-16LE", func(t *testing.T) {
203-
file, err := NewFile(log.NewNopLogger(), &Config{
204-
Filename: "testdata/mssql.log",
205-
Encoding: unicode.UTF16(unicode.LittleEndian, unicode.IgnoreBOM),
206-
})
207-
require.NoError(t, err)
208-
defer file.Stop()
209-
210-
verify(t, file, &Line{Text: "2025-03-11 11:11:02.58 Server Microsoft SQL Server 2019 (RTM) - 15.0.2000.5 (X64) ", Offset: 180}, nil)
211-
verify(t, file, &Line{Text: " Sep 24 2019 13:48:23 ", Offset: 228}, nil)
212-
verify(t, file, &Line{Text: " Copyright (C) 2019 Microsoft Corporation", Offset: 314}, nil)
213-
verify(t, file, &Line{Text: " Enterprise Edition (64-bit) on Windows Server 2022 Standard 10.0 <X64> (Build 20348: ) (Hypervisor)", Offset: 518}, nil)
214-
verify(t, file, &Line{Text: "", Offset: 522}, nil)
215-
verify(t, file, &Line{Text: "2025-03-11 11:11:02.71 Server UTC adjustment: 1:00", Offset: 636}, nil)
216-
verify(t, file, &Line{Text: "2025-03-11 11:11:02.71 Server (c) Microsoft Corporation.", Offset: 762}, nil)
217-
verify(t, file, &Line{Text: "2025-03-11 11:11:02.72 Server All rights reserved.", Offset: 876}, nil)
218-
})
219-
220202
t.Run("calls to next after stop", func(t *testing.T) {
221203
name := createFile(t, "stopped", "hello\n")
222204
defer removeFile(t, name)
@@ -261,6 +243,48 @@ func TestFile(t *testing.T) {
261243
verify(t, file, &Line{Text: "newline2", Offset: 18}, nil)
262244
})
263245

246+
t.Run("deleted while reading", func(t *testing.T) {
247+
name := createFile(t, "removed", "1\n2\n")
248+
file, err := NewFile(log.NewNopLogger(), &Config{
249+
Offset: 0,
250+
Filename: name,
251+
WatcherConfig: WatcherConfig{
252+
MinPollFrequency: 50 * time.Millisecond,
253+
MaxPollFrequency: 50 * time.Millisecond,
254+
},
255+
})
256+
require.NoError(t, err)
257+
258+
go func() {
259+
time.Sleep(50 * time.Millisecond)
260+
appendToFile(t, name, "3\n4\n")
261+
removeFile(t, name)
262+
}()
263+
264+
verify(t, file, &Line{Text: "1", Offset: 2}, nil)
265+
verify(t, file, &Line{Text: "2", Offset: 4}, nil)
266+
verify(t, file, &Line{Text: "3", Offset: 6}, nil)
267+
verify(t, file, &Line{Text: "4", Offset: 8}, nil)
268+
})
269+
270+
t.Run("UTF-16LE", func(t *testing.T) {
271+
file, err := NewFile(log.NewNopLogger(), &Config{
272+
Filename: "testdata/mssql.log",
273+
Encoding: unicode.UTF16(unicode.LittleEndian, unicode.IgnoreBOM),
274+
})
275+
require.NoError(t, err)
276+
defer file.Stop()
277+
278+
verify(t, file, &Line{Text: "2025-03-11 11:11:02.58 Server Microsoft SQL Server 2019 (RTM) - 15.0.2000.5 (X64) ", Offset: 180}, nil)
279+
verify(t, file, &Line{Text: " Sep 24 2019 13:48:23 ", Offset: 228}, nil)
280+
verify(t, file, &Line{Text: " Copyright (C) 2019 Microsoft Corporation", Offset: 314}, nil)
281+
verify(t, file, &Line{Text: " Enterprise Edition (64-bit) on Windows Server 2022 Standard 10.0 <X64> (Build 20348: ) (Hypervisor)", Offset: 518}, nil)
282+
verify(t, file, &Line{Text: "", Offset: 522}, nil)
283+
verify(t, file, &Line{Text: "2025-03-11 11:11:02.71 Server UTC adjustment: 1:00", Offset: 636}, nil)
284+
verify(t, file, &Line{Text: "2025-03-11 11:11:02.71 Server (c) Microsoft Corporation.", Offset: 762}, nil)
285+
verify(t, file, &Line{Text: "2025-03-11 11:11:02.72 Server All rights reserved.", Offset: 876}, nil)
286+
})
287+
264288
t.Run("should detect UTF-16LE encoding from BOM", func(t *testing.T) {
265289
enc := unicode.UTF16(unicode.LittleEndian, unicode.UseBOM).NewEncoder()
266290
encoded, err := enc.String("Hello, 世界\r\n")

0 commit comments

Comments
 (0)