Skip to content

Commit 287ff18

Browse files
committed
feat: separate worker disabled state from worker version
1 parent ad106a8 commit 287ff18

11 files changed

Lines changed: 90 additions & 57 deletions

File tree

indexing-service/src/main/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfo.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -211,7 +211,7 @@ public DateTime getBlacklistedUntil()
211211

212212
public boolean isValidVersion(String minVersion)
213213
{
214-
return worker.getVersion().compareTo(minVersion) >= 0;
214+
return !worker.isDisabled() && worker.getVersion().compareTo(minVersion) >= 0;
215215
}
216216

217217
public boolean canRunTask(Task task, double parallelIndexTaskSlotRatio)

indexing-service/src/main/java/org/apache/druid/indexing/overlord/ZkWorker.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,8 @@ public boolean isRunningTask(String taskId)
205205
@UsedInGeneratedCode // See JavaScriptWorkerSelectStrategyTest
206206
public boolean isValidVersion(String minVersion)
207207
{
208-
return worker.get().getVersion().compareTo(minVersion) >= 0;
208+
final Worker w = worker.get();
209+
return !w.isDisabled() && w.getVersion().compareTo(minVersion) >= 0;
209210
}
210211

211212
public void setWorker(Worker newWorker)

indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolder.java

Lines changed: 36 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -65,8 +65,11 @@ public class WorkerHolder
6565
public static final TypeReference<ChangeRequestsSnapshot<WorkerHistoryItem>> WORKER_SYNC_RESP_TYPE_REF = new TypeReference<>() {};
6666

6767

68-
private final Worker worker;
69-
private Worker disabledWorker;
68+
/**
69+
* Pre-built views with isDisabled() set to false/true respectively. {@link #getWorker()} selects between them based on {@link #disabled}.
70+
*/
71+
private final Worker enabledWorker;
72+
private final Worker disabledWorker;
7073

7174
protected final AtomicBoolean disabled;
7275
private final AtomicBoolean syncedAtleastOnce = new AtomicBoolean(false);
@@ -100,15 +103,16 @@ public WorkerHolder(
100103
this.httpClient = httpClient;
101104
this.config = config;
102105
this.listener = listener;
103-
this.worker = worker;
104-
//worker holder is created disabled and gets enabled after first sync success.
106+
this.enabledWorker = workerWithDisabledState(worker, false);
107+
this.disabledWorker = workerWithDisabledState(worker, true);
108+
// WorkerHolder starts disabled and gets enabled after first successful sync.
105109
this.disabled = new AtomicBoolean(true);
106110

107111
this.syncer = new ChangeRequestHttpSyncer<>(
108112
smileMapper,
109113
httpClient,
110114
workersSyncExec,
111-
TaskRunnerUtils.makeWorkerURL(worker, "/"),
115+
TaskRunnerUtils.makeWorkerURL(enabledWorker, "/"),
112116
"/druid-internal/v1/worker",
113117
WORKER_SYNC_RESP_TYPE_REF,
114118
config.getSyncRequestTimeout().toStandardDuration().getMillis(),
@@ -125,7 +129,7 @@ public WorkerHolder(
125129

126130
public Worker getWorker()
127131
{
128-
return worker;
132+
return disabled.get() ? disabledWorker : enabledWorker;
129133
}
130134

131135
public DateTime getBlacklistedUntil()
@@ -145,23 +149,8 @@ public void setBlacklistedUntil(DateTime blacklistedUntil)
145149

146150
public ImmutableWorkerInfo toImmutable()
147151
{
148-
Worker w = worker;
149-
if (disabled.get()) {
150-
if (disabledWorker == null) {
151-
disabledWorker = new Worker(
152-
worker.getScheme(),
153-
worker.getHost(),
154-
worker.getIp(),
155-
worker.getCapacity(),
156-
"",
157-
worker.getCategory()
158-
);
159-
}
160-
w = disabledWorker;
161-
}
162-
163152
return ImmutableWorkerInfo.fromWorkerAnnouncements(
164-
w,
153+
getWorker(),
165154
tasksSnapshotRef.get(),
166155
lastCompletedTaskTime.get(),
167156
blacklistedUntil.get()
@@ -189,12 +178,12 @@ public boolean assignTask(Task task)
189178
log.info(
190179
"Received task[%s] assignment on worker[%s] when worker is disabled.",
191180
task.getId(),
192-
worker.getHost()
181+
enabledWorker.getHost()
193182
);
194183
return false;
195184
}
196185

197-
URL url = TaskRunnerUtils.makeWorkerURL(worker, "/druid-internal/v1/worker/assignTask");
186+
URL url = TaskRunnerUtils.makeWorkerURL(enabledWorker, "/druid-internal/v1/worker/assignTask");
198187
int numTries = config.getAssignRequestMaxRetries();
199188

200189
try {
@@ -215,7 +204,7 @@ public boolean assignTask(Task task)
215204
throw new RE(
216205
"Failed to assign task[%s] to worker[%s]. Response Code[%s] and Message[%s]. Retrying...",
217206
task.getId(),
218-
worker.getHost(),
207+
enabledWorker.getHost(),
219208
response.getStatus().getCode(),
220209
response.getContent()
221210
);
@@ -226,7 +215,7 @@ public boolean assignTask(Task task)
226215
ex,
227216
"Request to assign task[%s] to worker[%s] failed. Retrying...",
228217
task.getId(),
229-
worker.getHost()
218+
enabledWorker.getHost()
230219
);
231220
}
232221
},
@@ -235,14 +224,14 @@ public boolean assignTask(Task task)
235224
);
236225
}
237226
catch (Exception ex) {
238-
log.info("Not sure whether task[%s] was successfully assigned to worker[%s].", task.getId(), worker.getHost());
227+
log.info("Not sure whether task[%s] was successfully assigned to worker[%s].", task.getId(), enabledWorker.getHost());
239228
return true;
240229
}
241230
}
242231

