Skip to content

Commit 5f64bd5

Browse files
author
Cyril de Catheu
authored
[worker] remove legacy task execution framework (#1806)
1 parent fa1e908 commit 5f64bd5

File tree

11 files changed

+9
-65
lines changed

11 files changed

+9
-65
lines changed

config/server.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -96,8 +96,8 @@ taskDriver:
9696
# must be a non-negative integer unique per instance/worker
9797
# must be omitted when `randomWorkerIdEnabled` is set `true`
9898
id: 0
99-
# whether to use the new task acquisition logic (beta) or the old one (slow)
100-
newAcquisitionLogic: false
99+
# deprecated enabled by default - kept to ensure it's not breaking config parsing - can be removed July 2025
100+
newAcquisitionLogic: true
101101

102102
rca:
103103
# default values - structure example

kubernetes/helm/startree-thirdeye/templates/coordinator/configmap.yaml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,6 @@ data:
9595
noTaskDelay: {{ .Values.worker.config.noTaskDelay | default 15 }}
9696
heartbeatInterval: {{ .Values.worker.config.heartbeatInterval | default 30 }}
9797
activeThresholdMultiplier: {{ .Values.worker.config.activeThresholdMultiplier | default 30 }}
98-
newAcquisitionLogic: {{ .Values.worker.config.newAcquisitionLogic | default false }}
9998

10099
defaultWorkspaceConfiguration:
101100
timeConfiguration:

kubernetes/helm/startree-thirdeye/templates/scheduler/scheduler-config.yaml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,6 @@ data:
115115
{{- end }}
116116
heartbeatInterval: {{ .Values.worker.config.heartbeatInterval | default 30 }}
117117
activeThresholdMultiplier: {{ .Values.worker.config.activeThresholdMultiplier | default 30 }}
118-
newAcquisitionLogic: {{ .Values.worker.config.newAcquisitionLogic | default false }}
119118
120119
defaultWorkspaceConfiguration:
121120
timeConfiguration:

kubernetes/helm/startree-thirdeye/templates/worker/worker-config.yaml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,6 @@ data:
119119
noTaskDelay: {{ .Values.worker.config.noTaskDelay | default 15 }}
120120
heartbeatInterval: {{ .Values.worker.config.heartbeatInterval | default 30 }}
121121
activeThresholdMultiplier: {{ .Values.worker.config.activeThresholdMultiplier | default 30 }}
122-
newAcquisitionLogic: {{ .Values.worker.config.newAcquisitionLogic | default false }}
123122
124123
defaultWorkspaceConfiguration:
125124
timeConfiguration:

thirdeye-integration-tests/src/test/resources/anomalyresolution/config/server.yaml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,6 @@ taskDriver:
6363
noTaskDelay: 1
6464
randomDelayCap: 1
6565
taskFailureDelay: 1
66-
newAcquisitionLogic: true
6766

6867
scheduler:
6968
# Run the Quartz Scheduler.

thirdeye-integration-tests/src/test/resources/happypath/config/server.yaml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,4 +71,3 @@ taskDriver:
7171
noTaskDelay: 1
7272
randomDelayCap: 1
7373
taskFailureDelay: 1
74-
newAcquisitionLogic: true

thirdeye-integration-tests/src/test/resources/scheduling/config/server.yaml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,6 @@ taskDriver:
6363
noTaskDelay: 1
6464
randomDelayCap: 1
6565
taskFailureDelay: 1
66-
newAcquisitionLogic: true
6766

6867
scheduler:
6968
# Run the Quartz Scheduler.

thirdeye-integration-tests/src/test/resources/schedulingquota/config/server.yaml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,6 @@ taskDriver:
6363
noTaskDelay: 1
6464
randomDelayCap: 1
6565
taskFailureDelay: 1
66-
newAcquisitionLogic: true
6766
maxParallelTasks: 1
6867

6968
scheduler:

thirdeye-worker/src/main/java/ai/startree/thirdeye/worker/task/TaskDriverConfiguration.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
*/
1414
package ai.startree.thirdeye.worker.task;
1515

16+
import com.fasterxml.jackson.annotation.JsonIgnore;
1617
import java.time.Duration;
1718

1819
public class TaskDriverConfiguration {
@@ -31,8 +32,8 @@ public class TaskDriverConfiguration {
3132
private int taskFetchSizeCap = 50;
3233
private int maxParallelTasks = 5;
3334

34-
// still experimental -- TODO CYRIL - set to true once validated and stable - then remove
35-
private boolean newAcquisitionLogic = false;
35+
@JsonIgnore // not used anymore - newAcquisitionLogic is enabled by default now - can be removed July 2025
36+
private boolean newAcquisitionLogic = true;
3637

3738
public Long getId() {
3839
return id;
@@ -133,10 +134,12 @@ public TaskDriverConfiguration setActiveThresholdMultiplier(final int activeThre
133134
return this;
134135
}
135136

137+
@Deprecated(forRemoval = true) // can be removed July 2025
136138
public boolean isNewAcquisitionLogic() {
137139
return newAcquisitionLogic;
138140
}
139141

142+
@Deprecated(forRemoval = true) // can be removed July 2025
140143
public TaskDriverConfiguration setNewAcquisitionLogic(
141144
final boolean newAcquisitionLogic) {
142145
this.newAcquisitionLogic = newAcquisitionLogic;

thirdeye-worker/src/main/java/ai/startree/thirdeye/worker/task/TaskDriverRunnable.java

Lines changed: 1 addition & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import ai.startree.thirdeye.spi.task.TaskStatus;
2323
import ai.startree.thirdeye.spi.task.TaskType;
2424
import com.google.inject.Singleton;
25-
import io.micrometer.core.instrument.Counter;
2625
import io.micrometer.core.instrument.Metrics;
2726
import io.micrometer.core.instrument.Timer;
2827
import io.micrometer.core.instrument.Timer.Sample;
@@ -52,7 +51,6 @@ public class TaskDriverRunnable implements Runnable {
5251
private final Timer taskRunTimerOfSuccess;
5352
private final Timer taskRunTimerOfException;
5453
private final Timer taskWaitTimer;
55-
private final Counter taskConcurrentAcquisition;
5654

5755
public TaskDriverRunnable(final TaskContext taskContext) {
5856
this.taskContext = taskContext;
@@ -79,15 +77,12 @@ public TaskDriverRunnable(final TaskContext taskContext) {
7977
.description(
8078
"Start: a task is created in the persistence layer. End: the task is picked by a task runner for execution.")
8179
.register(Metrics.globalRegistry);
82-
taskConcurrentAcquisition = Counter.builder("thirdeye_task_concurrent_acquisition")
83-
.description("Count the number of time a worker fails to take a lock on a task because the task is locked by another process (most likely another worker).")
84-
.register(Metrics.globalRegistry);
8580
}
8681

8782
public void run() {
8883
while (!isShutdown()) {
8984
// select a task to execute, and update it to RUNNING
90-
final TaskDTO taskDTO = config.isNewAcquisitionLogic() ? waitForTask() : waitForTaskLegacy();
85+
final TaskDTO taskDTO = waitForTask();
9186
if (taskDTO == null) {
9287
continue;
9388
}
@@ -160,52 +155,6 @@ private Future<List<TaskResult>> runTaskAsync(final TaskDTO taskDTO) throws IOEx
160155
.submit(() -> taskRunner.execute(taskInfo, taskContext, taskDTO.namespace()));
161156
}
162157

163-
/**
164-
* Returns a TaskDTO if a task is successfully acquired; returns null if system is shutting down.
165-
*
166-
* @return null if system is shutting down.
167-
*/
168-
@Deprecated
169-
private TaskDTO waitForTaskLegacy() {
170-
while (!isShutdown()) {
171-
final TaskDTO nextTask;
172-
try {
173-
nextTask = taskManager.findNextTaskToRun();
174-
} catch (Exception e) {
175-
LOG.error("Failed to fetch a new task to run", e);
176-
idleTimer().record(() -> sleep(true));
177-
continue;
178-
}
179-
if (nextTask == null) {
180-
// no task found
181-
idleTimer().record(() -> sleep(false));
182-
continue;
183-
}
184-
if (isShutdown()) {
185-
break;
186-
}
187-
try {
188-
boolean success = taskManager.acquireTaskToRun(nextTask, workerId);
189-
if (success) {
190-
final long waitTime = System.currentTimeMillis() - nextTask.getCreateTime().getTime();
191-
taskWaitTimer.record(waitTime, TimeUnit.MILLISECONDS);
192-
return nextTask;
193-
} else {
194-
taskConcurrentAcquisition.increment();
195-
LOG.debug("Failed to acquire task {} referencing {} from worker id {}. Task was locked, or edited by another transaction.)", nextTask.getId(),
196-
nextTask.getRefId(), workerId);
197-
// don't sleep - look for a next task
198-
continue;
199-
}
200-
} catch (Exception e) {
201-
LOG.warn("Failed to acquire task {} from worker id {})", nextTask, workerId, e);
202-
idleTimer().record(() -> sleep(true));
203-
continue;
204-
}
205-
}
206-
return null;
207-
}
208-
209158
/**
210159
* Returns a TaskDTO if a task is successfully acquired; returns null if system is shutting down.
211160
*

0 commit comments

Comments
 (0)