Skip to content

Commit 79e138d

Browse files
authored
fix(loki.source.file): Make sure position is recorded when component exits (#5412)
### Pull Request Details We only update the position for a file after some time has passed and when the tailer exits. However, we stopped the positions file before we stopped all tailers, so if a file was ingested quickly we never stored the last read offset. To fix this we need to stop the positions file after all tailers have been stopped, and we need to initialize lastOffset to the start position. ### Issue(s) fixed by this Pull Request ### Notes to the Reviewer <!-- Add any relevant notes for the reviewers and testers of this PR. --> ### PR Checklist <!-- Remove items that do not apply. For completed items, change [ ] to [x]. --> - [ ] Documentation added - [ ] Tests updated - [ ] Config converters updated
1 parent 899467a commit 79e138d

File tree

2 files changed

+13
-13
lines changed

2 files changed

+13
-13
lines changed

internal/component/loki/source/file/file.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -252,8 +252,6 @@ func New(o component.Options, args Arguments) (*Component, error) {
252252
func (c *Component) Run(ctx context.Context) error {
253253
defer func() {
254254
level.Info(c.opts.Logger).Log("msg", "loki.source.file component shutting down, stopping sources and positions file")
255-
// We need to stop posFile first so we don't record entries we are draining
256-
c.posFile.Stop()
257255

258256
// Start black hole drain routine to prevent deadlock when we call c.scheduler.Stop().
259257
source.Drain(c.handler, func() {
@@ -264,6 +262,8 @@ func (c *Component) Run(ctx context.Context) error {
264262
close(c.handler.Chan())
265263
c.mut.Unlock()
266264
})
265+
266+
c.posFile.Stop()
267267
}()
268268

269269
var wg sync.WaitGroup

internal/component/loki/source/file/tailer.go

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ func (t *tailer) Run(ctx context.Context) {
145145
default:
146146
}
147147

148-
err := t.initRun()
148+
pos, err := t.initRun()
149149
if err != nil {
150150
// We are retrying tailers until the target has disappeared.
151151
// We are mostly interested in this log if this happens directly when
@@ -165,7 +165,7 @@ func (t *tailer) Run(ctx context.Context) {
165165
ctx, cancel := context.WithCancel(ctx)
166166
go func() {
167167
// readLines closes done on exit
168-
t.readLines(done)
168+
t.readLines(pos, done)
169169
cancel()
170170
}()
171171

@@ -176,21 +176,21 @@ func (t *tailer) Run(ctx context.Context) {
176176
t.stop(done)
177177
}
178178

179-
func (t *tailer) initRun() error {
179+
func (t *tailer) initRun() (int64, error) {
180180
fi, err := os.Stat(t.key.Path)
181181
if err != nil {
182-
return fmt.Errorf("failed to tail file: %w", err)
182+
return 0, fmt.Errorf("failed to tail file: %w", err)
183183
}
184184

185185
pos, err := t.positions.Get(t.key.Path, t.key.Labels)
186186
if err != nil {
187187
switch t.onPositionsFileError {
188188
case OnPositionsFileErrorSkip:
189-
return fmt.Errorf("failed to get file position: %w", err)
189+
return 0, fmt.Errorf("failed to get file position: %w", err)
190190
case OnPositionsFileErrorRestartEnd:
191191
pos, err = getLastLinePosition(t.key.Path)
192192
if err != nil {
193-
return fmt.Errorf("failed to get last line position after positions error: %w", err)
193+
return 0, fmt.Errorf("failed to get last line position after positions error: %w", err)
194194
}
195195
level.Info(t.logger).Log("msg", "retrieved the position of the last line after positions error")
196196
default:
@@ -207,7 +207,7 @@ func (t *tailer) initRun() error {
207207
if pos == 0 && t.legacyPositionUsed {
208208
pos, err = t.positions.Get(t.key.Path, "{}")
209209
if err != nil {
210-
return fmt.Errorf("failed to get file position with empty labels: %w", err)
210+
return 0, fmt.Errorf("failed to get file position with empty labels: %w", err)
211211
}
212212
}
213213

@@ -240,19 +240,19 @@ func (t *tailer) initRun() error {
240240
})
241241

242242
if err != nil {
243-
return fmt.Errorf("failed to tail the file: %w", err)
243+
return pos, fmt.Errorf("failed to tail the file: %w", err)
244244
}
245245

246246
t.file = tail
247247

248-
return nil
248+
return pos, nil
249249
}
250250

251251
// readLines reads lines from the tailed file by calling Next() in a loop.
252252
// It processes each line by sending it to the receiver's channel and updates
253253
// position tracking periodically. It exits when Next() returns an error,
254254
// this happens when the tail.File is stopped or we have an unrecoverable error.
255-
func (t *tailer) readLines(done chan struct{}) {
255+
func (t *tailer) readLines(pos int64, done chan struct{}) {
256256
level.Info(t.logger).Log("msg", "start tailing file")
257257

258258
if t.decompression.Enabled && t.decompression.InitialDelay > 0 {
@@ -261,8 +261,8 @@ func (t *tailer) readLines(done chan struct{}) {
261261
}
262262

263263
var (
264+
lastOffset = pos
264265
entries = t.receiver.Chan()
265-
lastOffset = int64(0)
266266
positionInterval = t.positions.SyncPeriod()
267267
lastUpdatedPosition = time.Time{}
268268
)

0 commit comments

Comments
 (0)