243232
public void shutdownTask(String taskId)
244233
{
245-
final URL url = TaskRunnerUtils.makeWorkerURL(worker, "/druid/worker/v1/task/%s/shutdown", taskId);
234+
final URL url = TaskRunnerUtils.makeWorkerURL(enabledWorker, "/druid/worker/v1/task/%s/shutdown", taskId);
246235

247236
try {
248237
RetryUtils.retry(
@@ -257,17 +246,17 @@ public void shutdownTask(String taskId)
257246
if (response.getStatus().getCode() == 200) {
258247
log.info(
259248
"Sent shutdown message to worker: %s, status %s, response: %s",
260-
worker.getHost(),
249+
enabledWorker.getHost(),
261250
response.getStatus(),
262251
response.getContent()
263252
);
264253
return null;
265254
} else {
266-
throw new RE("Attempt to shutdown task[%s] on worker[%s] failed.", taskId, worker.getHost());
255+
throw new RE("Attempt to shutdown task[%s] on worker[%s] failed.", taskId, enabledWorker.getHost());
267256
}
268257
}
269258
catch (ExecutionException e) {
270-
throw new RE(e, "Error in handling post to [%s] for task [%s]", worker.getHost(), taskId);
259+
throw new RE(e, "Error in handling post to [%s] for task [%s]", enabledWorker.getHost(), taskId);
271260
}
272261
},
273262
e -> !(e instanceof InterruptedException),
@@ -279,7 +268,7 @@ public void shutdownTask(String taskId)
279268
Thread.currentThread().interrupt();
280269
}
281270

282-
log.error("Failed to shutdown task[%s] on worker[%s] failed.", taskId, worker.getHost());
271+
log.error("Failed to shutdown task[%s] on worker[%s] failed.", taskId, enabledWorker.getHost());
283272
}
284273
}
285274

