Skip to content

Commit dd1f2e7

Browse files
committed
Change to start readLine start and stops updatePositions
1 parent fa65606 commit dd1f2e7

File tree

3 files changed

+98
-95
lines changed

3 files changed

+98
-95
lines changed

internal/component/loki/source/file/decompresser.go

Lines changed: 19 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -62,10 +62,6 @@ type decompressor struct {
6262
cfg DecompressionConfig
6363

6464
componentStopping func() bool
65-
66-
posquit chan struct{} // used by the readLine method to tell the updatePosition method to stop
67-
posdone chan struct{} // used by the updatePosition method to notify when it stopped
68-
done chan struct{} // used by the readLine method to notify when it stopped
6965
}
7066

7167
func newDecompressor(
@@ -166,38 +162,30 @@ func (d *decompressor) Run(ctx context.Context) {
166162
handler := loki.AddLabelsMiddleware(labelsMiddleware).Wrap(loki.NewEntryHandler(d.receiver.Chan(), func() {}))
167163
defer handler.Stop()
168164

169-
d.posquit = make(chan struct{})
170-
d.posdone = make(chan struct{})
171-
d.done = make(chan struct{})
172-
173165
d.metrics.filesActive.Add(1.)
174-
go func() {
175-
// updatePosition closes the channel t.posdone on exit
176-
d.updatePosition()
177-
}()
178166

167+
done := make(chan struct{})
179168
ctx, cancel := context.WithCancel(ctx)
180169
go func() {
181-
// readLines closes the channels t.done and t.posquit on exit
182-
d.readLines(handler)
170+
// readLines closes the done on exit
171+
d.readLines(handler, done)
183172
cancel()
184173
}()
185174

186175
d.running.Store(true)
187176
defer d.running.Store(false)
188177

189178
<-ctx.Done()
190-
d.stop()
179+
d.stop(done)
191180
}
192181

193-
func (d *decompressor) updatePosition() {
182+
func (d *decompressor) updatePosition(posquit chan struct{}) {
194183
positionSyncPeriod := d.positions.SyncPeriod()
195184
positionWait := time.NewTicker(positionSyncPeriod)
196185
defer func() {
197186
positionWait.Stop()
198187
level.Info(d.logger).Log("msg", "position timer: exited", "path", d.path)
199188
d.cleanupMetrics()
200-
close(d.posdone)
201189
}()
202190

203191
for {
@@ -207,7 +195,7 @@ func (d *decompressor) updatePosition() {
207195
level.Error(d.logger).Log("msg", "position timer: error getting position and/or size, stopping decompressor", "path", d.path, "error", err)
208196
return
209197
}
210-
case <-d.posquit:
198+
case <-posquit:
211199
return
212200
}
213201
}
@@ -218,18 +206,26 @@ func (d *decompressor) updatePosition() {
218206
// It first decompresses the file as a whole using a reader and then it will iterate
219207
// over its chunks, separated by '\n'.
220208
// During each iteration, the parsed and decoded log line is then sent to the API with the current timestamp.
221-
func (d *decompressor) readLines(handler loki.EntryHandler) {
209+
// done channel is closed when readlines exits.
210+
func (d *decompressor) readLines(handler loki.EntryHandler, done chan struct{}) {
222211
level.Info(d.logger).Log("msg", "read lines routine: started", "path", d.path)
223212

224213
if d.cfg.InitialDelay > 0 {
225214
level.Info(d.logger).Log("msg", "sleeping before starting decompression", "path", d.path, "duration", d.cfg.InitialDelay.String())
226215
time.Sleep(d.cfg.InitialDelay)
227216
}
228217

218+
posquit, posdone := make(chan struct{}), make(chan struct{})
219+
go func() {
220+
d.updatePosition(posquit)
221+
close(posdone)
222+
}()
223+
229224
defer func() {
230225
level.Info(d.logger).Log("msg", "read lines routine finished", "path", d.path)
231-
close(d.done)
232-
close(d.posquit)
226+
close(posquit)
227+
<-posdone
228+
close(done)
233229
}()
234230

235231
entries := handler.Chan()
@@ -318,11 +314,9 @@ func (d *decompressor) markPositionAndSize() error {
318314
return nil
319315
}
320316

321-
func (d *decompressor) stop() {
317+
func (d *decompressor) stop(done chan struct{}) {
322318
// Wait for readLines() to consume all the remaining messages and exit when the channel is closed
323-
<-d.done
324-
// Wait for the position marker thread to exit
325-
<-d.posdone
319+
<-done
326320

327321
level.Info(d.logger).Log("msg", "stopped decompressor", "path", d.path)
328322

internal/component/loki/source/file/decompresser_test.go

Lines changed: 61 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,24 @@ func newNoopClient() *noopClient {
5050
return c
5151
}
5252

53+
var _ positions.Positions = (*noopPositions)(nil)
54+
55+
type noopPositions struct{}
56+
57+
func (n *noopPositions) Get(path string, labels string) (int64, error) { return 0, nil }
58+
59+
func (n *noopPositions) GetString(path string, labels string) string { return "" }
60+
61+
func (n *noopPositions) Put(path string, labels string, pos int64) {}
62+
63+
func (n *noopPositions) PutString(path string, labels string, pos string) {}
64+
65+
func (n *noopPositions) Remove(path string, labels string) {}
66+
67+
func (n *noopPositions) Stop() {}
68+
69+
func (n *noopPositions) SyncPeriod() time.Duration { return 10 * time.Second }
70+
5371
func BenchmarkReadlines(b *testing.B) {
5472
entryHandler := newNoopClient()
5573

@@ -79,9 +97,9 @@ func BenchmarkReadlines(b *testing.B) {
7997
for i := 0; i < b.N; i++ {
8098
newDec := decBase
8199
newDec.metrics = newMetrics(prometheus.NewRegistry())
82-
newDec.done = make(chan struct{})
83-
newDec.readLines(entryHandler)
84-
<-newDec.done
100+
done := make(chan struct{})
101+
newDec.readLines(entryHandler, done)
102+
<-done
85103
}
86104
})
87105
}
@@ -93,21 +111,20 @@ func TestGigantiqueGunzipFile(t *testing.T) {
93111
defer handler.Stop()
94112

95113
d := &decompressor{
96-
logger: log.NewNopLogger(),
97-
running: atomic.NewBool(false),
98-
receiver: loki.NewLogsReceiver(),
99-
path: file,
100-
done: make(chan struct{}),
101-
posquit: make(chan struct{}),
102-
metrics: newMetrics(prometheus.NewRegistry()),
103-
cfg: DecompressionConfig{Format: "gz"},
114+
logger: log.NewNopLogger(),
115+
running: atomic.NewBool(false),
116+
receiver: loki.NewLogsReceiver(),
117+
path: file,
118+
metrics: newMetrics(prometheus.NewRegistry()),
119+
cfg: DecompressionConfig{Format: "gz"},
120+
positions: &noopPositions{},
104121
}
105122

106-
d.readLines(handler)
123+
done := make(chan struct{})
124+
d.readLines(handler, done)
125+
<-done
107126

108-
<-d.done
109127
time.Sleep(time.Millisecond * 200)
110-
111128
entries := handler.Received()
112129
require.Equal(t, 100000, len(entries))
113130
}
@@ -124,21 +141,20 @@ func TestOnelineFiles(t *testing.T) {
124141
defer handler.Stop()
125142

126143
d := &decompressor{
127-
logger: log.NewNopLogger(),
128-
running: atomic.NewBool(false),
129-
receiver: loki.NewLogsReceiver(),
130-
path: file,
131-
done: make(chan struct{}),
132-
posquit: make(chan struct{}),
133-
metrics: newMetrics(prometheus.NewRegistry()),
134-
cfg: DecompressionConfig{Format: "gz"},
144+
logger: log.NewNopLogger(),
145+
running: atomic.NewBool(false),
146+
receiver: loki.NewLogsReceiver(),
147+
path: file,
148+
metrics: newMetrics(prometheus.NewRegistry()),
149+
cfg: DecompressionConfig{Format: "gz"},
150+
positions: &noopPositions{},
135151
}
136152

137-
d.readLines(handler)
153+
done := make(chan struct{})
154+
d.readLines(handler, done)
155+
<-done
138156

139-
<-d.done
140157
time.Sleep(time.Millisecond * 200)
141-
142158
entries := handler.Received()
143159
require.Equal(t, 1, len(entries))
144160
require.Equal(t, string(fileContent), entries[0].Line)
@@ -150,19 +166,19 @@ func TestOnelineFiles(t *testing.T) {
150166
defer handler.Stop()
151167

152168
d := &decompressor{
153-
logger: log.NewNopLogger(),
154-
running: atomic.NewBool(false),
155-
receiver: loki.NewLogsReceiver(),
156-
path: file,
157-
done: make(chan struct{}),
158-
posquit: make(chan struct{}),
159-
metrics: newMetrics(prometheus.NewRegistry()),
160-
cfg: DecompressionConfig{Format: "bz2"},
169+
logger: log.NewNopLogger(),
170+
running: atomic.NewBool(false),
171+
receiver: loki.NewLogsReceiver(),
172+
path: file,
173+
metrics: newMetrics(prometheus.NewRegistry()),
174+
cfg: DecompressionConfig{Format: "bz2"},
175+
positions: &noopPositions{},
161176
}
162177

163-
d.readLines(handler)
178+
done := make(chan struct{})
179+
d.readLines(handler, done)
180+
<-done
164181

165-
<-d.done
166182
time.Sleep(time.Millisecond * 200)
167183

168184
entries := handler.Received()
@@ -176,19 +192,19 @@ func TestOnelineFiles(t *testing.T) {
176192
defer handler.Stop()
177193

178194
d := &decompressor{
179-
logger: log.NewNopLogger(),
180-
running: atomic.NewBool(false),
181-
receiver: loki.NewLogsReceiver(),
182-
path: file,
183-
done: make(chan struct{}),
184-
posquit: make(chan struct{}),
185-
metrics: newMetrics(prometheus.NewRegistry()),
186-
cfg: DecompressionConfig{Format: "gz"},
195+
logger: log.NewNopLogger(),
196+
running: atomic.NewBool(false),
197+
receiver: loki.NewLogsReceiver(),
198+
path: file,
199+
metrics: newMetrics(prometheus.NewRegistry()),
200+
cfg: DecompressionConfig{Format: "gz"},
201+
positions: &noopPositions{},
187202
}
188203

189-
d.readLines(handler)
204+
done := make(chan struct{})
205+
d.readLines(handler, done)
190206

191-
<-d.done
207+
<-done
192208
time.Sleep(time.Millisecond * 200)
193209

194210
entries := handler.Received()

internal/component/loki/source/file/tailer.go

Lines changed: 18 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -49,10 +49,6 @@ type tailer struct {
4949
componentStopping func() bool
5050

5151
tail *tail.Tail
52-
posquit chan struct{} // used by the readLine method to tell the updatePosition method to stop
53-
posdone chan struct{} // used by the updatePosition method to notify when it stopped
54-
done chan struct{} // used by the readLine method to notify when it stopped
55-
5652
decoder *encoding.Decoder
5753
}
5854

@@ -159,23 +155,19 @@ func (t *tailer) Run(ctx context.Context) {
159155

160156
t.metrics.filesActive.Add(1.)
161157

162-
go func() {
163-
// updatePosition closes the channel t.posdone on exit
164-
t.updatePosition()
165-
}()
166-
158+
done := make(chan struct{})
167159
ctx, cancel := context.WithCancel(ctx)
168160
go func() {
169-
// readLines closes the channels t.done and t.posquit on exit
170-
t.readLines(handler)
161+
// readLines closes done on exit
162+
t.readLines(handler, done)
171163
cancel()
172164
}()
173165

174166
t.running.Store(true)
175167
defer t.running.Store(false)
176168

177169
<-ctx.Done()
178-
t.stop()
170+
t.stop(done)
179171
}
180172

181173
func (t *tailer) initRun() (loki.EntryHandler, error) {
@@ -230,9 +222,6 @@ func (t *tailer) initRun() (loki.EntryHandler, error) {
230222

231223
labelsMiddleware := t.labels.Merge(model.LabelSet{filenameLabel: model.LabelValue(t.path)})
232224
handler := loki.AddLabelsMiddleware(labelsMiddleware).Wrap(loki.NewEntryHandler(t.receiver.Chan(), func() {}))
233-
t.posquit = make(chan struct{})
234-
t.posdone = make(chan struct{})
235-
t.done = make(chan struct{})
236225

237226
return handler, nil
238227
}
@@ -242,15 +231,14 @@ func (t *tailer) initRun() (loki.EntryHandler, error) {
242231
// an error it stops the tailer and exits, the tailer will be re-opened by the
243232
// backoff retry method if it still exists and will start reading from the
244233
// last successful entry in the positions file.
245-
func (t *tailer) updatePosition() {
234+
func (t *tailer) updatePosition(posquit chan struct{}) {
246235
positionSyncPeriod := t.positions.SyncPeriod()
247236
positionWait := time.NewTicker(positionSyncPeriod)
248237
defer func() {
249238
positionWait.Stop()
250239
level.Info(t.logger).Log("msg", "position timer: exited", "path", t.path)
251240
// NOTE: metrics must be cleaned up after the position timer exits, as markPositionAndSize() updates metrics.
252241
t.cleanupMetrics()
253-
close(t.posdone)
254242
}()
255243

256244
for {
@@ -265,7 +253,7 @@ func (t *tailer) updatePosition() {
265253
}
266254
return
267255
}
268-
case <-t.posquit:
256+
case <-posquit:
269257
return
270258
}
271259
}
@@ -277,16 +265,23 @@ func (t *tailer) updatePosition() {
277265
// there are unread lines in this channel and the Stop method on the tailer is
278266
// called, the underlying tailer will never exit if there are unread lines in
279267
// the t.tail.Lines channel
280-
func (t *tailer) readLines(handler loki.EntryHandler) {
268+
func (t *tailer) readLines(handler loki.EntryHandler, done chan struct{}) {
281269
level.Info(t.logger).Log("msg", "tail routine: started", "path", t.path)
282270

271+
posquit, posdone := make(chan struct{}), make(chan struct{})
272+
go func() {
273+
t.updatePosition(posquit)
274+
close(posdone)
275+
}()
276+
283277
// This function runs in a goroutine, if it exits this tailer will never do any more tailing.
284278
// Clean everything up.
285279
defer func() {
286280
level.Info(t.logger).Log("msg", "tail routine: exited", "path", t.path)
287-
close(t.done)
288281
// Shut down the position marker thread
289-
close(t.posquit)
282+
close(posquit)
283+
<-posdone
284+
close(done)
290285
}()
291286

292287
entries := handler.Chan()
@@ -355,7 +350,7 @@ func (t *tailer) markPositionAndSize() error {
355350
return nil
356351
}
357352

358-
func (t *tailer) stop() {
353+
func (t *tailer) stop(done chan struct{}) {
359354
// Save the current position before shutting down tailer to ensure that if the file is tailed again
360355
// it start where it left off.
361356
err := t.markPositionAndSize()
@@ -378,9 +373,7 @@ func (t *tailer) stop() {
378373
level.Debug(t.logger).Log("msg", "waiting for readline and position marker to exit", "path", t.path)
379374

380375
// Wait for readLines() to consume all the remaining messages and exit when the channel is closed
381-
<-t.done
382-
// Wait for the position marker thread to exit
383-
<-t.posdone
376+
<-done
384377

385378
level.Info(t.logger).Log("msg", "stopped tailing file", "path", t.path)
386379

0 commit comments

Comments
 (0)