Skip to content

Commit 6fc45ad

Browse files
kalleep authored and marctc committed
fix: panic that happens when a target gets deleted when using decompression (#3475)
* Fix panic that can happen when a file is removed for the decompressor * Change so that readLines starts and stops updatePosition * Add changelog
1 parent 7da0edf commit 6fc45ad

File tree

7 files changed

+221
-202
lines changed

7 files changed

+221
-202
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,10 +46,13 @@ Main (unreleased)
4646
### Bugfixes
4747

4848
- Fix `otelcol.receiver.filelog` documentation's default value for `start_at`. (@petewall)
49+
4950
- Fix `mimir.rules.kubernetes` panic on non-leader debug info retrieval (@TheoBrigitte)
5051

5152
- Fix detection of the “streams limit exceeded” error in the Loki client so that metrics are correctly labeled as `ReasonStreamLimited`. (@maratkhv)
5253

54+
- Fix `loki.source.file` race condition that often led to a panic when using `decompression`. (@kalleep)
55+
5356
### Other changes
5457

5558
- Update the zap logging adapter used by `otelcol` components to log arrays and objects. (@dehaansa)

internal/component/loki/source/file/decompresser.go

Lines changed: 37 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"compress/bzip2"
1010
"compress/gzip"
1111
"compress/zlib"
12+
"context"
1213
"fmt"
1314
"io"
1415
"os"
@@ -61,12 +62,6 @@ type decompressor struct {
6162
cfg DecompressionConfig
6263

6364
componentStopping func() bool
64-
65-
mut sync.RWMutex
66-
stopping bool
67-
posquit chan struct{} // used by the readLine method to tell the updatePosition method to stop
68-
posdone chan struct{} // used by the updatePosition method to notify when it stopped
69-
done chan struct{} // used by the readLine method to notify when it stopped
7065
}
7166

7267
func newDecompressor(
@@ -155,39 +150,42 @@ func mountReader(f *os.File, logger log.Logger, format CompressionFormat) (reade
155150
return reader, nil
156151
}
157152

158-
func (d *decompressor) Run() {
159-
d.mut.Lock()
160-
161-
// Check if the stop function was called between two Run.
162-
if d.stopping {
163-
d.mut.Unlock()
153+
func (d *decompressor) Run(ctx context.Context) {
154+
// Check if context was canceled between two calls to Run.
155+
select {
156+
case <-ctx.Done():
164157
return
158+
default:
165159
}
166160

167161
labelsMiddleware := d.labels.Merge(model.LabelSet{filenameLabel: model.LabelValue(d.path)})
168162
handler := loki.AddLabelsMiddleware(labelsMiddleware).Wrap(loki.NewEntryHandler(d.receiver.Chan(), func() {}))
169163
defer handler.Stop()
170-
d.posquit = make(chan struct{})
171-
d.posdone = make(chan struct{})
172-
d.done = make(chan struct{})
173-
d.mut.Unlock()
174-
175-
// updatePosition closes the channel t.posdone on exit
176-
go d.updatePosition()
177164

178165
d.metrics.filesActive.Add(1.)
179166

180-
// readLines closes the channels t.done on exit
181-
d.readLines(handler)
167+
done := make(chan struct{})
168+
ctx, cancel := context.WithCancel(ctx)
169+
go func() {
170+
// readLines closes done on exit
171+
d.readLines(handler, done)
172+
cancel()
173+
}()
174+
175+
d.running.Store(true)
176+
defer d.running.Store(false)
177+
178+
<-ctx.Done()
179+
d.stop(done)
182180
}
183181

184-
func (d *decompressor) updatePosition() {
182+
func (d *decompressor) updatePosition(posquit chan struct{}) {
185183
positionSyncPeriod := d.positions.SyncPeriod()
186184
positionWait := time.NewTicker(positionSyncPeriod)
187185
defer func() {
188186
positionWait.Stop()
189187
level.Info(d.logger).Log("msg", "position timer: exited", "path", d.path)
190-
close(d.posdone)
188+
d.cleanupMetrics()
191189
}()
192190

193191
for {
@@ -197,7 +195,7 @@ func (d *decompressor) updatePosition() {
197195
level.Error(d.logger).Log("msg", "position timer: error getting position and/or size, stopping decompressor", "path", d.path, "error", err)
198196
return
199197
}
200-
case <-d.posquit:
198+
case <-posquit:
201199
return
202200
}
203201
}
@@ -208,21 +206,28 @@ func (d *decompressor) updatePosition() {
208206
// It first decompresses the file as a whole using a reader and then it will iterate
209207
// over its chunks, separated by '\n'.
210208
// During each iteration, the parsed and decoded log line is then sent to the API with the current timestamp.
211-
func (d *decompressor) readLines(handler loki.EntryHandler) {
209+
// The done channel is closed when readLines exits.
210+
func (d *decompressor) readLines(handler loki.EntryHandler, done chan struct{}) {
212211
level.Info(d.logger).Log("msg", "read lines routine: started", "path", d.path)
213-
d.running.Store(true)
214212

215213
if d.cfg.InitialDelay > 0 {
216214
level.Info(d.logger).Log("msg", "sleeping before starting decompression", "path", d.path, "duration", d.cfg.InitialDelay.String())
217215
time.Sleep(d.cfg.InitialDelay)
218216
}
219217

218+
posquit, posdone := make(chan struct{}), make(chan struct{})
219+
go func() {
220+
d.updatePosition(posquit)
221+
close(posdone)
222+
}()
223+
220224
defer func() {
221-
d.running.Store(false)
222-
d.cleanupMetrics()
223225
level.Info(d.logger).Log("msg", "read lines routine finished", "path", d.path)
224-
close(d.done)
226+
close(posquit)
227+
<-posdone
228+
close(done)
225229
}()
230+
226231
entries := handler.Chan()
227232

228233
f, err := os.Open(d.path)
@@ -309,27 +314,10 @@ func (d *decompressor) markPositionAndSize() error {
309314
return nil
310315
}
311316

312-
func (d *decompressor) Stop() {
313-
d.mut.Lock()
314-
d.stopping = true
315-
defer func() {
316-
d.stopping = false
317-
}()
318-
d.mut.Unlock()
319-
320-
// Shut down the position marker thread
321-
if d.posquit != nil {
322-
close(d.posquit)
323-
<-d.posdone
324-
d.posquit = nil
325-
d.posdone = nil
326-
}
327-
317+
func (d *decompressor) stop(done chan struct{}) {
328318
// Wait for readLines() to consume all the remaining messages and exit when the channel is closed
329-
if d.done != nil {
330-
<-d.done
331-
d.done = nil
332-
}
319+
<-done
320+
333321
level.Info(d.logger).Log("msg", "stopped decompressor", "path", d.path)
334322

335323
// If the component is not stopping, then it means that the target for this component is gone and that
@@ -365,7 +353,3 @@ func (d *decompressor) cleanupMetrics() {
365353
d.metrics.readBytes.DeleteLabelValues(d.path)
366354
d.metrics.totalBytes.DeleteLabelValues(d.path)
367355
}
368-
369-
func (d *decompressor) Path() string {
370-
return d.path
371-
}

0 commit comments

Comments
 (0)