99 "compress/bzip2"
1010 "compress/gzip"
1111 "compress/zlib"
12+ "context"
1213 "fmt"
1314 "io"
1415 "os"
@@ -61,12 +62,6 @@ type decompressor struct {
6162 cfg DecompressionConfig
6263
6364 componentStopping func () bool
64-
65- mut sync.RWMutex
66- stopping bool
67- posquit chan struct {} // used by the readLine method to tell the updatePosition method to stop
68- posdone chan struct {} // used by the updatePosition method to notify when it stopped
69- done chan struct {} // used by the readLine method to notify when it stopped
7065}
7166
7267func newDecompressor (
@@ -155,39 +150,42 @@ func mountReader(f *os.File, logger log.Logger, format CompressionFormat) (reade
155150 return reader , nil
156151}
157152
158- func (d * decompressor ) Run () {
159- d .mut .Lock ()
160-
161- // Check if the stop function was called between two Run.
162- if d .stopping {
163- d .mut .Unlock ()
153+ func (d * decompressor ) Run (ctx context.Context ) {
154+ // Check if context was canceled between two calls to Run.
155+ select {
156+ case <- ctx .Done ():
164157 return
158+ default :
165159 }
166160
167161 labelsMiddleware := d .labels .Merge (model.LabelSet {filenameLabel : model .LabelValue (d .path )})
168162 handler := loki .AddLabelsMiddleware (labelsMiddleware ).Wrap (loki .NewEntryHandler (d .receiver .Chan (), func () {}))
169163 defer handler .Stop ()
170- d .posquit = make (chan struct {})
171- d .posdone = make (chan struct {})
172- d .done = make (chan struct {})
173- d .mut .Unlock ()
174-
175- // updatePosition closes the channel t.posdone on exit
176- go d .updatePosition ()
177164
178165 d .metrics .filesActive .Add (1. )
179166
180- // readLines closes the channels t.done on exit
181- d .readLines (handler )
167+ done := make (chan struct {})
168+ ctx , cancel := context .WithCancel (ctx )
169+ go func () {
170+ // readLines closes done on exit
171+ d .readLines (handler , done )
172+ cancel ()
173+ }()
174+
175+ d .running .Store (true )
176+ defer d .running .Store (false )
177+
178+ <- ctx .Done ()
179+ d .stop (done )
182180}
183181
184- func (d * decompressor ) updatePosition () {
182+ func (d * decompressor ) updatePosition (posquit chan struct {} ) {
185183 positionSyncPeriod := d .positions .SyncPeriod ()
186184 positionWait := time .NewTicker (positionSyncPeriod )
187185 defer func () {
188186 positionWait .Stop ()
189187 level .Info (d .logger ).Log ("msg" , "position timer: exited" , "path" , d .path )
190- close ( d . posdone )
188+ d . cleanupMetrics ( )
191189 }()
192190
193191 for {
@@ -197,7 +195,7 @@ func (d *decompressor) updatePosition() {
197195 level .Error (d .logger ).Log ("msg" , "position timer: error getting position and/or size, stopping decompressor" , "path" , d .path , "error" , err )
198196 return
199197 }
200- case <- d . posquit :
198+ case <- posquit :
201199 return
202200 }
203201 }
@@ -208,21 +206,28 @@ func (d *decompressor) updatePosition() {
208206// It first decompresses the file as a whole using a reader and then it will iterate
209207// over its chunks, separated by '\n'.
210208// During each iteration, the parsed and decoded log line is then sent to the API with the current timestamp.
211- func (d * decompressor ) readLines (handler loki.EntryHandler ) {
209+ // done channel is closed when readlines exits.
210+ func (d * decompressor ) readLines (handler loki.EntryHandler , done chan struct {}) {
212211 level .Info (d .logger ).Log ("msg" , "read lines routine: started" , "path" , d .path )
213- d .running .Store (true )
214212
215213 if d .cfg .InitialDelay > 0 {
216214 level .Info (d .logger ).Log ("msg" , "sleeping before starting decompression" , "path" , d .path , "duration" , d .cfg .InitialDelay .String ())
217215 time .Sleep (d .cfg .InitialDelay )
218216 }
219217
218+ posquit , posdone := make (chan struct {}), make (chan struct {})
219+ go func () {
220+ d .updatePosition (posquit )
221+ close (posdone )
222+ }()
223+
220224 defer func () {
221- d .running .Store (false )
222- d .cleanupMetrics ()
223225 level .Info (d .logger ).Log ("msg" , "read lines routine finished" , "path" , d .path )
224- close (d .done )
226+ close (posquit )
227+ <- posdone
228+ close (done )
225229 }()
230+
226231 entries := handler .Chan ()
227232
228233 f , err := os .Open (d .path )
@@ -309,27 +314,10 @@ func (d *decompressor) markPositionAndSize() error {
309314 return nil
310315}
311316
312- func (d * decompressor ) Stop () {
313- d .mut .Lock ()
314- d .stopping = true
315- defer func () {
316- d .stopping = false
317- }()
318- d .mut .Unlock ()
319-
320- // Shut down the position marker thread
321- if d .posquit != nil {
322- close (d .posquit )
323- <- d .posdone
324- d .posquit = nil
325- d .posdone = nil
326- }
327-
317+ func (d * decompressor ) stop (done chan struct {}) {
328318 // Wait for readLines() to consume all the remaining messages and exit when the channel is closed
329- if d .done != nil {
330- <- d .done
331- d .done = nil
332- }
319+ <- done
320+
333321 level .Info (d .logger ).Log ("msg" , "stopped decompressor" , "path" , d .path )
334322
335323 // If the component is not stopping, then it means that the target for this component is gone and that
@@ -365,7 +353,3 @@ func (d *decompressor) cleanupMetrics() {
365353 d .metrics .readBytes .DeleteLabelValues (d .path )
366354 d .metrics .totalBytes .DeleteLabelValues (d .path )
367355}
368-
369- func (d * decompressor ) Path () string {
370- return d .path
371- }
0 commit comments