Skip to content

Commit e955cde

Browse files
author
Cyril de Catheu
committed
make it possible to disable task clean up
1 parent e9d094c commit e955cde

File tree

3 files changed

+32
-16
lines changed

3 files changed

+32
-16
lines changed

thirdeye-scheduler/src/main/java/ai/startree/thirdeye/scheduler/SchedulerService.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -122,11 +122,14 @@ public void start() throws Exception {
122122

123123
// schedule task maintenance operations
124124
final TaskCleanUpConfiguration taskCleanUpConfiguration = config.getTaskCleanUpConfiguration();
125-
oldTasksExecutorService.scheduleWithFixedDelay(this::purgeOldTasks,
126-
1,
127-
taskCleanUpConfiguration.getIntervalInMinutes(),
128-
TimeUnit.MINUTES);
129-
if (taskDriverConfiguration.isRandomWorkerIdEnabled()) {
125+
if (taskCleanUpConfiguration.getIntervalInMinutes() > 0) {
126+
oldTasksExecutorService.scheduleWithFixedDelay(this::purgeOldTasks,
127+
1,
128+
taskCleanUpConfiguration.getIntervalInMinutes(),
129+
TimeUnit.MINUTES);
130+
}
131+
132+
if (taskDriverConfiguration.isRandomWorkerIdEnabled() && taskCleanUpConfiguration.getOrphanIntervalInSeconds() > 0) {
130133
orphanTasksExecutorService.scheduleWithFixedDelay(this::handleOrphanTasks,
131134
0,
132135
taskCleanUpConfiguration.getOrphanIntervalInSeconds(),

thirdeye-scheduler/src/main/java/ai/startree/thirdeye/scheduler/TaskCronSchedulerRunnable.java

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -107,8 +107,9 @@ public TaskCronSchedulerRunnable(
107107
this.groupMatcher = GroupMatcher.jobGroupEquals(taskType.toString());
108108
this.taskManager = taskManager;
109109
this.namespaceConfigurationManager = namespaceConfigurationManager;
110-
this.namespaceToQuotaExceededSupplier = namespaceQuotaCacheDurationSeconds > 0 ? scheduledRefreshSupplier(
111-
this::getNamespaceToQuotaExceededMap, Duration.ofSeconds(namespaceQuotaCacheDurationSeconds)): this::getNamespaceToQuotaExceededMap;
110+
this.namespaceToQuotaExceededSupplier = scheduledRefreshSupplier(
111+
this::getNamespaceToQuotaExceededMap,
112+
Duration.ofSeconds(namespaceQuotaCacheDurationSeconds));
112113
}
113114

114115
@Override
@@ -152,10 +153,13 @@ private void updateSchedules() throws SchedulerException {
152153
entityName, id, taskType);
153154
stopJob(jobKey);
154155
} else if (!isActive(entity)) {
155-
log.info("{} with id {} is deactivated. Stopping the scheduled {} job.", entityName, id, taskType);
156+
log.info("{} with id {} is deactivated. Stopping the scheduled {} job.", entityName, id,
157+
taskType);
156158
stopJob(jobKey);
157-
} else if (cachedNamespaceToQuotaExceeded.getOrDefault(nonNullNamespace(entity.namespace()), false)) {
158-
log.info("workspace {} corresponding to {} with id {} has exceeded monthly quota. Stopping scheduled {} job.",
159+
} else if (cachedNamespaceToQuotaExceeded.getOrDefault(nonNullNamespace(entity.namespace()),
160+
false)) {
161+
log.info(
162+
"workspace {} corresponding to {} with id {} has exceeded monthly quota. Stopping scheduled {} job.",
159163
nonNullNamespace(entity.namespace()), entityName, id, taskType);
160164
stopJob(jobKey);
161165
}
@@ -194,7 +198,9 @@ private Long getMonthlyTasksLimit(final @NonNull NamespaceConfigurationDTO confi
194198
}
195199

196200
private long getTasksCountForNamespace(final String namespace) {
197-
final LocalDateTime startOfMonth = LocalDate.now(ZoneOffset.UTC).withDayOfMonth(1).atStartOfDay();
201+
final LocalDateTime startOfMonth = LocalDate.now(ZoneOffset.UTC)
202+
.withDayOfMonth(1)
203+
.atStartOfDay();
198204
final Predicate predicate = Predicate.AND(
199205
Predicate.EQ("namespace", namespace),
200206
Predicate.EQ("type", taskType),
@@ -212,7 +218,8 @@ private void schedule(final E entity,
212218

213219
final String entityNamespace = nonNullNamespace(entity.namespace());
214220
if (namespaceToQuotaExceededMap.getOrDefault(entityNamespace, false)) {
215-
log.info("workspace {} corresponding to {} with id {} has exceeded monthly quota. Skipping scheduling {} job.",
221+
log.info(
222+
"workspace {} corresponding to {} with id {} has exceeded monthly quota. Skipping scheduling {} job.",
216223
entityNamespace, entityName, entity.getId(), taskType);
217224
return;
218225
}
@@ -250,7 +257,9 @@ public void startJob(final E config, final JobKey jobKey) throws SchedulerExcept
250257

251258
private void stopJob(final JobKey jobKey) throws SchedulerException {
252259
if (!scheduler.checkExists(jobKey)) {
253-
log.error("Could not find job to delete {}, {} in the job scheduler. This should never happen. Please reach out to StarTree support.", jobKey.getName(), jobKey.getGroup());
260+
log.error(
261+
"Could not find job to delete {}, {} in the job scheduler. This should never happen. Please reach out to StarTree support.",
262+
jobKey.getName(), jobKey.getGroup());
254263
}
255264
scheduler.deleteJob(jobKey);
256265
log.info("Stopped {} job {}", taskType, jobKey.getName());
@@ -262,7 +271,7 @@ private Trigger buildTrigger(final E config) {
262271
checkArgument(maxTriggersPerMinute <= cronMaxTriggersPerMinute,
263272
"Attempting to schedule a %s job for %s %s that can trigger up to %s times per minute. The limit is %s. Please update the cron %s",
264273
taskType,
265-
entityName,
274+
entityName,
266275
config.getId(),
267276
maxTriggersPerMinute, cronMaxTriggersPerMinute, cron
268277
);
@@ -273,7 +282,7 @@ private Trigger buildTrigger(final E config) {
273282
.withSchedule(cronScheduleBuilder)
274283
.build();
275284
}
276-
285+
277286
private boolean isActive(final E entity) {
278287
return isActiveGetter.isActive(entity);
279288
}
@@ -285,12 +294,14 @@ private boolean isActive(final E entity) {
285294
private String cronOf(final E entity) {
286295
return cronGetter.getCron(entity);
287296
}
288-
297+
289298
public interface CronGetter<E extends AbstractDTO> {
299+
290300
String getCron(final E entity);
291301
}
292302

293303
public interface isActiveGetter<E extends AbstractDTO> {
304+
294305
boolean isActive(final E entity);
295306
}
296307
}

thirdeye-scheduler/src/main/java/ai/startree/thirdeye/scheduler/taskcleanup/TaskCleanUpConfiguration.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,11 @@
1717

1818
public class TaskCleanUpConfiguration {
1919

20+
// 0 can be used as a special value to disable old tasks clean up
2021
private Integer intervalInMinutes = 5;
2122
private Integer retentionInDays = 30;
2223
private Integer maxEntriesToDelete = TASK_MAX_DELETES_PER_CLEANUP;
24+
// 0 can be used a special value to disable orphan tasks clean up
2325
private Integer orphanIntervalInSeconds = 30;
2426

2527
public Integer getIntervalInMinutes() {

0 commit comments

Comments
 (0)