Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions internal/component/loki/source/file/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,8 +252,6 @@ func New(o component.Options, args Arguments) (*Component, error) {
func (c *Component) Run(ctx context.Context) error {
defer func() {
level.Info(c.opts.Logger).Log("msg", "loki.source.file component shutting down, stopping sources and positions file")
// We need to stop posFile first so we don't record entries we are draining
c.posFile.Stop()

// Start black hole drain routine to prevent deadlock when we call c.scheduler.Stop().
source.Drain(c.handler, func() {
Expand All @@ -264,6 +262,8 @@ func (c *Component) Run(ctx context.Context) error {
close(c.handler.Chan())
c.mut.Unlock()
})

c.posFile.Stop()
}()

var wg sync.WaitGroup
Expand Down
22 changes: 11 additions & 11 deletions internal/component/loki/source/file/tailer.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func (t *tailer) Run(ctx context.Context) {
default:
}

err := t.initRun()
pos, err := t.initRun()
if err != nil {
// We are retrying tailers until the target has disappeared.
// We are mostly interested in this log if this happens directly when
Expand All @@ -165,7 +165,7 @@ func (t *tailer) Run(ctx context.Context) {
ctx, cancel := context.WithCancel(ctx)
go func() {
// readLines closes done on exit
t.readLines(done)
t.readLines(pos, done)
cancel()
}()

Expand All @@ -176,21 +176,21 @@ func (t *tailer) Run(ctx context.Context) {
t.stop(done)
}

func (t *tailer) initRun() error {
func (t *tailer) initRun() (int64, error) {
fi, err := os.Stat(t.key.Path)
if err != nil {
return fmt.Errorf("failed to tail file: %w", err)
return 0, fmt.Errorf("failed to tail file: %w", err)
}

pos, err := t.positions.Get(t.key.Path, t.key.Labels)
if err != nil {
switch t.onPositionsFileError {
case OnPositionsFileErrorSkip:
return fmt.Errorf("failed to get file position: %w", err)
return 0, fmt.Errorf("failed to get file position: %w", err)
case OnPositionsFileErrorRestartEnd:
pos, err = getLastLinePosition(t.key.Path)
if err != nil {
return fmt.Errorf("failed to get last line position after positions error: %w", err)
return 0, fmt.Errorf("failed to get last line position after positions error: %w", err)
}
level.Info(t.logger).Log("msg", "retrieved the position of the last line after positions error")
default:
Expand All @@ -207,7 +207,7 @@ func (t *tailer) initRun() error {
if pos == 0 && t.legacyPositionUsed {
pos, err = t.positions.Get(t.key.Path, "{}")
if err != nil {
return fmt.Errorf("failed to get file position with empty labels: %w", err)
return 0, fmt.Errorf("failed to get file position with empty labels: %w", err)
}
}

Expand Down Expand Up @@ -240,19 +240,19 @@ func (t *tailer) initRun() error {
})

if err != nil {
return fmt.Errorf("failed to tail the file: %w", err)
return pos, fmt.Errorf("failed to tail the file: %w", err)
}

t.file = tail

return nil
return pos, nil
}

// readLines reads lines from the tailed file by calling Next() in a loop.
// It processes each line by sending it to the receiver's channel and updates
// position tracking periodically. It exits when Next() returns an error,
// this happens when the tail.File is stopped or or we have a unrecoverable error.
func (t *tailer) readLines(done chan struct{}) {
func (t *tailer) readLines(pos int64, done chan struct{}) {
level.Info(t.logger).Log("msg", "tail routine started")

if t.decompression.Enabled && t.decompression.InitialDelay > 0 {
Expand All @@ -261,8 +261,8 @@ func (t *tailer) readLines(done chan struct{}) {
}

var (
lastOffset = pos
entries = t.receiver.Chan()
lastOffset = int64(0)
positionInterval = t.positions.SyncPeriod()
lastUpdatedPosition = time.Time{}
)
Expand Down
Loading