@@ -296,7 +285,7 @@ public void stop()
296285
public void waitForInitialization() throws InterruptedException
297286
{
298287
if (!syncer.awaitInitialization()) {
299-
throw new RE("Failed to sync with worker[%s].", worker.getHost());
288+
throw new RE("Failed to sync with worker[%s].", enabledWorker.getHost());
300289
}
301290
}
302291

@@ -347,7 +336,7 @@ public void fullSync(List<WorkerHistoryItem> changes)
347336
log.makeAlert(
348337
"Got unknown sync update[%s] from worker[%s]. Ignored.",
349338
change.getClass().getName(),
350-
worker.getHost()
339+
enabledWorker.getHost()
351340
).emit();
352341
}
353342
}
@@ -359,7 +348,7 @@ public void fullSync(List<WorkerHistoryItem> changes)
359348
"task[%s] in state[%s] suddenly disappeared on worker[%s]. failing it.",
360349
announcement.getTaskId(),
361350
announcement.getStatus(),
362-
worker.getHost()
351+
enabledWorker.getHost()
363352
);
364353
delta.add(
365354
TaskAnnouncement.create(
@@ -402,7 +391,7 @@ public void deltaSync(List<WorkerHistoryItem> changes)
402391
"task[%s] in state[%s] suddenly disappeared on worker[%s]. failing it.",
403392
announcement.getTaskId(),
404393
announcement.getStatus(),
405-
worker.getHost()
394+
enabledWorker.getHost()
406395
);
407396
delta.add(
408397
TaskAnnouncement.create(
@@ -425,7 +414,7 @@ public void deltaSync(List<WorkerHistoryItem> changes)
425414
log.makeAlert(
426415
"Got unknown sync update[%s] from worker[%s]. Ignored.",
427416
change.getClass().getName(),
428-
worker.getHost()
417+
enabledWorker.getHost()
429418
).emit();
430419
}
431420
}
@@ -444,21 +433,29 @@ private void notifyListener(List<TaskAnnouncement> announcements, boolean isWork
444433
ex,
445434
"Unknown exception while updating task[%s] state from worker[%s].",
446435
announcement.getTaskId(),
447-
worker.getHost()
436+
enabledWorker.getHost()
448437
);
449438
}
450439
}
451440

452441
syncedAtleastOnce.set(true);
453442
if (isWorkerDisabled != disabled.get()) {
454443
disabled.set(isWorkerDisabled);
455-
log.info("Worker[%s] disabled set to [%s].", worker.getHost(), isWorkerDisabled);
444+
log.info("Worker[%s] disabled set to [%s].", enabledWorker.getHost(), isWorkerDisabled);
456445
listener.stateChanged(!isWorkerDisabled, WorkerHolder.this);
457446
}
458447
}
459448
};
460449
}
461450

