Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,13 @@ Main (unreleased)
### Bugfixes

- Fix `otelcol.receiver.filelog` documentation's default value for `start_at`. (@petewall)

- Fix `mimir.rules.kubernetes` panic on non-leader debug info retrieval (@TheoBrigitte)

- Fix detection of the “streams limit exceeded” error in the Loki client so that metrics are correctly labeled as `ReasonStreamLimited`. (@maratkhv)

- Fix `loki.source.file` race condition that often led to a panic when using `decompression`. (@kalleep)

### Other changes

- Update the zap logging adapter used by `otelcol` components to log arrays and objects. (@dehaansa)
Expand Down
90 changes: 37 additions & 53 deletions internal/component/loki/source/file/decompresser.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"compress/bzip2"
"compress/gzip"
"compress/zlib"
"context"
"fmt"
"io"
"os"
Expand Down Expand Up @@ -61,12 +62,6 @@ type decompressor struct {
cfg DecompressionConfig

componentStopping func() bool

mut sync.RWMutex
stopping bool
posquit chan struct{} // used by the readLine method to tell the updatePosition method to stop
posdone chan struct{} // used by the updatePosition method to notify when it stopped
done chan struct{} // used by the readLine method to notify when it stopped
}

func newDecompressor(
Expand Down Expand Up @@ -155,39 +150,42 @@ func mountReader(f *os.File, logger log.Logger, format CompressionFormat) (reade
return reader, nil
}

func (d *decompressor) Run() {
d.mut.Lock()

// Check if the stop function was called between two Run.
if d.stopping {
d.mut.Unlock()
func (d *decompressor) Run(ctx context.Context) {
// Check if context was canceled between two calls to Run.
select {
case <-ctx.Done():
return
default:
}

labelsMiddleware := d.labels.Merge(model.LabelSet{filenameLabel: model.LabelValue(d.path)})
handler := loki.AddLabelsMiddleware(labelsMiddleware).Wrap(loki.NewEntryHandler(d.receiver.Chan(), func() {}))
defer handler.Stop()
d.posquit = make(chan struct{})
d.posdone = make(chan struct{})
d.done = make(chan struct{})
d.mut.Unlock()

// updatePosition closes the channel d.posdone on exit
go d.updatePosition()

d.metrics.filesActive.Add(1.)

// readLines closes the channel d.done on exit
d.readLines(handler)
done := make(chan struct{})
ctx, cancel := context.WithCancel(ctx)
go func() {
// readLines closes done on exit
d.readLines(handler, done)
cancel()
}()

d.running.Store(true)
defer d.running.Store(false)

<-ctx.Done()
d.stop(done)
}

func (d *decompressor) updatePosition() {
func (d *decompressor) updatePosition(posquit chan struct{}) {
positionSyncPeriod := d.positions.SyncPeriod()
positionWait := time.NewTicker(positionSyncPeriod)
defer func() {
positionWait.Stop()
level.Info(d.logger).Log("msg", "position timer: exited", "path", d.path)
close(d.posdone)
d.cleanupMetrics()
}()

for {
Expand All @@ -197,7 +195,7 @@ func (d *decompressor) updatePosition() {
level.Error(d.logger).Log("msg", "position timer: error getting position and/or size, stopping decompressor", "path", d.path, "error", err)
return
}
case <-d.posquit:
case <-posquit:
return
}
}
Expand All @@ -208,21 +206,28 @@ func (d *decompressor) updatePosition() {
// It first decompresses the file as a whole using a reader and then it will iterate
// over its chunks, separated by '\n'.
// During each iteration, the parsed and decoded log line is then sent to the API with the current timestamp.
func (d *decompressor) readLines(handler loki.EntryHandler) {
// The done channel is closed when readLines exits.
func (d *decompressor) readLines(handler loki.EntryHandler, done chan struct{}) {
level.Info(d.logger).Log("msg", "read lines routine: started", "path", d.path)
d.running.Store(true)

if d.cfg.InitialDelay > 0 {
level.Info(d.logger).Log("msg", "sleeping before starting decompression", "path", d.path, "duration", d.cfg.InitialDelay.String())
time.Sleep(d.cfg.InitialDelay)
}

posquit, posdone := make(chan struct{}), make(chan struct{})
go func() {
d.updatePosition(posquit)
close(posdone)
}()

defer func() {
d.running.Store(false)
d.cleanupMetrics()
level.Info(d.logger).Log("msg", "read lines routine finished", "path", d.path)
close(d.done)
close(posquit)
<-posdone
close(done)
}()

entries := handler.Chan()

f, err := os.Open(d.path)
Expand Down Expand Up @@ -309,27 +314,10 @@ func (d *decompressor) markPositionAndSize() error {
return nil
}

func (d *decompressor) Stop() {
d.mut.Lock()
d.stopping = true
defer func() {
d.stopping = false
}()
d.mut.Unlock()

// Shut down the position marker thread
if d.posquit != nil {
close(d.posquit)
<-d.posdone
d.posquit = nil
d.posdone = nil
}

func (d *decompressor) stop(done chan struct{}) {
// Wait for readLines() to consume all the remaining messages and exit when the channel is closed
if d.done != nil {
<-d.done
d.done = nil
}
<-done

level.Info(d.logger).Log("msg", "stopped decompressor", "path", d.path)

// If the component is not stopping, then it means that the target for this component is gone and that
Expand Down Expand Up @@ -365,7 +353,3 @@ func (d *decompressor) cleanupMetrics() {
d.metrics.readBytes.DeleteLabelValues(d.path)
d.metrics.totalBytes.DeleteLabelValues(d.path)
}

// Path returns the decompressor's path field, the same value that is used as
// the filename label and passed to os.Open when reading lines.
func (d *decompressor) Path() string {
return d.path
}
Loading