Skip to content

Commit 9505e2a

Browse files
kalleepwildum
andcommitted
fix: deadlock in loki.source.file when target is removed (#3488)
* Fix deadlock that can happen when stopping reader tasks
Co-authored-by: William Dumont <william.dumont@grafana.com>
1 parent 40a725a commit 9505e2a

File tree

3 files changed

+113
-5
lines changed

3 files changed

+113
-5
lines changed

CHANGELOG.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,17 @@ v1.8.3
1616

1717
- Fix `loki.source.file` race condition that often led to a panic when using `decompression`. (@kalleep)
1818

19+
20+
21+
- Fix deadlock in `loki.source.file` that can happen when targets are removed. (@kalleep)
22+
23+
### Other changes
24+
25+
- Update the zap logging adapter used by `otelcol` components to log arrays and objects. (@dehaansa)
26+
27+
- Updated Windows install script to add DisplayVersion into registry on install (@enessene)
28+
29+
1930
v1.8.2
2031
-----------------
2132

internal/component/loki/source/file/file.go

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -81,8 +81,6 @@ type Component struct {
8181
opts component.Options
8282
metrics *metrics
8383

84-
updateMut sync.Mutex
85-
8684
mut sync.RWMutex
8785
args Arguments
8886
handler loki.LogsReceiver
@@ -164,12 +162,34 @@ func (c *Component) Run(ctx context.Context) error {
164162
c.mut.RUnlock()
165163
case <-c.updateReaders:
166164
c.mut.Lock()
165+
166+
// When we are updating tasks we need to continue to read from handler.Chan().
167+
// This avoids a deadlock where a reader that is being stopped is
168+
// flushing its data, but nothing is reading from handler.Chan().
169+
readCtx, cancel := context.WithCancel(ctx)
170+
go func() {
171+
for {
172+
select {
173+
case entry := <-c.handler.Chan():
174+
for _, receiver := range c.receivers {
175+
receiver.Chan() <- entry
176+
}
177+
case <-readCtx.Done():
178+
return
179+
}
180+
}
181+
}()
182+
167183
var tasks []*runnerTask
168184
level.Debug(c.opts.Logger).Log("msg", "updating tasks", "tasks", len(c.tasks))
169185
for _, entry := range c.tasks {
170186
tasks = append(tasks, &entry)
171187
}
172188
err := runner.ApplyTasks(ctx, tasks)
189+
190+
// We cancel readCtx because we are done updating tasks and the main loop will go back to
191+
// reading from handler.Chan().
192+
cancel()
173193
level.Debug(c.opts.Logger).Log("msg", "workers successfully updated", "workers", len(runner.Workers()))
174194
c.mut.Unlock()
175195

@@ -182,9 +202,6 @@ func (c *Component) Run(ctx context.Context) error {
182202

183203
// Update implements component.Component.
184204
func (c *Component) Update(args component.Arguments) error {
185-
c.updateMut.Lock()
186-
defer c.updateMut.Unlock()
187-
188205
newArgs := args.(Arguments)
189206

190207
c.mut.Lock()

internal/component/loki/source/file/file_test.go

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"log"
99
"os"
1010
"path/filepath"
11+
"sync"
1112
"testing"
1213
"time"
1314

@@ -78,6 +79,85 @@ func Test(t *testing.T) {
7879
}
7980
}
8081

82+
func TestUpdateRemoveFileWhileReading(t *testing.T) {
83+
defer goleak.VerifyNone(t, goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"))
84+
85+
ctx, cancel := context.WithCancel(componenttest.TestContext(t))
86+
defer cancel()
87+
88+
// Create file to log to.
89+
f, err := os.CreateTemp(t.TempDir(), "example")
90+
require.NoError(t, err)
91+
defer f.Close()
92+
93+
ctrl, err := componenttest.NewControllerFromID(util.TestLogger(t), "loki.source.file")
94+
require.NoError(t, err)
95+
96+
ch1 := loki.NewLogsReceiver()
97+
98+
go func() {
99+
err := ctrl.Run(ctx, Arguments{
100+
Targets: []discovery.Target{discovery.NewTargetFromMap(map[string]string{
101+
"__path__": f.Name(),
102+
"foo": "bar",
103+
})},
104+
ForwardTo: []loki.LogsReceiver{ch1},
105+
})
106+
require.NoError(t, err)
107+
}()
108+
109+
ctrl.WaitRunning(time.Minute)
110+
111+
workerCtx, cancelWorkers := context.WithCancel(ctx)
112+
var wg sync.WaitGroup
113+
wg.Add(2)
114+
115+
// Start a goroutine that reads from the channel until cancellation
116+
go func() {
117+
defer wg.Done()
118+
for {
119+
select {
120+
case <-workerCtx.Done():
121+
return
122+
case <-ch1.Chan():
123+
// Just consume the messages
124+
}
125+
}
126+
}()
127+
128+
go func() {
129+
defer wg.Done()
130+
for {
131+
select {
132+
case <-workerCtx.Done():
133+
return
134+
default:
135+
_, err = f.Write([]byte("writing some text\nwriting some text2\n"))
136+
require.NoError(t, err)
137+
}
138+
}
139+
}()
140+
141+
time.Sleep(100 * time.Millisecond)
142+
143+
err = ctrl.Update(Arguments{
144+
Targets: []discovery.Target{},
145+
ForwardTo: []loki.LogsReceiver{ch1},
146+
})
147+
require.NoError(t, err)
148+
149+
time.Sleep(100 * time.Millisecond)
150+
151+
err = ctrl.Update(Arguments{
152+
Targets: []discovery.Target{},
153+
ForwardTo: []loki.LogsReceiver{ch1},
154+
})
155+
require.NoError(t, err)
156+
157+
cancelWorkers()
158+
wg.Wait()
159+
}
160+
81161
func TestFileWatch(t *testing.T) {
82162
defer goleak.VerifyNone(t, goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"))
83163
ctx, cancel := context.WithCancel(componenttest.TestContext(t))

0 commit comments

Comments
 (0)