451+
private static Worker workerWithDisabledState(Worker w, boolean disabled)
452+
{
453+
if (w.isDisabled() == disabled) {
454+
return w;
455+
}
456+
return new Worker(w.getScheme(), w.getHost(), w.getIp(), w.getCapacity(), w.getVersion(), w.getCategory(), disabled);
457+
}
458+
462459
public interface Listener
463460
{
464461
void taskAddedOrUpdated(TaskAnnouncement announcement, WorkerHolder workerHolder);

indexing-service/src/main/java/org/apache/druid/indexing/worker/Worker.java

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,11 @@
2121

2222
import com.fasterxml.jackson.annotation.JsonCreator;
2323
import com.fasterxml.jackson.annotation.JsonProperty;
24+
import org.apache.druid.common.config.Configs;
2425
import org.apache.druid.guice.annotations.PublicApi;
2526

27+
import javax.annotation.Nullable;
28+
2629
/**
2730
* A container for worker metadata.
2831
*/
@@ -35,6 +38,7 @@ public class Worker
3538
private final int capacity;
3639
private final String version;
3740
private final String category;
41+
private final boolean disabled;
3842

3943
@JsonCreator
4044
public Worker(
@@ -43,7 +47,8 @@ public Worker(
4347
@JsonProperty("ip") String ip,
4448
@JsonProperty("capacity") int capacity,
4549
@JsonProperty("version") String version,
46-
@JsonProperty("category") String category
50+
@JsonProperty("category") String category,
51+
@JsonProperty("disabled") @Nullable Boolean disabled
4752
)
4853
{
4954
this.scheme = scheme == null ? "http" : scheme; // needed for backwards compatibility with older workers (pre-#4270)
@@ -52,6 +57,19 @@ public Worker(
5257
this.capacity = capacity;
5358
this.version = version;
5459
this.category = category;
60+
this.disabled = Configs.valueOrDefault(disabled, false);
61+
}
62+
63+
public Worker(
64+
String scheme,
65+
String host,
66+
String ip,
67+
int capacity,
68+
String version,
69+
String category
70+
)
71+
{
72+
this(scheme, host, ip, capacity, version, category, false);
5573
}
5674

5775
@JsonProperty
@@ -90,6 +108,12 @@ public String getCategory()
90108
return category;
91109
}
92110

111+
@JsonProperty
112+
public boolean isDisabled()
113+
{
114+
return disabled;
115+
}
116+
93117
@Override
94118
public boolean equals(Object o)
95119
{
@@ -105,6 +129,9 @@ public boolean equals(Object o)
105129
if (capacity != worker.capacity) {
106130
return false;
107131
}
132+
if (disabled != worker.disabled) {
133+
return false;
134+
}
108135
if (!scheme.equals(worker.scheme)) {
109136
return false;
110137
}
@@ -129,6 +156,7 @@ public int hashCode()
129156
result = 31 * result + capacity;
130157
result = 31 * result + version.hashCode();
131158
result = 31 * result + category.hashCode();
159+
result = 31 * result + (disabled ? 1 : 0);
132160
return result;
133161
}
134162

@@ -142,6 +170,7 @@ public String toString()
142170
", capacity=" + capacity +
143171
", version='" + version + '\'' +
144172
", category='" + category + '\'' +
173+
", disabled=" + disabled +
145174
'}';
146175
}
147176

indexing-service/src/main/java/org/apache/druid/indexing/worker/http/WorkerResource.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,13 +100,16 @@ public Response doDisable()
100100
{
101101
try {
102102
if (curatorCoordinator != null) {
103+
// Dual-write disabled signal: legacy version="" for old overlords + disabled=true for new overlords.
104+
// TODO: Safe to drop DISABLED_VERSION once backward compatibility with overlords is no longer required.
103105
final Worker disabledWorker = new Worker(
104106
enabledWorker.getScheme(),
105107
enabledWorker.getHost(),
106108
enabledWorker.getIp(),
107109
enabledWorker.getCapacity(),
108110
DISABLED_VERSION,
109-
enabledWorker.getCategory()
111+
enabledWorker.getCategory(),
112+
true
110113
);
111114
curatorCoordinator.updateWorkerAnnouncement(disabledWorker);
112115
}

indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -560,7 +560,7 @@ public void testWorkerDisabled() throws Exception
560560
Assert.assertEquals(TaskState.SUCCESS, result.get().getStatusCode());
561561

562562
// Confirm RTR thinks the worker is disabled.
563-
Assert.assertEquals("", Iterables.getOnlyElement(remoteTaskRunner.getWorkers()).getWorker().getVersion());
563+
Assert.assertTrue(Iterables.getOnlyElement(remoteTaskRunner.getWorkers()).getWorker().isDisabled());
564564
}
565565

566566
@Test

indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTestUtils.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -186,8 +186,9 @@ void disableWorker(Worker worker) throws Exception
186186
worker.getHost(),
187187
worker.getIp(),
188188
worker.getCapacity(),
189-
"",
190-
worker.getCategory()
189+
worker.getVersion(),
190+
worker.getCategory(),
191+
true
191192
))
192193
);
193194
}

0 commit comments

Comments
 (0)