-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathWorkerThread.java
More file actions
71 lines (58 loc) · 2.24 KB
/
WorkerThread.java
File metadata and controls
71 lines (58 loc) · 2.24 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
package com.taskscheduler;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * A worker thread that continuously polls the task queue and executes tasks.
 *
 * <p>Two ways of stopping the worker are visible in this class: {@link #shutdown()}
 * clears the running flag, after which the worker keeps polling until the queue
 * reports empty; an interrupt delivered while polling ends the loop immediately,
 * without draining the queue.
 *
 * <p>The completed-task counter is an {@link AtomicInteger} so that
 * {@link #getTasksCompleted()} returns a current value when it is called from a
 * thread other than the worker itself.
 */
public class WorkerThread extends Thread {

    /** Source of the numeric suffix used in worker thread names. */
    private static final AtomicInteger WORKER_COUNTER = new AtomicInteger(0);

    /** How long a single poll waits for a task before the running flag is re-checked. */
    private static final long POLL_TIMEOUT_MS = 500;

    private final TaskQueue taskQueue;
    private final ThreadPoolStats stats;

    /** Cleared by {@link #shutdown()}; volatile because it is written by another thread. */
    private volatile boolean running = true;

    /**
     * Number of tasks that finished without throwing. Incremented only by this worker,
     * but read by arbitrary threads through {@link #getTasksCompleted()}, hence atomic.
     */
    private final AtomicInteger tasksCompleted = new AtomicInteger();

    /**
     * Creates a non-daemon worker named {@code Worker-N}, where N comes from a
     * counter shared by all workers.
     *
     * @param taskQueue queue the worker polls for tasks
     * @param stats     collector notified of every completion and failure
     */
    public WorkerThread(TaskQueue taskQueue, ThreadPoolStats stats) {
        super("Worker-" + WORKER_COUNTER.incrementAndGet());
        this.taskQueue = taskQueue;
        this.stats = stats;
        setDaemon(false);
    }

    /**
     * Polls the queue and executes tasks until {@link #shutdown()} has been called
     * and the queue is empty, or until the thread is interrupted.
     */
    @Override
    public void run() {
        System.out.printf("[%s] Started%n", getName());
        while (running || !taskQueue.isEmpty()) {
            try {
                // Bounded wait so the running flag is re-evaluated even when no task arrives.
                Task task = taskQueue.poll(POLL_TIMEOUT_MS, TimeUnit.MILLISECONDS);
                if (task != null) {
                    executeTask(task);
                }
            } catch (InterruptedException e) {
                // Keep the interrupt status visible to whoever inspects this thread afterwards.
                Thread.currentThread().interrupt();
                break;
            }
        }
        System.out.printf("[%s] Shutting down. Completed %d tasks.%n",
                getName(), tasksCompleted.get());
    }

    /**
     * Runs one task, logging its progress and reporting the outcome to the stats collector.
     *
     * <p>An exception thrown by the task is recorded as a failure and logged; it does
     * not terminate the worker.
     *
     * @param task the task to run
     */
    private void executeTask(Task task) {
        // Presumably getSubmittedAt() is an epoch-millisecond timestamp, matching
        // System.currentTimeMillis() -- confirm against Task.
        long waitTime = System.currentTimeMillis() - task.getSubmittedAt();
        System.out.printf("[%s] Executing %s (waited %dms)%n", getName(), task, waitTime);

        long startTime = System.currentTimeMillis();
        try {
            task.execute();
            long duration = System.currentTimeMillis() - startTime;
            tasksCompleted.incrementAndGet();
            stats.recordCompletion(waitTime, duration);
            System.out.printf("[%s] Finished %s in %dms%n", getName(), task, duration);
        } catch (Exception e) {
            stats.recordFailure();
            System.err.printf("[%s] Task %s failed: %s%n", getName(), task, e.getMessage());
            if (e instanceof InterruptedException) {
                // The catch-all would otherwise swallow the interruption. Re-assert the
                // flag so the polling loop in run() can observe it (assuming
                // TaskQueue.poll honors the interrupt status -- confirm).
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Asks the worker to stop. Only clears the running flag; the worker keeps
     * executing tasks until the queue reports empty.
     */
    public void shutdown() {
        running = false;
    }

    /**
     * Returns how many tasks this worker has executed without an exception.
     *
     * @return the number of completed tasks so far
     */
    public int getTasksCompleted() {
        return tasksCompleted.get();
    }
}