Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -147,13 +147,13 @@ func newTaskQueue(clock clockwork.Clock) *taskQueue {
}
}

func (t *taskQueue) GetAll() []*scpb.EnqueueTaskReservationRequest {
func (t *taskQueue) GetAll(ctx context.Context) []*scpb.EnqueueTaskReservationRequest {
reservations := make([]*scpb.EnqueueTaskReservationRequest, 0, len(t.taskIDs))

for e := t.pqs.Front(); e != nil; e = e.Next() {
pq, ok := e.Value.(*groupPriorityQueue)
if !ok {
log.Error("not a *groupPriorityQueue!??!")
log.CtxError(ctx, "not a *groupPriorityQueue!??!")
continue
}
for _, t := range pq.GetAll() {
Expand All @@ -167,7 +167,7 @@ func (t *taskQueue) GetAll() []*scpb.EnqueueTaskReservationRequest {
// Enqueue enqueues a task reservation request into the task queue.
// If the task is already enqueued, it will not be enqueued again.
// Returns true if the task was enqueued, false if it was already enqueued.
func (t *taskQueue) Enqueue(req *scpb.EnqueueTaskReservationRequest) (ok bool) {
func (t *taskQueue) Enqueue(ctx context.Context, req *scpb.EnqueueTaskReservationRequest) (ok bool) {
enqueuedAt := t.clock.Now()

// Don't enqueue the same task twice to avoid inflating queue length
Expand All @@ -183,7 +183,7 @@ func (t *taskQueue) Enqueue(req *scpb.EnqueueTaskReservationRequest) (ok bool) {
pq, ok = el.Value.(*groupPriorityQueue)
if !ok {
// Why would this ever happen?
log.Error("not a *groupPriorityQueue!??!")
log.CtxError(ctx, "not a *groupPriorityQueue!??!")
return false
}
} else {
Expand Down Expand Up @@ -283,14 +283,14 @@ func (t *taskQueue) DequeueAt(pos *queuePosition) *queuedTask {
return req
}

func (t *taskQueue) Peek() *queuedTask {
func (t *taskQueue) Peek(ctx context.Context) *queuedTask {
if t.currentPQ == nil {
return nil
}
pq, ok := t.currentPQ.Value.(*groupPriorityQueue)
if !ok {
// Why would this ever happen?
log.Error("not a *groupPriorityQueue!??!")
log.CtxError(ctx, "not a *groupPriorityQueue!??!")
return nil
}
v, _ := pq.Peek()
Expand Down Expand Up @@ -492,6 +492,7 @@ func (q *PriorityTaskScheduler) Shutdown(ctx context.Context) error {
}

func (q *PriorityTaskScheduler) EnqueueTaskReservation(ctx context.Context, req *scpb.EnqueueTaskReservationRequest) (*scpb.EnqueueTaskReservationResponse, error) {
ctx = q.enrichContext(ctx)
ctx = log.EnrichContext(ctx, log.ExecutionIDKey, req.GetTaskId())

q.mu.Lock()
Expand Down Expand Up @@ -524,7 +525,7 @@ func (q *PriorityTaskScheduler) EnqueueTaskReservation(ctx context.Context, req

enqueueFn := func() {
q.mu.Lock()
ok := q.q.Enqueue(req)
ok := q.q.Enqueue(ctx, req)
q.mu.Unlock()
if !ok {
// Already enqueued. This normally shouldn't happen since we checked
Expand Down Expand Up @@ -561,6 +562,7 @@ func (q *PriorityTaskScheduler) remove(taskID string) bool {
}

func (q *PriorityTaskScheduler) CancelTaskReservation(ctx context.Context, taskID string) {
ctx = q.enrichContext(ctx)
ctx = log.EnrichContext(ctx, log.ExecutionIDKey, taskID)
if removed := q.remove(taskID); removed {
log.CtxInfof(ctx, "Removed completed task from queue")
Expand Down Expand Up @@ -602,6 +604,7 @@ func (q *PriorityTaskScheduler) publishOperation(ctx context.Context, executionI
}

func (q *PriorityTaskScheduler) runTask(ctx context.Context, st *repb.ScheduledTask) (retry bool, err error) {
ctx = q.enrichContext(ctx)
if q.env.GetRemoteExecutionClient() == nil {
return false, status.FailedPreconditionError("Execution client not configured")
}
Expand Down Expand Up @@ -748,13 +751,13 @@ func (q *PriorityTaskScheduler) canFitTask(res *queuedTask, reservedResources *r
return true
}

func (q *PriorityTaskScheduler) nextTaskForPruning() *queuedTask {
func (q *PriorityTaskScheduler) nextTaskForPruning(ctx context.Context) *queuedTask {
q.mu.Lock()
defer q.mu.Unlock()
if q.shuttingDown || q.q.Len() == 0 {
return nil
}
return q.q.Peek()
return q.q.Peek(ctx)
}

// This function peeks at the front of the queue and checks to see if the task
Expand All @@ -772,7 +775,7 @@ func (q *PriorityTaskScheduler) nextTaskForPruning() *queuedTask {
//
// This function returns true if a task was successfully dequeued.
func (q *PriorityTaskScheduler) trimQueue() bool {
nextTask := q.nextTaskForPruning()
nextTask := q.nextTaskForPruning(q.rootContext)
if nextTask == nil {
return false
}
Expand All @@ -781,7 +784,7 @@ func (q *PriorityTaskScheduler) trimQueue() bool {
ctx = tracing.ExtractProtoTraceMetadata(ctx, nextTask.GetTraceMetadata())
resp, err := q.env.GetSchedulerClient().TaskExists(ctx, &scpb.TaskExistsRequest{TaskId: nextTask.GetTaskId()})
if err != nil {
log.Infof("Failed to check if task exists: %s", err)
log.CtxInfof(ctx, "Failed to check if task exists: %s", err)
return false
}
if resp.GetExists() {
Expand All @@ -793,7 +796,7 @@ func (q *PriorityTaskScheduler) trimQueue() bool {
q.mu.Lock()
defer q.mu.Unlock()

if nextTask != q.q.Peek() {
if nextTask != q.q.Peek(ctx) {
return false
}

Expand All @@ -820,14 +823,14 @@ type queuePosition struct {

// getNextSchedulableTask returns the next task that can be scheduled, and a
// pointer to the task in the queue.
func (q *PriorityTaskScheduler) getNextSchedulableTask() (*queuedTask, *queuePosition) {
func (q *PriorityTaskScheduler) getNextSchedulableTask(ctx context.Context) (*queuedTask, *queuePosition) {
// Use custom resource configuration as a flag guard for the backfilling
// logic, since backfilling only helps if custom resources are configured
// anyway.
// TODO: add more tests for the multi-tenant case and turn this on
// unconditionally to simplify logic.
if len(q.resourceCapacity.Custom) == 0 {
nextTask := q.q.Peek()
nextTask := q.q.Peek(ctx)
if nextTask == nil {
return nil, nil
}
Expand Down Expand Up @@ -876,7 +879,7 @@ func (q *PriorityTaskScheduler) handleTask() {
if qLen == 0 {
return
}
nextTask, ref := q.getNextSchedulableTask()
nextTask, ref := q.getNextSchedulableTask(q.rootContext)
if nextTask == nil {
return
}
Expand Down Expand Up @@ -976,7 +979,7 @@ func (q *PriorityTaskScheduler) Stop() error {
func (q *PriorityTaskScheduler) GetQueuedTaskReservations() []*scpb.EnqueueTaskReservationRequest {
q.mu.Lock()
defer q.mu.Unlock()
return q.q.GetAll()
return q.q.GetAll(q.rootContext)
}

// QueueLength returns the current number of tasks in the queue.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,16 @@ func newTaskReservationRequest(taskID, taskGroupID string, priority int32) *scpb
}

func TestTaskQueue_SingleGroup(t *testing.T) {
ctx := t.Context()
q := newTaskQueue(clockwork.NewRealClock())
require.Equal(t, 0, q.Len())
require.Nil(t, q.Peek())
require.Nil(t, q.Peek(ctx))

q.Enqueue(newTaskReservationRequest("1", testGroupID1, 0))
q.Enqueue(ctx, newTaskReservationRequest("1", testGroupID1, 0))
require.Equal(t, 1, q.Len())

// Peeking should return the reservation but not remove it.
req := q.Peek()
req := q.Peek(ctx)
require.Equal(t, "1", req.GetTaskId())
require.Equal(t, 1, q.Len())

Expand All @@ -59,14 +60,14 @@ func TestTaskQueue_SingleGroup(t *testing.T) {

// Queue should be empty.
require.Equal(t, 0, q.Len())
require.Nil(t, q.Peek())
require.Nil(t, q.Peek(ctx))

q.Enqueue(newTaskReservationRequest("2", testGroupID1, 0))
q.Enqueue(newTaskReservationRequest("3", testGroupID1, 0))
q.Enqueue(newTaskReservationRequest("4", testGroupID1, 0))
q.Enqueue(ctx, newTaskReservationRequest("2", testGroupID1, 0))
q.Enqueue(ctx, newTaskReservationRequest("3", testGroupID1, 0))
q.Enqueue(ctx, newTaskReservationRequest("4", testGroupID1, 0))
// Enqueue task "1" last but give it the highest priority so it gets
// dequeued first.
q.Enqueue(newTaskReservationRequest("1", testGroupID1, -1000))
q.Enqueue(ctx, newTaskReservationRequest("1", testGroupID1, -1000))

require.Equal(t, "1", q.Dequeue().GetTaskId())
require.Equal(t, "2", q.Dequeue().GetTaskId())
Expand All @@ -75,19 +76,20 @@ func TestTaskQueue_SingleGroup(t *testing.T) {
}

func TestTaskQueue_MultipleGroups(t *testing.T) {
ctx := t.Context()
q := newTaskQueue(clockwork.NewRealClock())

// First group has 3 task reservations.
q.Enqueue(newTaskReservationRequest("group1Task1", testGroupID1, 0))
q.Enqueue(newTaskReservationRequest("group1Task2", testGroupID1, 0))
q.Enqueue(newTaskReservationRequest("group1Task3", testGroupID1, 0))
q.Enqueue(ctx, newTaskReservationRequest("group1Task1", testGroupID1, 0))
q.Enqueue(ctx, newTaskReservationRequest("group1Task2", testGroupID1, 0))
q.Enqueue(ctx, newTaskReservationRequest("group1Task3", testGroupID1, 0))
// Second group has 1 task reservation.
q.Enqueue(newTaskReservationRequest("group2Task1", testGroupID2, 0))
q.Enqueue(ctx, newTaskReservationRequest("group2Task1", testGroupID2, 0))
// Third group has 2 task reservations.
// group3Task1 is enqueued last, but has higher priority so it should be
// dequeued first.
q.Enqueue(newTaskReservationRequest("group3Task2", testGroupID3, 0))
q.Enqueue(newTaskReservationRequest("group3Task1", testGroupID3, -1000))
q.Enqueue(ctx, newTaskReservationRequest("group3Task2", testGroupID3, 0))
q.Enqueue(ctx, newTaskReservationRequest("group3Task1", testGroupID3, -1000))

require.Equal(t, "group1Task1", q.Dequeue().GetTaskId())
require.Equal(t, "group2Task1", q.Dequeue().GetTaskId())
Expand All @@ -99,10 +101,11 @@ func TestTaskQueue_MultipleGroups(t *testing.T) {
}

func TestTaskQueue_DedupesTasks(t *testing.T) {
ctx := t.Context()
q := newTaskQueue(clockwork.NewRealClock())

require.True(t, q.Enqueue(newTaskReservationRequest("1", testGroupID1, 0)))
require.False(t, q.Enqueue(newTaskReservationRequest("1", testGroupID1, 0)))
require.True(t, q.Enqueue(ctx, newTaskReservationRequest("1", testGroupID1, 0)))
require.False(t, q.Enqueue(ctx, newTaskReservationRequest("1", testGroupID1, 0)))

require.Equal(t, 1, q.Len())
require.Equal(t, "1", q.Dequeue().GetTaskId())
Expand Down Expand Up @@ -407,6 +410,7 @@ func TestLocalEnqueueTimestamp(t *testing.T) {
}

func TestRemoveTaskFromQueue(t *testing.T) {
ctx := t.Context()
// Try a few runs where we enqueue several tasks, cancel a few tasks, then
// after dequeueing the remaining tasks, we should only dequeue the tasks
// that were not cancelled.
Expand All @@ -421,7 +425,7 @@ func TestRemoveTaskFromQueue(t *testing.T) {
taskID := fakeTaskID(fmt.Sprintf("%d", taskIDInt))
taskIDs = append(taskIDs, taskID)
groupIDInt := rand.Intn(3)
q.Enqueue(&scpb.EnqueueTaskReservationRequest{
q.Enqueue(ctx, &scpb.EnqueueTaskReservationRequest{
TaskId: taskID,
SchedulingMetadata: &scpb.SchedulingMetadata{
TaskGroupId: fmt.Sprintf("GR%d", groupIDInt),
Expand Down
Loading