Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 43 additions & 12 deletions internal/runtime/internal/controller/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,19 +51,27 @@ func NewScheduler(logger log.Logger, taskShutdownDeadline time.Duration) *Schedu
}
}

// Synchronize synchronizes the running components to those defined by rr.
// Synchronize adjusts the set of running components based on the provided
// RunnableNodes in the following way,
//
// New RunnableNodes will be launched as new goroutines. RunnableNodes already
// managed by Scheduler will be kept running, while running RunnableNodes that
// are not in rr will be shut down and removed.
// 1. Nodes already managed by the scheduler will be unchanged.
// 2. Nodes which are no longer present will be told to shut down.
// 3. Nodes will be given 1 minute to shut down before new nodes are created.
// 4. Wait for any remaining nodes to shut down.
//
// Nodes are shut down first to ensure any shared resources, such as ports,
// are allowed to be freed before new nodes are scheduled. As a means to avoid
// long stretches of downtime, we give this a 1 minute timeout.
//
// Existing components will be restarted if they stopped since the previous
// call to Synchronize.
//
// Synchronize is not goroutine safe and should not be called concurrently.
func (s *Scheduler) Synchronize(rr []RunnableNode) error {
s.tasksMut.Lock()

if s.ctx.Err() != nil {
return fmt.Errorf("Scheduler is closed")
select {
case <-s.ctx.Done():
return fmt.Errorf("scheduler is closed")
default:
}

newRunnables := make(map[string]RunnableNode, len(rr))
Expand All @@ -72,6 +80,7 @@ func (s *Scheduler) Synchronize(rr []RunnableNode) error {
}

// Stop tasks that are not defined in rr.
s.tasksMut.Lock()
var stopping sync.WaitGroup
for id, t := range s.tasks {
if _, keep := newRunnables[id]; keep {
Expand All @@ -84,8 +93,26 @@ func (s *Scheduler) Synchronize(rr []RunnableNode) error {
t.Stop()
}(t)
}
s.tasksMut.Unlock()

// Wrapping the waitgroup with a channel will allow us to wait with a timeout
doneStopping := make(chan struct{})
go func() {
stopping.Wait()
close(doneStopping)
}()

stoppingTimedOut := false
select {
case <-doneStopping:
// All tasks stopped successfully within timeout.
case <-time.After(TaskShutdownWarningTimeout):
level.Warn(s.logger).Log("msg", "Some tasks are taking longer than expected to shutdown, proceeding with new tasks")
stoppingTimedOut = true
}

// Launch new runnables that have appeared.
s.tasksMut.Lock()
for id, r := range newRunnables {
if _, exist := s.tasks[id]; exist {
continue
Expand Down Expand Up @@ -119,11 +146,15 @@ func (s *Scheduler) Synchronize(rr []RunnableNode) error {
s.running.Add(1)
s.tasks[nodeID] = newTask(opts)
}

// Unlock the tasks mutex so that Stop calls can complete.
s.tasksMut.Unlock()
// Wait for all stopping runnables to exit.
stopping.Wait()

// If we timed out, wait for stopping tasks to fully exit before returning.
// Tasks shutting down cannot fully complete their shutdown until the tasksMut
// lock is released.
if stoppingTimedOut {
<-doneStopping
}

return nil
}

Expand Down
227 changes: 171 additions & 56 deletions internal/runtime/internal/controller/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ import (
"os"
"sync"
"testing"
"testing/synctest"
"time"

"github.com/go-kit/log"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"

"github.com/grafana/alloy/internal/component"
"github.com/grafana/alloy/internal/runtime/internal/controller"
Expand Down Expand Up @@ -68,29 +70,175 @@ func TestScheduler_Synchronize(t *testing.T) {
require.NoError(t, sched.Close())
})

t.Run("Removes stale jobs", func(t *testing.T) {
t.Run("Runnables which no longer exist are shutdown before new ones are created", func(t *testing.T) {
var started, finished sync.WaitGroup
started.Add(1)
finished.Add(1)
started.Add(2)

runFunc := func(ctx context.Context) error {
var lock sync.Mutex

basicRun := func(ctx context.Context) error {
defer finished.Done()
started.Done()
<-ctx.Done()
return nil
}

sharedResourceRun := func(ctx context.Context) error {
defer finished.Done()
started.Done()

if !lock.TryLock() {
t.Fatal("failed to claim lock - already held by another component")
return nil
}
defer lock.Unlock()
<-ctx.Done()
return nil
}

sched := controller.NewScheduler(logger, 1*time.Minute)

sched.Synchronize([]controller.RunnableNode{
fakeRunnable{ID: "component-a", Component: mockComponent{RunFunc: runFunc}},
})
started.Wait()
comp1 := fakeRunnable{ID: "component-a", Component: mockComponent{RunFunc: sharedResourceRun}}
comp2 := fakeRunnable{ID: "component-b", Component: mockComponent{RunFunc: basicRun}}
comp3 := fakeRunnable{ID: "component-c", Component: mockComponent{RunFunc: sharedResourceRun}}

sched.Synchronize([]controller.RunnableNode{})
sched.Synchronize([]controller.RunnableNode{comp1, comp2})
started.Wait()

started.Add(1)
finished.Add(1)
sched.Synchronize([]controller.RunnableNode{comp2, comp3})
started.Wait()
finished.Wait()

finished.Add(2)
require.NoError(t, sched.Close())
finished.Wait()
})

t.Run("Shutdown will stop waiting after TaskShutdownWarningTimeout to startup components and wait for shutdown after", func(t *testing.T) {
synctest.Test(t, func(t *testing.T) {
var oldTaskExited, newTaskStarted atomic.Bool

// Old task that takes a long time to stop
slowStop := func(ctx context.Context) error {
<-ctx.Done()

// Simulate slow shutdown
time.Sleep(2 * controller.TaskShutdownWarningTimeout)
oldTaskExited.Store(true)
return nil
}

// New task
basicRun := func(ctx context.Context) error {
newTaskStarted.Store(true)
<-ctx.Done()
return nil
}

sched := controller.NewScheduler(logger, 5*time.Minute)

// Start component-a with slow stop behavior
comp1 := fakeRunnable{ID: "component-a", Component: mockComponent{RunFunc: slowStop}}
err := sched.Synchronize([]controller.RunnableNode{comp1})
require.NoError(t, err)

// Replace with component-b
// This should timeout waiting for component-a, start component-b anyway,
// but not return until component-a fully exits
comp2 := fakeRunnable{ID: "component-b", Component: mockComponent{RunFunc: basicRun}}

syncDone := make(chan struct{})
go func() {
err := sched.Synchronize([]controller.RunnableNode{comp2})
require.NoError(t, err)
close(syncDone)
}()

// Wait past the timeout for new task to start
time.Sleep(controller.TaskShutdownWarningTimeout + 1*time.Second)

require.True(t, newTaskStarted.Load(), "new task should have started after timeout")
require.False(t, oldTaskExited.Load(), "old task should still be running")

select {
case <-syncDone:
t.Error("Synchronize returned before old task finished")
default:
}

// Wait for old task to finish
time.Sleep(2 * time.Minute)

select {
case <-syncDone:
default:
t.Error("Synchronize should have returned after old task finished")
}

require.True(t, oldTaskExited.Load(), "old task should have exited")
require.True(t, newTaskStarted.Load(), "new task should still be running")

require.NoError(t, sched.Close())
})
})
t.Run("Task shutdown deadline logs warnings and errors", func(t *testing.T) {
synctest.Test(t, func(t *testing.T) {
// Create a thread-safe buffer to capture log output
var logBuffer syncBuffer
logger := log.NewLogfmtLogger(&logBuffer)

runFunc := func(ctx context.Context) error {
<-ctx.Done()
// Block indefinitely, ignoring context cancellation
time.Sleep(3 * time.Minute)
return nil
}

sched := controller.NewScheduler(logger, 2*time.Minute)

// Start a component
err := sched.Synchronize([]controller.RunnableNode{
fakeRunnable{ID: "blocking-component", Component: mockComponent{RunFunc: runFunc}},
})
require.NoError(t, err)

syncDone := make(chan struct{})
go func() {
err := sched.Synchronize([]controller.RunnableNode{})
require.NoError(t, err)
close(syncDone)
}()

time.Sleep(controller.TaskShutdownWarningTimeout + 1*time.Second)

// Should have warning message
logOutput := logBuffer.String()
require.Contains(t, logOutput, "task shutdown is taking longer than expected")
require.Contains(t, logOutput, "level=warn")

// Wait past the shutdown deadline
time.Sleep(2*time.Minute + 1*time.Second)

// Should have error message
logOutput = logBuffer.String()
require.Contains(t, logOutput, "task shutdown deadline exceeded")
require.Contains(t, logOutput, "level=error")

// Synchronize should have returned
select {
case <-syncDone:
// Good
default:
t.Error("Synchronize should have returned after deadline")
}

require.NoError(t, sched.Close())

// Sleep long enough to let the runFunc exit to prevent a synctest panic
time.Sleep(time.Minute)
})
})
}

Expand All @@ -117,53 +265,20 @@ var _ component.Component = (*mockComponent)(nil)
func (mc mockComponent) Run(ctx context.Context) error { return mc.RunFunc(ctx) }
func (mc mockComponent) Update(newConfig component.Arguments) error { return mc.UpdateFunc(newConfig) }

func TestScheduler_TaskTimeoutLogging(t *testing.T) {
// Temporarily modify timeout values for testing
originalWarningTimeout := controller.TaskShutdownWarningTimeout
controller.TaskShutdownWarningTimeout = 50 * time.Millisecond
defer func() {
controller.TaskShutdownWarningTimeout = originalWarningTimeout
}()

// Create a buffer to capture log output
var logBuffer bytes.Buffer
logger := log.NewLogfmtLogger(&logBuffer)

var started sync.WaitGroup
started.Add(1)

// Create a component that will block and not respond to context cancellation
runFunc := func(ctx context.Context) error {
started.Done()
// Block indefinitely, ignoring context cancellation
// Use a long sleep to simulate a component that doesn't respond to cancellation
time.Sleep(1 * time.Second)
return nil
}

sched := controller.NewScheduler(logger, 150*time.Millisecond)

// Start a component
err := sched.Synchronize([]controller.RunnableNode{
fakeRunnable{ID: "blocking-component", Component: mockComponent{RunFunc: runFunc}},
})
require.NoError(t, err)
started.Wait()

// Remove the component, which should trigger the timeout behavior. This will block until the component exits.
err = sched.Synchronize([]controller.RunnableNode{})
require.NoError(t, err)

logOutput := logBuffer.String()
t.Logf("actual log output:\n%s", logOutput)

// Should contain warning message
require.Contains(t, logOutput, "task shutdown is taking longer than expected")
require.Contains(t, logOutput, "level=warn")
// syncBuffer wraps bytes.Buffer with a mutex for thread-safe reads and writes.
// It is used to capture log output from goroutines while the test goroutine
// concurrently reads the accumulated contents.
type syncBuffer struct {
	mu  sync.Mutex // guards buf against concurrent Write/String calls
	buf bytes.Buffer
}

// Should contain error message
require.Contains(t, logOutput, "task shutdown deadline exceeded")
require.Contains(t, logOutput, "level=error")
// Write appends p to the underlying buffer while holding the mutex so that
// concurrent writers (and readers via String) are safe.
func (sb *syncBuffer) Write(p []byte) (n int, err error) {
	sb.mu.Lock()
	n, err = sb.buf.Write(p)
	sb.mu.Unlock()
	return n, err
}

require.NoError(t, sched.Close())
// String returns the accumulated buffer contents, taking the mutex so the
// read does not race with concurrent Write calls.
func (sb *syncBuffer) String() string {
	sb.mu.Lock()
	contents := sb.buf.String()
	sb.mu.Unlock()
	return contents
}
Loading