Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions config/server.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,8 @@ taskDriver:
# must be a non-negative integer unique per instance/worker
# must be omitted when `randomWorkerIdEnabled` is set `true`
id: 0
# whether to use the new task acquisition logic (beta) or the old one (slow)
newAcquisitionLogic: false
# deprecated enabled by default - kept to ensure it's not breaking config parsing - can be removed July 2025
newAcquisitionLogic: true

rca:
# default values - structure example
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@ data:
noTaskDelay: {{ .Values.worker.config.noTaskDelay | default 15 }}
heartbeatInterval: {{ .Values.worker.config.heartbeatInterval | default 30 }}
activeThresholdMultiplier: {{ .Values.worker.config.activeThresholdMultiplier | default 30 }}
newAcquisitionLogic: {{ .Values.worker.config.newAcquisitionLogic | default false }}

defaultWorkspaceConfiguration:
timeConfiguration:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,6 @@ data:
{{- end }}
heartbeatInterval: {{ .Values.worker.config.heartbeatInterval | default 30 }}
activeThresholdMultiplier: {{ .Values.worker.config.activeThresholdMultiplier | default 30 }}
newAcquisitionLogic: {{ .Values.worker.config.newAcquisitionLogic | default false }}

defaultWorkspaceConfiguration:
timeConfiguration:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,6 @@ data:
noTaskDelay: {{ .Values.worker.config.noTaskDelay | default 15 }}
heartbeatInterval: {{ .Values.worker.config.heartbeatInterval | default 30 }}
activeThresholdMultiplier: {{ .Values.worker.config.activeThresholdMultiplier | default 30 }}
newAcquisitionLogic: {{ .Values.worker.config.newAcquisitionLogic | default false }}

defaultWorkspaceConfiguration:
timeConfiguration:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ taskDriver:
noTaskDelay: 1
randomDelayCap: 1
taskFailureDelay: 1
newAcquisitionLogic: true

scheduler:
# Run the Quartz Scheduler.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,4 +71,3 @@ taskDriver:
noTaskDelay: 1
randomDelayCap: 1
taskFailureDelay: 1
newAcquisitionLogic: true
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ taskDriver:
noTaskDelay: 1
randomDelayCap: 1
taskFailureDelay: 1
newAcquisitionLogic: true

scheduler:
# Run the Quartz Scheduler.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ taskDriver:
noTaskDelay: 1
randomDelayCap: 1
taskFailureDelay: 1
newAcquisitionLogic: true
maxParallelTasks: 1

scheduler:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package ai.startree.thirdeye.worker.task;

import com.fasterxml.jackson.annotation.JsonIgnore;
import java.time.Duration;

public class TaskDriverConfiguration {
Expand All @@ -31,8 +32,8 @@ public class TaskDriverConfiguration {
private int taskFetchSizeCap = 50;
private int maxParallelTasks = 5;

// still experimental -- TODO CYRIL - set to true once validated and stable - then remove
private boolean newAcquisitionLogic = false;
@JsonIgnore // not used anymore - newAcquisitionLogic is enabled by default now - can be removed July 2025
private boolean newAcquisitionLogic = true;

public Long getId() {
return id;
Expand Down Expand Up @@ -133,10 +134,12 @@ public TaskDriverConfiguration setActiveThresholdMultiplier(final int activeThre
return this;
}

@Deprecated(forRemoval = true) // can be removed July 2025
public boolean isNewAcquisitionLogic() {
return newAcquisitionLogic;
}

@Deprecated(forRemoval = true) // can be removed July 2025
public TaskDriverConfiguration setNewAcquisitionLogic(
final boolean newAcquisitionLogic) {
this.newAcquisitionLogic = newAcquisitionLogic;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import ai.startree.thirdeye.spi.task.TaskStatus;
import ai.startree.thirdeye.spi.task.TaskType;
import com.google.inject.Singleton;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Timer;
import io.micrometer.core.instrument.Timer.Sample;
Expand Down Expand Up @@ -52,7 +51,6 @@ public class TaskDriverRunnable implements Runnable {
private final Timer taskRunTimerOfSuccess;
private final Timer taskRunTimerOfException;
private final Timer taskWaitTimer;
private final Counter taskConcurrentAcquisition;

public TaskDriverRunnable(final TaskContext taskContext) {
this.taskContext = taskContext;
Expand All @@ -79,15 +77,12 @@ public TaskDriverRunnable(final TaskContext taskContext) {
.description(
"Start: a task is created in the persistence layer. End: the task is picked by a task runner for execution.")
.register(Metrics.globalRegistry);
taskConcurrentAcquisition = Counter.builder("thirdeye_task_concurrent_acquisition")
.description("Count the number of time a worker fails to take a lock on a task because the task is locked by another process (most likely another worker).")
.register(Metrics.globalRegistry);
}

public void run() {
while (!isShutdown()) {
// select a task to execute, and update it to RUNNING
final TaskDTO taskDTO = config.isNewAcquisitionLogic() ? waitForTask() : waitForTaskLegacy();
final TaskDTO taskDTO = waitForTask();
if (taskDTO == null) {
continue;
}
Expand Down Expand Up @@ -160,52 +155,6 @@ private Future<List<TaskResult>> runTaskAsync(final TaskDTO taskDTO) throws IOEx
.submit(() -> taskRunner.execute(taskInfo, taskContext, taskDTO.namespace()));
}

/**
* Returns a TaskDTO if a task is successfully acquired; returns null if system is shutting down.
*
* @return null if system is shutting down.
*/
@Deprecated
private TaskDTO waitForTaskLegacy() {
while (!isShutdown()) {
final TaskDTO nextTask;
try {
nextTask = taskManager.findNextTaskToRun();
} catch (Exception e) {
LOG.error("Failed to fetch a new task to run", e);
idleTimer().record(() -> sleep(true));
continue;
}
if (nextTask == null) {
// no task found
idleTimer().record(() -> sleep(false));
continue;
}
if (isShutdown()) {
break;
}
try {
boolean success = taskManager.acquireTaskToRun(nextTask, workerId);
if (success) {
final long waitTime = System.currentTimeMillis() - nextTask.getCreateTime().getTime();
taskWaitTimer.record(waitTime, TimeUnit.MILLISECONDS);
return nextTask;
} else {
taskConcurrentAcquisition.increment();
LOG.debug("Failed to acquire task {} referencing {} from worker id {}. Task was locked, or edited by another transaction.)", nextTask.getId(),
nextTask.getRefId(), workerId);
// don't sleep - look for a next task
continue;
}
} catch (Exception e) {
LOG.warn("Failed to acquire task {} from worker id {})", nextTask, workerId, e);
idleTimer().record(() -> sleep(true));
continue;
}
}
return null;
}

/**
* Returns a TaskDTO if a task is successfully acquired; returns null if system is shutting down.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,7 @@ private static String toJson(final Object object) {
public void setUp() {
config = new TaskDriverConfiguration()
.setRandomWorkerIdEnabled(true)
.setHeartbeatInterval(HEARTBEAT_INTERVAL)
.setNewAcquisitionLogic(true);
.setHeartbeatInterval(HEARTBEAT_INTERVAL);

taskManager = Mockito.mock(TaskManager.class);

Expand Down
Loading