Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
;; limitations under the License.
(ns org.apache.storm.daemon.local-supervisor
(:import [org.apache.storm.daemon.supervisor SyncProcessEvent SupervisorData Supervisor SupervisorUtils]
[org.apache.storm.utils Utils ConfigUtils]
[org.apache.storm.utils Time Utils ConfigUtils]
[org.apache.storm ProcessSimulator])
(:use [org.apache.storm.daemon common]
[org.apache.storm log])
Expand All @@ -34,7 +34,9 @@
workerId)]
(ConfigUtils/setWorkerUserWSE conf workerId "")
(ProcessSimulator/registerProcess pid worker)
(.put (.getWorkerThreadPids supervisorData) workerId pid)))
(.put (.getWorkerThreadPids supervisorData) workerId pid)
(.put (.getWorkerIdsToLaunchTimes supervisorData) workerId (Time/currentTimeSecs))
(.put (.getWorkerIdsToPorts supervisorData) workerId port)))

(defn shutdown-local-worker [supervisorData worker-manager workerId]
(log-message "shutdown-local-worker")
Expand All @@ -61,4 +63,4 @@
(let [local-process (local-process)
supervisor-server (Supervisor.)]
(.setLocalSyncProcess supervisor-server local-process)
(.mkSupervisor supervisor-server conf shared-context isupervisor)))
(.mkSupervisor supervisor-server conf shared-context isupervisor)))
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ public class SupervisorData {
private final Utils.UptimeComputer upTime;
private final String stormVersion;
private final ConcurrentHashMap<String, String> workerThreadPids; // for local mode
private final ConcurrentHashMap<String, Long> workerIdsToLaunchTimes;
private final ConcurrentHashMap<String, Integer> workerIdsToPorts;
private final IStormClusterState stormClusterState;
private final LocalState localState;
private final String supervisorId;
Expand All @@ -84,6 +86,8 @@ public SupervisorData(Map conf, IContext sharedContext, ISupervisor iSupervisor)
this.upTime = Utils.makeUptimeComputer();
this.stormVersion = VersionInfo.getVersion();
this.workerThreadPids = new ConcurrentHashMap<String, String>();
this.workerIdsToLaunchTimes = new ConcurrentHashMap<String, Long>();
this.workerIdsToPorts = new ConcurrentHashMap<String, Integer>();
this.deadWorkers = new ConcurrentHashSet();

List<ACL> acls = null;
Expand Down Expand Up @@ -168,6 +172,14 @@ public ConcurrentHashMap<String, String> getWorkerThreadPids() {
return workerThreadPids;
}

/**
 * Returns the map that associates each worker id with its recorded launch time.
 *
 * <p>The map instance held by this object is returned directly (no copy is made), so
 * modifications made by the caller are visible to every other holder of the map.
 *
 * @return the worker-id to launch-time map
 */
public ConcurrentHashMap<String, Long> getWorkerIdsToLaunchTimes() {
    return this.workerIdsToLaunchTimes;
}

/**
 * Returns the map that associates each worker id with the port recorded for it.
 *
 * <p>The map instance held by this object is returned directly (no copy is made), so
 * modifications made by the caller are visible to every other holder of the map.
 *
 * @return the worker-id to port map
 */
public ConcurrentHashMap<String, Integer> getWorkerIdsToPorts() {
    return this.workerIdsToPorts;
}

/**
 * Returns the cluster-state handle held by this object.
 *
 * @return the {@code IStormClusterState} stored in this instance
 */
public IStormClusterState getStormClusterState() {
    return this.stormClusterState;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,13 @@ public void init(SupervisorData supervisorData){
this.localState = supervisorData.getLocalState();
}

/**
 * Tells whether the given worker should be treated as having run out of time to start.
 *
 * <p>A worker counts as expired when no launch time is recorded for it, or when the time
 * elapsed since its recorded launch time is strictly greater than the value configured under
 * {@code Config.SUPERVISOR_WORKER_START_TIMEOUT_SECS}.
 *
 * @param workerId id of the worker to check
 * @return {@code true} if there is no recorded launch time or the timeout has been exceeded,
 *         {@code false} otherwise
 */
private Boolean isWorkerStartTimeoutExpired(String workerId) {
    final Long launchedAt = supervisorData.getWorkerIdsToLaunchTimes().get(workerId);
    // Read the timeout before looking at the launch time so that a bad or missing config value
    // surfaces regardless of whether a launch time was recorded.
    final Integer startTimeout =
            Utils.getInt(supervisorData.getConf().get(Config.SUPERVISOR_WORKER_START_TIMEOUT_SECS));

    if (launchedAt == null) {
        return true;
    }
    final long elapsed = Time.currentTimeSecs() - launchedAt;
    return elapsed > startTimeout;
}

@Override
public void run() {
LOG.debug("Syncing processes");
Expand All @@ -110,34 +117,47 @@ public void run() {
keeperWorkerIds.add(entry.getKey());
keepPorts.add(stateHeartbeat.getHeartbeat().get_port());
}
if (stateHeartbeat.getState() == State.NOT_STARTED) {
keeperWorkerIds.add(entry.getKey());
keepPorts.add(supervisorData.getWorkerIdsToPorts().get(entry.getKey()));
}
}
Map<Integer, LocalAssignment> reassignExecutors = getReassignExecutors(assignedExecutors, keepPorts);
Map<Integer, String> newWorkerIds = new HashMap<>();
for (Integer port : reassignExecutors.keySet()) {
newWorkerIds.put(port, Utils.uuid());
}

LOG.debug("Assigned executors: {}", assignedExecutors);
LOG.debug("Allocated: {}", localWorkerStats);
LOG.debug("Keeper worker ids: {}", keeperWorkerIds);
LOG.debug("Keep ports: {}", keepPorts);
LOG.debug("LaunchTimes: {}", supervisorData.getWorkerIdsToLaunchTimes());
LOG.debug("Ids Ports: {}", supervisorData.getWorkerIdsToPorts());

for (Map.Entry<String, StateHeartbeat> entry : localWorkerStats.entrySet()) {
StateHeartbeat stateHeartbeat = entry.getValue();
if (stateHeartbeat.getState() != State.VALID) {
if ((stateHeartbeat.getState() != State.VALID && stateHeartbeat.getState() != State.NOT_STARTED) ||
(stateHeartbeat.getState() == State.NOT_STARTED && isWorkerStartTimeoutExpired(entry.getKey()))) {
LOG.info("Shutting down and clearing state for id {}, Current supervisor time: {}, State: {}, Heartbeat: {}", entry.getKey(), now,

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Clojure equivalent of this also logs when the worker could not be started. So, in addition to the "Shutting down" message, there is a "Worker X failed to start" message in the case where the timeout has expired (i.e. the state is still NOT_STARTED). I think that is a nice thing to have (and it should probably be logged with LOG.error).

stateHeartbeat.getState(), stateHeartbeat.getHeartbeat());
killWorker(supervisorData, supervisorData.getWorkerManager(), entry.getKey());
}
}
// start new workers
Map<String, Integer> newWorkerPortToIds = startNewWorkers(newWorkerIds, reassignExecutors);
Map<String, Integer> newWorkerIdsToPorts = startNewWorkers(newWorkerIds, reassignExecutors);

Map<String, Integer> allWorkerPortToIds = new HashMap<>();
Map<String, Integer> allWorkerIdsToPorts = new HashMap<>();
Map<String, Integer> approvedWorkers = localState.getApprovedWorkers();
for (String keeper : keeperWorkerIds) {
allWorkerPortToIds.put(keeper, approvedWorkers.get(keeper));
allWorkerIdsToPorts.put(keeper, approvedWorkers.get(keeper));
}
allWorkerIdsToPorts.putAll(newWorkerIdsToPorts);
localState.setApprovedWorkers(allWorkerIdsToPorts);
supervisorData.getWorkerIdsToPorts().putAll(newWorkerIdsToPorts);
for (String workerId : newWorkerIdsToPorts.keySet()) {
supervisorData.getWorkerIdsToLaunchTimes().put(workerId, new Long(Time.currentTimeSecs()));
}
allWorkerPortToIds.putAll(newWorkerPortToIds);
localState.setApprovedWorkers(allWorkerPortToIds);
waitForWorkersLaunch(conf, newWorkerPortToIds.keySet());

} catch (Exception e) {
LOG.error("Failed Sync Process", e);
Expand All @@ -146,24 +166,6 @@ public void run() {

}

/**
 * Waits for each of the given workers to report a heartbeat in its local state, giving up once
 * the configured timeout has elapsed.
 *
 * <p>The timeout is read from {@code Config.NIMBUS_SUPERVISOR_TIMEOUT_SECS} and is measured from
 * a single start time taken before the first worker is checked, so it bounds the whole call
 * rather than each worker separately. While a worker has no heartbeat and the timeout has not
 * been exceeded, the method logs a message and pauses with {@code Time.sleep(500)} before
 * checking again. A worker that still has no heartbeat afterwards is logged as failed to start.
 *
 * @param conf configuration map, used to read the timeout and to locate each worker's state
 * @param workerIds ids of the workers to wait for
 * @throws Exception if reading the worker state or sleeping fails
 */
protected void waitForWorkersLaunch(Map conf, Set<String> workerIds) throws Exception {
    int startTime = Time.currentTimeSecs();
    // Use the same conversion helper this class uses for its other timeout settings instead of
    // a raw (int) cast, which throws ClassCastException unless the value is exactly an Integer.
    int timeOut = Utils.getInt(conf.get(Config.NIMBUS_SUPERVISOR_TIMEOUT_SECS));
    for (String workerId : workerIds) {
        // Named distinctly so it does not shadow this object's own localState field.
        LocalState workerLocalState = ConfigUtils.workerState(conf, workerId);
        while (true) {
            LSWorkerHeartbeat hb = workerLocalState.getWorkerHeartBeat();
            if (hb != null || (Time.currentTimeSecs() - startTime) > timeOut) {
                break;
            }
            LOG.info("{} still hasn't started", workerId);
            Time.sleep(500);
        }
        // Re-read the heartbeat: the loop may have exited on timeout rather than on success.
        if (workerLocalState.getWorkerHeartBeat() == null) {
            LOG.info("Worker {} failed to start", workerId);
        }
    }
}

protected Map<Integer, LocalAssignment> getReassignExecutors(Map<Integer, LocalAssignment> assignExecutors, Set<Integer> keepPorts) {
Map<Integer, LocalAssignment> reassignExecutors = new HashMap<>();
reassignExecutors.putAll(assignExecutors);
Expand Down Expand Up @@ -422,6 +424,8 @@ public void killWorker(SupervisorData supervisorData, IWorkerManager workerManag
boolean success = workerManager.cleanupWorker(workerId);
if (success){
supervisorData.getDeadWorkers().remove(workerId);
supervisorData.getWorkerIdsToLaunchTimes().remove(workerId);
supervisorData.getWorkerIdsToPorts().remove(workerId);
}
}
}