Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ Main (unreleased)

- Fix `loki.source.file` race condition that often led to a panic when using `decompression`. (@kalleep)

- Fix deadlock in `loki.source.file` that can happen when targets are removed. (@kalleep)

### Other changes

- Update the zap logging adapter used by `otelcol` components to log arrays and objects. (@dehaansa)
Expand Down
27 changes: 22 additions & 5 deletions internal/component/loki/source/file/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,6 @@ type Component struct {
opts component.Options
metrics *metrics

updateMut sync.Mutex
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was only used in Update so we don't need it


mut sync.RWMutex
args Arguments
handler loki.LogsReceiver
Expand Down Expand Up @@ -164,12 +162,34 @@ func (c *Component) Run(ctx context.Context) error {
c.mut.RUnlock()
case <-c.updateReaders:
c.mut.Lock()

// When we are updating tasks we need to continue to read from handler.Chan().
// This is done to avoid a race condition where stopping a reader is
// flushing its data, but nothing is reading from handler.Chan().
readCtx, cancel := context.WithCancel(ctx)
go func() {
for {
select {
case entry := <-c.handler.Chan():
for _, receiver := range c.receivers {
receiver.Chan() <- entry
}
case <-readCtx.Done():
return
}
}
}()

var tasks []*runnerTask
level.Debug(c.opts.Logger).Log("msg", "updating tasks", "tasks", len(c.tasks))
for _, entry := range c.tasks {
tasks = append(tasks, &entry)
}
err := runner.ApplyTasks(ctx, tasks)

// We cancel readCtx because we are done updating tasks; from here on the main loop
// resumes reading from handler.Chan() itself.
cancel()
level.Debug(c.opts.Logger).Log("msg", "workers successfully updated", "workers", len(runner.Workers()))
c.mut.Unlock()

Expand All @@ -182,9 +202,6 @@ func (c *Component) Run(ctx context.Context) error {

// Update implements component.Component.
func (c *Component) Update(args component.Arguments) error {
c.updateMut.Lock()
defer c.updateMut.Unlock()

newArgs := args.(Arguments)

c.mut.Lock()
Expand Down
80 changes: 80 additions & 0 deletions internal/component/loki/source/file/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"log"
"os"
"path/filepath"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -78,6 +79,85 @@ func Test(t *testing.T) {
}
}

// TestUpdateRemoveFileWhileReading exercises Update with an empty target list
// while the previously configured file is still being written to and its
// entries are still being consumed. The test passes when both Update calls
// return without error and no goroutines are leaked.
func TestUpdateRemoveFileWhileReading(t *testing.T) {
	defer goleak.VerifyNone(t, goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"))

	ctx, cancel := context.WithCancel(componenttest.TestContext(t))
	defer cancel()

	// Create file to log to.
	f, err := os.CreateTemp(t.TempDir(), "example")
	require.NoError(t, err)
	defer f.Close()

	ctrl, err := componenttest.NewControllerFromID(util.TestLogger(t), "loki.source.file")
	require.NoError(t, err)

	ch1 := loki.NewLogsReceiver()

	go func() {
		err := ctrl.Run(ctx, Arguments{
			Targets: []discovery.Target{discovery.NewTargetFromMap(map[string]string{
				"__path__": f.Name(),
				"foo":      "bar",
			})},
			ForwardTo: []loki.LogsReceiver{ch1},
		})
		// Report with t.Errorf rather than require: FailNow-based helpers must
		// only be called from the goroutine running the test function.
		if err != nil {
			t.Errorf("running component: %v", err)
		}
	}()

	ctrl.WaitRunning(time.Minute)

	workerCtx, cancelWorkers := context.WithCancel(ctx)
	var wg sync.WaitGroup
	wg.Add(2)

	// Start a goroutine that reads from the channel until cancellation.
	go func() {
		defer wg.Done()
		for {
			select {
			case <-workerCtx.Done():
				return
			case <-ch1.Chan():
				// Just consume the messages.
			}
		}
	}()

	// Start a goroutine that keeps appending lines to the file until
	// cancellation, so there is always data in flight during the updates.
	go func() {
		defer wg.Done()
		for {
			select {
			case <-workerCtx.Done():
				return
			default:
				// Use a goroutine-local err: the test goroutine assigns the
				// outer err concurrently, so sharing it would be a data race.
				if _, err := f.Write([]byte("writing some text\nwriting some text2\n")); err != nil {
					t.Errorf("writing to log file: %v", err)
					return
				}
			}
		}
	}()

	// Give the writer and the component some time to produce and forward entries.
	time.Sleep(100 * time.Millisecond)

	// Remove every target while the file is still being written to.
	err = ctrl.Update(Arguments{
		Targets:   []discovery.Target{},
		ForwardTo: []loki.LogsReceiver{ch1},
	})
	require.NoError(t, err)

	time.Sleep(100 * time.Millisecond)

	// Apply the same empty target set a second time; this must also return
	// without error.
	err = ctrl.Update(Arguments{
		Targets:   []discovery.Target{},
		ForwardTo: []loki.LogsReceiver{ch1},
	})
	require.NoError(t, err)

	cancelWorkers()
	wg.Wait()
}

func TestFileWatch(t *testing.T) {
defer goleak.VerifyNone(t, goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"))
ctx, cancel := context.WithCancel(componenttest.TestContext(t))
Expand Down