Skip to content
Merged
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ Main (unreleased)

- Add json format support for log export via faro receiver (@ravishankar15)

### Bugfixes

- Fix log rotation for Windows in `loki.source.file` by refactoring the component to use the runner pkg. This should also reduce CPU consumption when tailing a lot of files in a dynamic environment. (@wildum)

- Add livedebugging support for `prometheus.remote_write` (@ravishankar15)

v1.6.0
Expand Down
143 changes: 94 additions & 49 deletions internal/component/loki/source/file/decompresser.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package file

// This code is copied from loki/promtail@a8d5815510bd959a6dd8c176a5d9fd9bbfc8f8b5.
// This code is adapted from loki/promtail. Last revision used to port changes to Alloy was a8d5815510bd959a6dd8c176a5d9fd9bbfc8f8b5.
// Decompressor implements the reader interface and is used to read compressed log files.
// It uses the Go stdlib's compress/* packages for decoding.

Expand Down Expand Up @@ -43,41 +43,49 @@ func supportedCompressedFormats() map[string]struct{} {
type decompressor struct {
metrics *metrics
logger log.Logger
handler loki.EntryHandler
receiver loki.LogsReceiver
positions positions.Positions

path string
labels string
path string
labels model.LabelSet
labelsStr string

posAndSizeMtx sync.Mutex
stopOnce sync.Once
posAndSizeMtx sync.RWMutex

running *atomic.Bool
posquit chan struct{}
posdone chan struct{}
done chan struct{}

decoder *encoding.Decoder

position int64
size int64
cfg DecompressionConfig

componentStopping func() bool

mut sync.RWMutex
stopping bool
posquit chan struct{} // used by the readLine method to tell the updatePosition method to stop
posdone chan struct{} // used by the updatePosition method to notify when it stopped
done chan struct{} // used by the readLine method to notify when it stopped
}

func newDecompressor(
metrics *metrics,
logger log.Logger,
handler loki.EntryHandler,
receiver loki.LogsReceiver,
positions positions.Positions,
path string,
labels string,
labels model.LabelSet,
encodingFormat string,
cfg DecompressionConfig,
componentStopping func() bool,
) (*decompressor, error) {

labelsStr := labels.String()

logger = log.With(logger, "component", "decompressor")

pos, err := positions.Get(path, labels)
pos, err := positions.Get(path, labelsStr)
if err != nil {
return nil, fmt.Errorf("failed to get positions: %w", err)
}
Expand All @@ -93,24 +101,23 @@ func newDecompressor(
}

decompressor := &decompressor{
metrics: metrics,
logger: logger,
handler: loki.AddLabelsMiddleware(model.LabelSet{filenameLabel: model.LabelValue(path)}).Wrap(handler),
positions: positions,
path: path,
labels: labels,
running: atomic.NewBool(false),
posquit: make(chan struct{}),
posdone: make(chan struct{}),
done: make(chan struct{}),
position: pos,
decoder: decoder,
cfg: cfg,
metrics: metrics,
logger: logger,
receiver: receiver,
positions: positions,
path: path,
labels: labels,
labelsStr: labelsStr,
running: atomic.NewBool(false),
posquit: make(chan struct{}),
posdone: make(chan struct{}),
done: make(chan struct{}),
position: pos,
decoder: decoder,
cfg: cfg,
componentStopping: componentStopping,
}

go decompressor.readLines()
go decompressor.updatePosition()
metrics.filesActive.Add(1.)
return decompressor, nil
}

Expand Down Expand Up @@ -151,6 +158,30 @@ func mountReader(f *os.File, logger log.Logger, format CompressionFormat) (reade
return reader, nil
}

func (d *decompressor) Run() {
d.mut.Lock()

// Check if the stop function was called between two Run.
if d.stopping {
close(d.done)
close(d.posdone)
d.mut.Unlock()
return
}

labelsMiddleware := d.labels.Merge(model.LabelSet{filenameLabel: model.LabelValue(d.path)})
handler := loki.AddLabelsMiddleware(labelsMiddleware).Wrap(loki.NewEntryHandler(d.receiver.Chan(), func() {}))
defer handler.Stop()
d.posquit = make(chan struct{})
d.posdone = make(chan struct{})
d.done = make(chan struct{})
d.mut.Unlock()

go d.updatePosition()
d.metrics.filesActive.Add(1.)
d.readLines(handler)
}

func (d *decompressor) updatePosition() {
positionSyncPeriod := d.positions.SyncPeriod()
positionWait := time.NewTicker(positionSyncPeriod)
Expand All @@ -163,7 +194,7 @@ func (d *decompressor) updatePosition() {
for {
select {
case <-positionWait.C:
if err := d.MarkPositionAndSize(); err != nil {
if err := d.markPositionAndSize(); err != nil {
level.Error(d.logger).Log("msg", "position timer: error getting position and/or size, stopping decompressor", "path", d.path, "error", err)
return
}
Expand All @@ -178,7 +209,7 @@ func (d *decompressor) updatePosition() {
// It first decompresses the file as a whole using a reader and then it will iterate
// over its chunks, separated by '\n'.
// During each iteration, the parsed and decoded log line is then sent to the API with the current timestamp.
func (d *decompressor) readLines() {
func (d *decompressor) readLines(handler loki.EntryHandler) {
level.Info(d.logger).Log("msg", "read lines routine: started", "path", d.path)
d.running.Store(true)

Expand All @@ -188,11 +219,12 @@ func (d *decompressor) readLines() {
}

defer func() {
d.running.Store(false)
d.cleanupMetrics()
level.Info(d.logger).Log("msg", "read lines routine finished", "path", d.path)
close(d.done)
}()
entries := d.handler.Chan()
entries := handler.Chan()

f, err := os.Open(d.path)
if err != nil {
Expand Down Expand Up @@ -227,10 +259,13 @@ func (d *decompressor) readLines() {
break
}

d.posAndSizeMtx.RLock()
if line <= d.position {
// skip already seen lines.
d.posAndSizeMtx.RUnlock()
continue
}
d.posAndSizeMtx.RUnlock()

text := scanner.Text()
var finalText string
Expand All @@ -256,41 +291,51 @@ func (d *decompressor) readLines() {
},
}

d.posAndSizeMtx.Lock()
d.size = int64(unsafe.Sizeof(finalText))
d.position++
d.posAndSizeMtx.Unlock()
}
}

func (d *decompressor) MarkPositionAndSize() error {
// Lock this update as there are 2 timers calling this routine, the sync in filetarget and the positions sync in this file.
d.posAndSizeMtx.Lock()
defer d.posAndSizeMtx.Unlock()
func (d *decompressor) markPositionAndSize() error {
// Lock this update because it can be called in two different goroutines
d.posAndSizeMtx.RLock()
defer d.posAndSizeMtx.RUnlock()

d.metrics.totalBytes.WithLabelValues(d.path).Set(float64(d.size))
d.metrics.readBytes.WithLabelValues(d.path).Set(float64(d.position))
d.positions.Put(d.path, d.labels, d.position)
d.positions.Put(d.path, d.labelsStr, d.position)

return nil
}

func (d *decompressor) Stop() {
// stop can be called by two separate threads in filetarget, to avoid a panic closing channels more than once
// we wrap the stop in a sync.Once.
d.stopOnce.Do(func() {
// Shut down the position marker thread
close(d.posquit)
<-d.posdone
d.mut.Lock()
d.stopping = true
defer func() {
d.stopping = false
}()
d.mut.Unlock()

// Shut down the position marker thread
close(d.posquit)
<-d.posdone

// Wait for readLines() to consume all the remaining messages and exit when the channel is closed
<-d.done
level.Info(d.logger).Log("msg", "stopped decompressor", "path", d.path)

// If the component is not stopping, then it means that the target for this component is gone and that
// we should clear the entry from the positions file.
if !d.componentStopping() {
d.positions.Remove(d.path, d.labelsStr)
} else {
// Save the current position before shutting down reader
if err := d.MarkPositionAndSize(); err != nil {
if err := d.markPositionAndSize(); err != nil {
level.Error(d.logger).Log("msg", "error marking file position when stopping decompressor", "path", d.path, "error", err)
}

// Wait for readLines() to consume all the remaining messages and exit when the channel is closed
<-d.done
level.Info(d.logger).Log("msg", "stopped decompressor", "path", d.path)
d.handler.Stop()
})
}
}

func (d *decompressor) IsRunning() bool {
Expand Down
Loading
Loading