Skip to content

Commit f961e03

Browse files
authored
Ensure rich logging in priority_task_scheduler (#11295)
Currently, when filtering by `execution_id` and then additionally filtering by `executor_id` (or `executor_host_id`), some of the execution logs disappear, because in some cases we aren't consistently using an enriched ctx. This PR should fix that.
1 parent 21f190a commit f961e03

File tree

2 files changed

+40
-33
lines changed

2 files changed

+40
-33
lines changed

enterprise/server/scheduling/priority_task_scheduler/priority_task_scheduler.go

Lines changed: 19 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -147,13 +147,13 @@ func newTaskQueue(clock clockwork.Clock) *taskQueue {
147147
}
148148
}
149149

150-
func (t *taskQueue) GetAll() []*scpb.EnqueueTaskReservationRequest {
150+
func (t *taskQueue) GetAll(ctx context.Context) []*scpb.EnqueueTaskReservationRequest {
151151
reservations := make([]*scpb.EnqueueTaskReservationRequest, 0, len(t.taskIDs))
152152

153153
for e := t.pqs.Front(); e != nil; e = e.Next() {
154154
pq, ok := e.Value.(*groupPriorityQueue)
155155
if !ok {
156-
log.Error("not a *groupPriorityQueue!??!")
156+
log.CtxError(ctx, "not a *groupPriorityQueue!??!")
157157
continue
158158
}
159159
for _, t := range pq.GetAll() {
@@ -167,7 +167,7 @@ func (t *taskQueue) GetAll() []*scpb.EnqueueTaskReservationRequest {
167167
// Enqueue enqueues a task reservation request into the task queue.
168168
// If the task is already enqueued, it will not be enqueued again.
169169
// Returns true if the task was enqueued, false if it was already enqueued.
170-
func (t *taskQueue) Enqueue(req *scpb.EnqueueTaskReservationRequest) (ok bool) {
170+
func (t *taskQueue) Enqueue(ctx context.Context, req *scpb.EnqueueTaskReservationRequest) (ok bool) {
171171
enqueuedAt := t.clock.Now()
172172

173173
// Don't enqueue the same task twice to avoid inflating queue length
@@ -183,7 +183,7 @@ func (t *taskQueue) Enqueue(req *scpb.EnqueueTaskReservationRequest) (ok bool) {
183183
pq, ok = el.Value.(*groupPriorityQueue)
184184
if !ok {
185185
// Why would this ever happen?
186-
log.Error("not a *groupPriorityQueue!??!")
186+
log.CtxError(ctx, "not a *groupPriorityQueue!??!")
187187
return false
188188
}
189189
} else {
@@ -283,14 +283,14 @@ func (t *taskQueue) DequeueAt(pos *queuePosition) *queuedTask {
283283
return req
284284
}
285285

286-
func (t *taskQueue) Peek() *queuedTask {
286+
func (t *taskQueue) Peek(ctx context.Context) *queuedTask {
287287
if t.currentPQ == nil {
288288
return nil
289289
}
290290
pq, ok := t.currentPQ.Value.(*groupPriorityQueue)
291291
if !ok {
292292
// Why would this ever happen?
293-
log.Error("not a *groupPriorityQueue!??!")
293+
log.CtxError(ctx, "not a *groupPriorityQueue!??!")
294294
return nil
295295
}
296296
v, _ := pq.Peek()
@@ -492,6 +492,7 @@ func (q *PriorityTaskScheduler) Shutdown(ctx context.Context) error {
492492
}
493493

494494
func (q *PriorityTaskScheduler) EnqueueTaskReservation(ctx context.Context, req *scpb.EnqueueTaskReservationRequest) (*scpb.EnqueueTaskReservationResponse, error) {
495+
ctx = q.enrichContext(ctx)
495496
ctx = log.EnrichContext(ctx, log.ExecutionIDKey, req.GetTaskId())
496497

497498
q.mu.Lock()
@@ -524,7 +525,7 @@ func (q *PriorityTaskScheduler) EnqueueTaskReservation(ctx context.Context, req
524525

525526
enqueueFn := func() {
526527
q.mu.Lock()
527-
ok := q.q.Enqueue(req)
528+
ok := q.q.Enqueue(ctx, req)
528529
q.mu.Unlock()
529530
if !ok {
530531
// Already enqueued. This normally shouldn't happen since we checked
@@ -561,6 +562,7 @@ func (q *PriorityTaskScheduler) remove(taskID string) bool {
561562
}
562563

563564
func (q *PriorityTaskScheduler) CancelTaskReservation(ctx context.Context, taskID string) {
565+
ctx = q.enrichContext(ctx)
564566
ctx = log.EnrichContext(ctx, log.ExecutionIDKey, taskID)
565567
if removed := q.remove(taskID); removed {
566568
log.CtxInfof(ctx, "Removed completed task from queue")
@@ -602,6 +604,7 @@ func (q *PriorityTaskScheduler) publishOperation(ctx context.Context, executionI
602604
}
603605

604606
func (q *PriorityTaskScheduler) runTask(ctx context.Context, st *repb.ScheduledTask) (retry bool, err error) {
607+
ctx = q.enrichContext(ctx)
605608
if q.env.GetRemoteExecutionClient() == nil {
606609
return false, status.FailedPreconditionError("Execution client not configured")
607610
}
@@ -748,13 +751,13 @@ func (q *PriorityTaskScheduler) canFitTask(res *queuedTask, reservedResources *r
748751
return true
749752
}
750753

751-
func (q *PriorityTaskScheduler) nextTaskForPruning() *queuedTask {
754+
func (q *PriorityTaskScheduler) nextTaskForPruning(ctx context.Context) *queuedTask {
752755
q.mu.Lock()
753756
defer q.mu.Unlock()
754757
if q.shuttingDown || q.q.Len() == 0 {
755758
return nil
756759
}
757-
return q.q.Peek()
760+
return q.q.Peek(ctx)
758761
}
759762

760763
// This function peeks at the front of the queue and checks to see if the task
@@ -772,7 +775,7 @@ func (q *PriorityTaskScheduler) nextTaskForPruning() *queuedTask {
772775
//
773776
// This function returns true if a task was successfully dequeued.
774777
func (q *PriorityTaskScheduler) trimQueue() bool {
775-
nextTask := q.nextTaskForPruning()
778+
nextTask := q.nextTaskForPruning(q.rootContext)
776779
if nextTask == nil {
777780
return false
778781
}
@@ -781,7 +784,7 @@ func (q *PriorityTaskScheduler) trimQueue() bool {
781784
ctx = tracing.ExtractProtoTraceMetadata(ctx, nextTask.GetTraceMetadata())
782785
resp, err := q.env.GetSchedulerClient().TaskExists(ctx, &scpb.TaskExistsRequest{TaskId: nextTask.GetTaskId()})
783786
if err != nil {
784-
log.Infof("Failed to check if task exists: %s", err)
787+
log.CtxInfof(ctx, "Failed to check if task exists: %s", err)
785788
return false
786789
}
787790
if resp.GetExists() {
@@ -793,7 +796,7 @@ func (q *PriorityTaskScheduler) trimQueue() bool {
793796
q.mu.Lock()
794797
defer q.mu.Unlock()
795798

796-
if nextTask != q.q.Peek() {
799+
if nextTask != q.q.Peek(ctx) {
797800
return false
798801
}
799802

@@ -820,14 +823,14 @@ type queuePosition struct {
820823

821824
// getNextSchedulableTask returns the next task that can be scheduled, and a
822825
// pointer to the task in the queue.
823-
func (q *PriorityTaskScheduler) getNextSchedulableTask() (*queuedTask, *queuePosition) {
826+
func (q *PriorityTaskScheduler) getNextSchedulableTask(ctx context.Context) (*queuedTask, *queuePosition) {
824827
// Use custom resource configuration as a flag guard for the backfilling
825828
// logic, since backfilling only helps if custom resources are configured
826829
// anyway.
827830
// TODO: add more tests for the multi-tenant case and turn this on
828831
// unconditionally to simplify logic.
829832
if len(q.resourceCapacity.Custom) == 0 {
830-
nextTask := q.q.Peek()
833+
nextTask := q.q.Peek(ctx)
831834
if nextTask == nil {
832835
return nil, nil
833836
}
@@ -876,7 +879,7 @@ func (q *PriorityTaskScheduler) handleTask() {
876879
if qLen == 0 {
877880
return
878881
}
879-
nextTask, ref := q.getNextSchedulableTask()
882+
nextTask, ref := q.getNextSchedulableTask(q.rootContext)
880883
if nextTask == nil {
881884
return
882885
}
@@ -976,7 +979,7 @@ func (q *PriorityTaskScheduler) Stop() error {
976979
func (q *PriorityTaskScheduler) GetQueuedTaskReservations() []*scpb.EnqueueTaskReservationRequest {
977980
q.mu.Lock()
978981
defer q.mu.Unlock()
979-
return q.q.GetAll()
982+
return q.q.GetAll(q.rootContext)
980983
}
981984

982985
// QueueLength returns the current number of tasks in the queue.

enterprise/server/scheduling/priority_task_scheduler/priority_task_scheduler_test.go

Lines changed: 21 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -40,15 +40,16 @@ func newTaskReservationRequest(taskID, taskGroupID string, priority int32) *scpb
4040
}
4141

4242
func TestTaskQueue_SingleGroup(t *testing.T) {
43+
ctx := t.Context()
4344
q := newTaskQueue(clockwork.NewRealClock())
4445
require.Equal(t, 0, q.Len())
45-
require.Nil(t, q.Peek())
46+
require.Nil(t, q.Peek(ctx))
4647

47-
q.Enqueue(newTaskReservationRequest("1", testGroupID1, 0))
48+
q.Enqueue(ctx, newTaskReservationRequest("1", testGroupID1, 0))
4849
require.Equal(t, 1, q.Len())
4950

5051
// Peeking should return the reservation but not remove it.
51-
req := q.Peek()
52+
req := q.Peek(ctx)
5253
require.Equal(t, "1", req.GetTaskId())
5354
require.Equal(t, 1, q.Len())
5455

@@ -59,14 +60,14 @@ func TestTaskQueue_SingleGroup(t *testing.T) {
5960

6061
// Queue should be empty.
6162
require.Equal(t, 0, q.Len())
62-
require.Nil(t, q.Peek())
63+
require.Nil(t, q.Peek(ctx))
6364

64-
q.Enqueue(newTaskReservationRequest("2", testGroupID1, 0))
65-
q.Enqueue(newTaskReservationRequest("3", testGroupID1, 0))
66-
q.Enqueue(newTaskReservationRequest("4", testGroupID1, 0))
65+
q.Enqueue(ctx, newTaskReservationRequest("2", testGroupID1, 0))
66+
q.Enqueue(ctx, newTaskReservationRequest("3", testGroupID1, 0))
67+
q.Enqueue(ctx, newTaskReservationRequest("4", testGroupID1, 0))
6768
// Enqueue task "1" last but give it the highest priority so it gets
6869
// dequeued first.
69-
q.Enqueue(newTaskReservationRequest("1", testGroupID1, -1000))
70+
q.Enqueue(ctx, newTaskReservationRequest("1", testGroupID1, -1000))
7071

7172
require.Equal(t, "1", q.Dequeue().GetTaskId())
7273
require.Equal(t, "2", q.Dequeue().GetTaskId())
@@ -75,19 +76,20 @@ func TestTaskQueue_SingleGroup(t *testing.T) {
7576
}
7677

7778
func TestTaskQueue_MultipleGroups(t *testing.T) {
79+
ctx := t.Context()
7880
q := newTaskQueue(clockwork.NewRealClock())
7981

8082
// First group has 3 task reservations.
81-
q.Enqueue(newTaskReservationRequest("group1Task1", testGroupID1, 0))
82-
q.Enqueue(newTaskReservationRequest("group1Task2", testGroupID1, 0))
83-
q.Enqueue(newTaskReservationRequest("group1Task3", testGroupID1, 0))
83+
q.Enqueue(ctx, newTaskReservationRequest("group1Task1", testGroupID1, 0))
84+
q.Enqueue(ctx, newTaskReservationRequest("group1Task2", testGroupID1, 0))
85+
q.Enqueue(ctx, newTaskReservationRequest("group1Task3", testGroupID1, 0))
8486
// Second group has 1 task reservation.
85-
q.Enqueue(newTaskReservationRequest("group2Task1", testGroupID2, 0))
87+
q.Enqueue(ctx, newTaskReservationRequest("group2Task1", testGroupID2, 0))
8688
// Third group has 2 task reservations.
8789
// group3Task1 is enqueued last, but has higher priority so it should be
8890
// dequeued first.
89-
q.Enqueue(newTaskReservationRequest("group3Task2", testGroupID3, 0))
90-
q.Enqueue(newTaskReservationRequest("group3Task1", testGroupID3, -1000))
91+
q.Enqueue(ctx, newTaskReservationRequest("group3Task2", testGroupID3, 0))
92+
q.Enqueue(ctx, newTaskReservationRequest("group3Task1", testGroupID3, -1000))
9193

9294
require.Equal(t, "group1Task1", q.Dequeue().GetTaskId())
9395
require.Equal(t, "group2Task1", q.Dequeue().GetTaskId())
@@ -99,10 +101,11 @@ func TestTaskQueue_MultipleGroups(t *testing.T) {
99101
}
100102

101103
func TestTaskQueue_DedupesTasks(t *testing.T) {
104+
ctx := t.Context()
102105
q := newTaskQueue(clockwork.NewRealClock())
103106

104-
require.True(t, q.Enqueue(newTaskReservationRequest("1", testGroupID1, 0)))
105-
require.False(t, q.Enqueue(newTaskReservationRequest("1", testGroupID1, 0)))
107+
require.True(t, q.Enqueue(ctx, newTaskReservationRequest("1", testGroupID1, 0)))
108+
require.False(t, q.Enqueue(ctx, newTaskReservationRequest("1", testGroupID1, 0)))
106109

107110
require.Equal(t, 1, q.Len())
108111
require.Equal(t, "1", q.Dequeue().GetTaskId())
@@ -407,6 +410,7 @@ func TestLocalEnqueueTimestamp(t *testing.T) {
407410
}
408411

409412
func TestRemoveTaskFromQueue(t *testing.T) {
413+
ctx := t.Context()
410414
// Try a few runs where we enqueue several tasks, cancel a few tasks, then
411415
// after dequeueing the remaining tasks, we should only dequeue the tasks
412416
// that were not cancelled.
@@ -421,7 +425,7 @@ func TestRemoveTaskFromQueue(t *testing.T) {
421425
taskID := fakeTaskID(fmt.Sprintf("%d", taskIDInt))
422426
taskIDs = append(taskIDs, taskID)
423427
groupIDInt := rand.Intn(3)
424-
q.Enqueue(&scpb.EnqueueTaskReservationRequest{
428+
q.Enqueue(ctx, &scpb.EnqueueTaskReservationRequest{
425429
TaskId: taskID,
426430
SchedulingMetadata: &scpb.SchedulingMetadata{
427431
TaskGroupId: fmt.Sprintf("GR%d", groupIDInt),

0 commit comments

Comments
 (0)