Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,21 @@ This document contains a historical list of changes between releases. Only
changes that impact end-user behavior are listed; changes to documentation or
internal API changes are not present.

v1.8.3
-----------------

### Bugfixes

- Fix `mimir.rules.kubernetes` panic on non-leader debug info retrieval (@TheoBrigitte)

- Fix detection of the `streams limit exceeded` error in the Loki client so that metrics are correctly labeled as `ReasonStreamLimited`. (@maratkhv)

- Fix `loki.source.file` race condition that often led to a panic when using `decompression`. (@kalleep)

- Fix deadlock in `loki.source.file` that can happen when targets are removed. (@kalleep)

- Fix `loki.process` to emit valid logfmt. (@kalleep)

v1.8.2
-----------------

Expand Down
9 changes: 5 additions & 4 deletions internal/component/common/loki/client/batch.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package client

import (
"errors"
"fmt"
"strconv"
"strings"
Expand All @@ -16,8 +17,8 @@ import (
"github.com/grafana/alloy/internal/component/common/loki"
)

const (
errMaxStreamsLimitExceeded = "streams limit exceeded, streams: %d exceeds limit: %d, stream: '%s'"
var (
errMaxStreamsLimitExceeded = errors.New("streams limit exceeded")
)

// SentDataMarkerHandler is a slice of the MarkerHandler interface, that the batch interacts with to report the event that
Expand Down Expand Up @@ -73,7 +74,7 @@ func (b *batch) add(entry loki.Entry) error {

streams := len(b.streams)
if b.maxStreams > 0 && streams >= b.maxStreams {
return fmt.Errorf(errMaxStreamsLimitExceeded, streams, b.maxStreams, labels)
return fmt.Errorf("%w, streams: %d exceeds limit: %d, stream: '%s'", errMaxStreamsLimitExceeded, streams, b.maxStreams, labels)
}
// Add the entry as a new stream
b.streams[labels] = &logproto.Stream{
Expand All @@ -98,7 +99,7 @@ func (b *batch) addFromWAL(lbs model.LabelSet, entry logproto.Entry, segmentNum

streams := len(b.streams)
if b.maxStreams > 0 && streams >= b.maxStreams {
return fmt.Errorf(errMaxStreamsLimitExceeded, streams, b.maxStreams, labels)
return fmt.Errorf("%w, streams: %d exceeds limit: %d, stream: '%s'", errMaxStreamsLimitExceeded, streams, b.maxStreams, labels)
}

// Add the entry as a new stream
Expand Down
2 changes: 1 addition & 1 deletion internal/component/common/loki/client/batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func TestBatch_MaxStreams(t *testing.T) {
err := b.add(entry)
if err != nil {
errCount++
assert.EqualError(t, err, fmt.Errorf(errMaxStreamsLimitExceeded, len(b.streams), b.maxStreams, entry.Labels).Error())
assert.ErrorIs(t, err, errMaxStreamsLimitExceeded)
}
}
assert.Equal(t, errCount, 2)
Expand Down
2 changes: 1 addition & 1 deletion internal/component/common/loki/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ func (c *client) run() {
if err != nil {
level.Error(c.logger).Log("msg", "batch add err", "tenant", tenantID, "error", err)
reason := ReasonGeneric
if err.Error() == errMaxStreamsLimitExceeded {
if errors.Is(err, errMaxStreamsLimitExceeded) {
reason = ReasonStreamLimited
}
c.metrics.droppedBytes.WithLabelValues(c.cfg.URL.Host, tenantID, reason).Add(float64(len(e.Line)))
Expand Down
2 changes: 1 addition & 1 deletion internal/component/common/loki/client/queue_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ func (c *queueClient) appendSingleEntry(segmentNum int, lbs model.LabelSet, e lo
if err != nil {
level.Error(c.logger).Log("msg", "batch add err", "tenant", tenantID, "error", err)
reason := ReasonGeneric
if err.Error() == errMaxStreamsLimitExceeded {
if errors.Is(err, errMaxStreamsLimitExceeded) {
reason = ReasonStreamLimited
}
c.metrics.droppedBytes.WithLabelValues(c.cfg.URL.Host, tenantID, reason).Add(float64(len(e.Line)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func (m *eventLogMessageStage) processEntry(extracted map[string]interface{}, ke
}
if Debug {
level.Debug(m.logger).Log("msg", "extracted data debug in event_log_message stage",
"extracted data", fmt.Sprintf("%v", extracted))
"extracted_data", fmt.Sprintf("%v", extracted))
}
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion internal/component/loki/process/stages/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func (j *jsonStage) processEntry(extracted map[string]interface{}, entry *string
}
}
if Debug {
level.Debug(j.logger).Log("msg", "extracted data debug in json stage", "extracted data", fmt.Sprintf("%v", extracted))
level.Debug(j.logger).Log("msg", "extracted data debug in json stage", "extracted_data", fmt.Sprintf("%v", extracted))
}
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion internal/component/loki/process/stages/replace.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func (r *replaceStage) Process(labels model.LabelSet, extracted map[string]inter
}
}
}
level.Debug(r.logger).Log("msg", "extracted data debug in replace stage", "extracted data", fmt.Sprintf("%v", extracted))
level.Debug(r.logger).Log("msg", "extracted data debug in replace stage", "extracted_data", fmt.Sprintf("%v", extracted))
}

func (r *replaceStage) getReplacedEntry(matchAllIndex [][]int, input string, td map[string]string, templ *template.Template) (string, map[string]string, error) {
Expand Down
90 changes: 37 additions & 53 deletions internal/component/loki/source/file/decompresser.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"compress/bzip2"
"compress/gzip"
"compress/zlib"
"context"
"fmt"
"io"
"os"
Expand Down Expand Up @@ -61,12 +62,6 @@ type decompressor struct {
cfg DecompressionConfig

componentStopping func() bool

mut sync.RWMutex
stopping bool
posquit chan struct{} // used by the readLine method to tell the updatePosition method to stop
posdone chan struct{} // used by the updatePosition method to notify when it stopped
done chan struct{} // used by the readLine method to notify when it stopped
}

func newDecompressor(
Expand Down Expand Up @@ -155,39 +150,42 @@ func mountReader(f *os.File, logger log.Logger, format CompressionFormat) (reade
return reader, nil
}

func (d *decompressor) Run() {
d.mut.Lock()

// Check if the stop function was called between two Run.
if d.stopping {
d.mut.Unlock()
func (d *decompressor) Run(ctx context.Context) {
// Check if context was canceled between two calls to Run.
select {
case <-ctx.Done():
return
default:
}

labelsMiddleware := d.labels.Merge(model.LabelSet{filenameLabel: model.LabelValue(d.path)})
handler := loki.AddLabelsMiddleware(labelsMiddleware).Wrap(loki.NewEntryHandler(d.receiver.Chan(), func() {}))
defer handler.Stop()
d.posquit = make(chan struct{})
d.posdone = make(chan struct{})
d.done = make(chan struct{})
d.mut.Unlock()

// updatePosition closes the channel t.posdone on exit
go d.updatePosition()

d.metrics.filesActive.Add(1.)

// readLines closes the channels t.done on exit
d.readLines(handler)
done := make(chan struct{})
ctx, cancel := context.WithCancel(ctx)
go func() {
// readLines closes done on exit
d.readLines(handler, done)
cancel()
}()

d.running.Store(true)
defer d.running.Store(false)

<-ctx.Done()
d.stop(done)
}

func (d *decompressor) updatePosition() {
func (d *decompressor) updatePosition(posquit chan struct{}) {
positionSyncPeriod := d.positions.SyncPeriod()
positionWait := time.NewTicker(positionSyncPeriod)
defer func() {
positionWait.Stop()
level.Info(d.logger).Log("msg", "position timer: exited", "path", d.path)
close(d.posdone)
d.cleanupMetrics()
}()

for {
Expand All @@ -197,7 +195,7 @@ func (d *decompressor) updatePosition() {
level.Error(d.logger).Log("msg", "position timer: error getting position and/or size, stopping decompressor", "path", d.path, "error", err)
return
}
case <-d.posquit:
case <-posquit:
return
}
}
Expand All @@ -208,21 +206,28 @@ func (d *decompressor) updatePosition() {
// It first decompresses the file as a whole using a reader and then it will iterate
// over its chunks, separated by '\n'.
// During each iteration, the parsed and decoded log line is then sent to the API with the current timestamp.
func (d *decompressor) readLines(handler loki.EntryHandler) {
// done channel is closed when readlines exits.
func (d *decompressor) readLines(handler loki.EntryHandler, done chan struct{}) {
level.Info(d.logger).Log("msg", "read lines routine: started", "path", d.path)
d.running.Store(true)

if d.cfg.InitialDelay > 0 {
level.Info(d.logger).Log("msg", "sleeping before starting decompression", "path", d.path, "duration", d.cfg.InitialDelay.String())
time.Sleep(d.cfg.InitialDelay)
}

posquit, posdone := make(chan struct{}), make(chan struct{})
go func() {
d.updatePosition(posquit)
close(posdone)
}()

defer func() {
d.running.Store(false)
d.cleanupMetrics()
level.Info(d.logger).Log("msg", "read lines routine finished", "path", d.path)
close(d.done)
close(posquit)
<-posdone
close(done)
}()

entries := handler.Chan()

f, err := os.Open(d.path)
Expand Down Expand Up @@ -309,27 +314,10 @@ func (d *decompressor) markPositionAndSize() error {
return nil
}

func (d *decompressor) Stop() {
d.mut.Lock()
d.stopping = true
defer func() {
d.stopping = false
}()
d.mut.Unlock()

// Shut down the position marker thread
if d.posquit != nil {
close(d.posquit)
<-d.posdone
d.posquit = nil
d.posdone = nil
}

func (d *decompressor) stop(done chan struct{}) {
// Wait for readLines() to consume all the remaining messages and exit when the channel is closed
if d.done != nil {
<-d.done
d.done = nil
}
<-done

level.Info(d.logger).Log("msg", "stopped decompressor", "path", d.path)

// If the component is not stopping, then it means that the target for this component is gone and that
Expand Down Expand Up @@ -365,7 +353,3 @@ func (d *decompressor) cleanupMetrics() {
d.metrics.readBytes.DeleteLabelValues(d.path)
d.metrics.totalBytes.DeleteLabelValues(d.path)
}

func (d *decompressor) Path() string {
return d.path
}
Loading
Loading