Skip to content

Commit a3cd334

Browse files
kalleepTheoBrigittewildummaratkhv
authored
Prepare patch release v1.8.3 on release branch (#3497)
* fix: panic that happens when a target gets deleted when using decompression (#3475) * Fix panic that can happen when a file is removed for the decompressor * Change readLines so that it starts and stops updatePosition * Add changelog * Fix mimir.rules.kubernetes panic on non-leader debug info retrieval (#3451) * Fix mimir.rules.kubernetes to only return eventProcessor state if it exists * fix: deadlock in loki.source.file when target is removed (#3488) * Fix deadlock that can happen when stopping reader tasks Co-authored-by: William Dumont <william.dumont@grafana.com> * fix: emit valid logfmt key (#3495) * Fix log keys to be valid for logfmt * Add changelog * Fix streams limit error check so that metrics are correctly labeled as `ReasonStreamLimited` (#3466) * fix: replace direct error string compare with isErrMaxStreamsLimitExceeded helper * update CHANGELOG * Make errMaxStreamsLimitExceeded an error type --------- Co-authored-by: Théo Brigitte <theo.brigitte@gmail.com> Co-authored-by: William Dumont <william.dumont@grafana.com> Co-authored-by: Marat Khvostov <marathvostov@gmail.com>
1 parent 358b59d commit a3cd334

File tree

17 files changed

+350
-217
lines changed

17 files changed

+350
-217
lines changed

CHANGELOG.md

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,21 @@ This document contains a historical list of changes between releases. Only
77
changes that impact end-user behavior are listed; changes to documentation or
88
internal API changes are not present.
99

10+
v1.8.3
11+
-----------------
12+
13+
### Bugfixes
14+
15+
- Fix `mimir.rules.kubernetes` panic on non-leader debug info retrieval. (@TheoBrigitte)
16+
17+
- Fix detection of the “streams limit exceeded” error in the Loki client so that metrics are correctly labeled as `ReasonStreamLimited`. (@maratkhv)
18+
19+
- Fix `loki.source.file` race condition that often led to a panic when using `decompression`. (@kalleep)
20+
21+
- Fix deadlock in `loki.source.file` that can happen when targets are removed. (@kalleep)
22+
23+
- Fix `loki.process` to emit valid logfmt. (@kalleep)
24+
1025
v1.8.2
1126
-----------------
1227

internal/component/common/loki/client/batch.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package client
22

33
import (
4+
"errors"
45
"fmt"
56
"strconv"
67
"strings"
@@ -16,8 +17,8 @@ import (
1617
"github.com/grafana/alloy/internal/component/common/loki"
1718
)
1819

19-
const (
20-
errMaxStreamsLimitExceeded = "streams limit exceeded, streams: %d exceeds limit: %d, stream: '%s'"
20+
var (
21+
errMaxStreamsLimitExceeded = errors.New("streams limit exceeded")
2122
)
2223

2324
// SentDataMarkerHandler is a slice of the MarkerHandler interface, that the batch interacts with to report the event that
@@ -73,7 +74,7 @@ func (b *batch) add(entry loki.Entry) error {
7374

7475
streams := len(b.streams)
7576
if b.maxStreams > 0 && streams >= b.maxStreams {
76-
return fmt.Errorf(errMaxStreamsLimitExceeded, streams, b.maxStreams, labels)
77+
return fmt.Errorf("%w, streams: %d exceeds limit: %d, stream: '%s'", errMaxStreamsLimitExceeded, streams, b.maxStreams, labels)
7778
}
7879
// Add the entry as a new stream
7980
b.streams[labels] = &logproto.Stream{
@@ -98,7 +99,7 @@ func (b *batch) addFromWAL(lbs model.LabelSet, entry logproto.Entry, segmentNum
9899

99100
streams := len(b.streams)
100101
if b.maxStreams > 0 && streams >= b.maxStreams {
101-
return fmt.Errorf(errMaxStreamsLimitExceeded, streams, b.maxStreams, labels)
102+
return fmt.Errorf("%w, streams: %d exceeds limit: %d, stream: '%s'", errMaxStreamsLimitExceeded, streams, b.maxStreams, labels)
102103
}
103104

104105
// Add the entry as a new stream

internal/component/common/loki/client/batch_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ func TestBatch_MaxStreams(t *testing.T) {
3131
err := b.add(entry)
3232
if err != nil {
3333
errCount++
34-
assert.EqualError(t, err, fmt.Errorf(errMaxStreamsLimitExceeded, len(b.streams), b.maxStreams, entry.Labels).Error())
34+
assert.ErrorIs(t, err, errMaxStreamsLimitExceeded)
3535
}
3636
}
3737
assert.Equal(t, errCount, 2)

internal/component/common/loki/client/client.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -310,7 +310,7 @@ func (c *client) run() {
310310
if err != nil {
311311
level.Error(c.logger).Log("msg", "batch add err", "tenant", tenantID, "error", err)
312312
reason := ReasonGeneric
313-
if err.Error() == errMaxStreamsLimitExceeded {
313+
if errors.Is(err, errMaxStreamsLimitExceeded) {
314314
reason = ReasonStreamLimited
315315
}
316316
c.metrics.droppedBytes.WithLabelValues(c.cfg.URL.Host, tenantID, reason).Add(float64(len(e.Line)))

internal/component/common/loki/client/queue_client.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -358,7 +358,7 @@ func (c *queueClient) appendSingleEntry(segmentNum int, lbs model.LabelSet, e lo
358358
if err != nil {
359359
level.Error(c.logger).Log("msg", "batch add err", "tenant", tenantID, "error", err)
360360
reason := ReasonGeneric
361-
if err.Error() == errMaxStreamsLimitExceeded {
361+
if errors.Is(err, errMaxStreamsLimitExceeded) {
362362
reason = ReasonStreamLimited
363363
}
364364
c.metrics.droppedBytes.WithLabelValues(c.cfg.URL.Host, tenantID, reason).Add(float64(len(e.Line)))

internal/component/loki/process/stages/eventlogmessage.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ func (m *eventLogMessageStage) processEntry(extracted map[string]interface{}, ke
107107
}
108108
if Debug {
109109
level.Debug(m.logger).Log("msg", "extracted data debug in event_log_message stage",
110-
"extracted data", fmt.Sprintf("%v", extracted))
110+
"extracted_data", fmt.Sprintf("%v", extracted))
111111
}
112112
return nil
113113
}

internal/component/loki/process/stages/json.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,7 @@ func (j *jsonStage) processEntry(extracted map[string]interface{}, entry *string
165165
}
166166
}
167167
if Debug {
168-
level.Debug(j.logger).Log("msg", "extracted data debug in json stage", "extracted data", fmt.Sprintf("%v", extracted))
168+
level.Debug(j.logger).Log("msg", "extracted data debug in json stage", "extracted_data", fmt.Sprintf("%v", extracted))
169169
}
170170
return nil
171171
}

internal/component/loki/process/stages/replace.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ func (r *replaceStage) Process(labels model.LabelSet, extracted map[string]inter
125125
}
126126
}
127127
}
128-
level.Debug(r.logger).Log("msg", "extracted data debug in replace stage", "extracted data", fmt.Sprintf("%v", extracted))
128+
level.Debug(r.logger).Log("msg", "extracted data debug in replace stage", "extracted_data", fmt.Sprintf("%v", extracted))
129129
}
130130

131131
func (r *replaceStage) getReplacedEntry(matchAllIndex [][]int, input string, td map[string]string, templ *template.Template) (string, map[string]string, error) {

internal/component/loki/source/file/decompresser.go

Lines changed: 37 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"compress/bzip2"
1010
"compress/gzip"
1111
"compress/zlib"
12+
"context"
1213
"fmt"
1314
"io"
1415
"os"
@@ -61,12 +62,6 @@ type decompressor struct {
6162
cfg DecompressionConfig
6263

6364
componentStopping func() bool
64-
65-
mut sync.RWMutex
66-
stopping bool
67-
posquit chan struct{} // used by the readLine method to tell the updatePosition method to stop
68-
posdone chan struct{} // used by the updatePosition method to notify when it stopped
69-
done chan struct{} // used by the readLine method to notify when it stopped
7065
}
7166

7267
func newDecompressor(
@@ -155,39 +150,42 @@ func mountReader(f *os.File, logger log.Logger, format CompressionFormat) (reade
155150
return reader, nil
156151
}
157152

158-
func (d *decompressor) Run() {
159-
d.mut.Lock()
160-
161-
// Check if the stop function was called between two Run.
162-
if d.stopping {
163-
d.mut.Unlock()
153+
func (d *decompressor) Run(ctx context.Context) {
154+
// Check if context was canceled between two calls to Run.
155+
select {
156+
case <-ctx.Done():
164157
return
158+
default:
165159
}
166160

167161
labelsMiddleware := d.labels.Merge(model.LabelSet{filenameLabel: model.LabelValue(d.path)})
168162
handler := loki.AddLabelsMiddleware(labelsMiddleware).Wrap(loki.NewEntryHandler(d.receiver.Chan(), func() {}))
169163
defer handler.Stop()
170-
d.posquit = make(chan struct{})
171-
d.posdone = make(chan struct{})
172-
d.done = make(chan struct{})
173-
d.mut.Unlock()
174-
175-
// updatePosition closes the channel t.posdone on exit
176-
go d.updatePosition()
177164

178165
d.metrics.filesActive.Add(1.)
179166

180-
// readLines closes the channels t.done on exit
181-
d.readLines(handler)
167+
done := make(chan struct{})
168+
ctx, cancel := context.WithCancel(ctx)
169+
go func() {
170+
// readLines closes done on exit
171+
d.readLines(handler, done)
172+
cancel()
173+
}()
174+
175+
d.running.Store(true)
176+
defer d.running.Store(false)
177+
178+
<-ctx.Done()
179+
d.stop(done)
182180
}
183181

184-
func (d *decompressor) updatePosition() {
182+
func (d *decompressor) updatePosition(posquit chan struct{}) {
185183
positionSyncPeriod := d.positions.SyncPeriod()
186184
positionWait := time.NewTicker(positionSyncPeriod)
187185
defer func() {
188186
positionWait.Stop()
189187
level.Info(d.logger).Log("msg", "position timer: exited", "path", d.path)
190-
close(d.posdone)
188+
d.cleanupMetrics()
191189
}()
192190

193191
for {
@@ -197,7 +195,7 @@ func (d *decompressor) updatePosition() {
197195
level.Error(d.logger).Log("msg", "position timer: error getting position and/or size, stopping decompressor", "path", d.path, "error", err)
198196
return
199197
}
200-
case <-d.posquit:
198+
case <-posquit:
201199
return
202200
}
203201
}
@@ -208,21 +206,28 @@ func (d *decompressor) updatePosition() {
208206
// It first decompresses the file as a whole using a reader and then it will iterate
209207
// over its chunks, separated by '\n'.
210208
// During each iteration, the parsed and decoded log line is then sent to the API with the current timestamp.
211-
func (d *decompressor) readLines(handler loki.EntryHandler) {
209+
// done channel is closed when readLines exits.
210+
func (d *decompressor) readLines(handler loki.EntryHandler, done chan struct{}) {
212211
level.Info(d.logger).Log("msg", "read lines routine: started", "path", d.path)
213-
d.running.Store(true)
214212

215213
if d.cfg.InitialDelay > 0 {
216214
level.Info(d.logger).Log("msg", "sleeping before starting decompression", "path", d.path, "duration", d.cfg.InitialDelay.String())
217215
time.Sleep(d.cfg.InitialDelay)
218216
}
219217

218+
posquit, posdone := make(chan struct{}), make(chan struct{})
219+
go func() {
220+
d.updatePosition(posquit)
221+
close(posdone)
222+
}()
223+
220224
defer func() {
221-
d.running.Store(false)
222-
d.cleanupMetrics()
223225
level.Info(d.logger).Log("msg", "read lines routine finished", "path", d.path)
224-
close(d.done)
226+
close(posquit)
227+
<-posdone
228+
close(done)
225229
}()
230+
226231
entries := handler.Chan()
227232

228233
f, err := os.Open(d.path)
@@ -309,27 +314,10 @@ func (d *decompressor) markPositionAndSize() error {
309314
return nil
310315
}
311316

312-
func (d *decompressor) Stop() {
313-
d.mut.Lock()
314-
d.stopping = true
315-
defer func() {
316-
d.stopping = false
317-
}()
318-
d.mut.Unlock()
319-
320-
// Shut down the position marker thread
321-
if d.posquit != nil {
322-
close(d.posquit)
323-
<-d.posdone
324-
d.posquit = nil
325-
d.posdone = nil
326-
}
327-
317+
func (d *decompressor) stop(done chan struct{}) {
328318
// Wait for readLines() to consume all the remaining messages and exit when the channel is closed
329-
if d.done != nil {
330-
<-d.done
331-
d.done = nil
332-
}
319+
<-done
320+
333321
level.Info(d.logger).Log("msg", "stopped decompressor", "path", d.path)
334322

335323
// If the component is not stopping, then it means that the target for this component is gone and that
@@ -365,7 +353,3 @@ func (d *decompressor) cleanupMetrics() {
365353
d.metrics.readBytes.DeleteLabelValues(d.path)
366354
d.metrics.totalBytes.DeleteLabelValues(d.path)
367355
}
368-
369-
func (d *decompressor) Path() string {
370-
return d.path
371-
}

0 commit comments

Comments
 (0)