From 8c8f2126446b932c51cce66dd93ffcfb7e16fc8e Mon Sep 17 00:00:00 2001 From: Paul Poulosky Date: Wed, 27 Apr 2016 20:41:38 +0000 Subject: [PATCH] Remove two minute timeout after worker launch --- .../apache/storm/daemon/local_supervisor.clj | 8 +-- .../daemon/supervisor/SupervisorData.java | 12 +++++ .../daemon/supervisor/SyncProcessEvent.java | 54 ++++++++++--------- 3 files changed, 46 insertions(+), 28 deletions(-) diff --git a/storm-core/src/clj/org/apache/storm/daemon/local_supervisor.clj b/storm-core/src/clj/org/apache/storm/daemon/local_supervisor.clj index 560ae3e644..e9492c53f3 100644 --- a/storm-core/src/clj/org/apache/storm/daemon/local_supervisor.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/local_supervisor.clj @@ -15,7 +15,7 @@ ;; limitations under the License. (ns org.apache.storm.daemon.local-supervisor (:import [org.apache.storm.daemon.supervisor SyncProcessEvent SupervisorData Supervisor SupervisorUtils] - [org.apache.storm.utils Utils ConfigUtils] + [org.apache.storm.utils Time Utils ConfigUtils] [org.apache.storm ProcessSimulator]) (:use [org.apache.storm.daemon common] [org.apache.storm log]) @@ -34,7 +34,9 @@ workerId)] (ConfigUtils/setWorkerUserWSE conf workerId "") (ProcessSimulator/registerProcess pid worker) - (.put (.getWorkerThreadPids supervisorData) workerId pid))) + (.put (.getWorkerThreadPids supervisorData) workerId pid) + (.put (.getWorkerIdsToLaunchTimes supervisorData) workerId (Time/currentTimeSecs)) + (.put (.getWorkerIdsToPorts supervisorData) workerId port))) (defn shutdown-local-worker [supervisorData worker-manager workerId] (log-message "shutdown-local-worker") @@ -61,4 +63,4 @@ (let [local-process (local-process) supervisor-server (Supervisor.)] (.setLocalSyncProcess supervisor-server local-process) - (.mkSupervisor supervisor-server conf shared-context isupervisor))) \ No newline at end of file + (.mkSupervisor supervisor-server conf shared-context isupervisor))) diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorData.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorData.java index da4102caf5..5d0dabfc67 100644 --- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorData.java +++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorData.java @@ -58,6 +58,8 @@ public class SupervisorData { private final Utils.UptimeComputer upTime; private final String stormVersion; private final ConcurrentHashMap workerThreadPids; // for local mode + private final ConcurrentHashMap workerIdsToLaunchTimes; + private final ConcurrentHashMap workerIdsToPorts; private final IStormClusterState stormClusterState; private final LocalState localState; private final String supervisorId; @@ -84,6 +86,8 @@ public SupervisorData(Map conf, IContext sharedContext, ISupervisor iSupervisor) this.upTime = Utils.makeUptimeComputer(); this.stormVersion = VersionInfo.getVersion(); this.workerThreadPids = new ConcurrentHashMap(); + this.workerIdsToLaunchTimes = new ConcurrentHashMap(); + this.workerIdsToPorts = new ConcurrentHashMap(); this.deadWorkers = new ConcurrentHashSet(); List acls = null; @@ -168,6 +172,14 @@ public ConcurrentHashMap getWorkerThreadPids() { return workerThreadPids; } + public ConcurrentHashMap getWorkerIdsToLaunchTimes() { + return workerIdsToLaunchTimes; + } + + public ConcurrentHashMap getWorkerIdsToPorts() { + return workerIdsToPorts; + } + public IStormClusterState getStormClusterState() { return stormClusterState; } diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java index 38b79e18e9..8416da2e0c 100644 --- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java +++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java @@ -88,6 +88,13 @@ public void init(SupervisorData supervisorData){ this.localState = supervisorData.getLocalState(); } + private Boolean isWorkerStartTimeoutExpired(String workerId) { + Long launchTime = supervisorData.getWorkerIdsToLaunchTimes().get(workerId); + Integer timeOut = Utils.getInt(supervisorData.getConf().get(Config.SUPERVISOR_WORKER_START_TIMEOUT_SECS)); + + return (launchTime == null || ((Time.currentTimeSecs() - launchTime) > timeOut)); + } + @Override public void run() { LOG.debug("Syncing processes"); @@ -110,34 +117,47 @@ public void run() { keeperWorkerIds.add(entry.getKey()); keepPorts.add(stateHeartbeat.getHeartbeat().get_port()); } + if (stateHeartbeat.getState() == State.NOT_STARTED) { + keeperWorkerIds.add(entry.getKey()); + keepPorts.add(supervisorData.getWorkerIdsToPorts().get(entry.getKey())); + } } Map reassignExecutors = getReassignExecutors(assignedExecutors, keepPorts); Map newWorkerIds = new HashMap<>(); for (Integer port : reassignExecutors.keySet()) { newWorkerIds.put(port, Utils.uuid()); } + LOG.debug("Assigned executors: {}", assignedExecutors); LOG.debug("Allocated: {}", localWorkerStats); + LOG.debug("Keeper worker ids: {}", keeperWorkerIds); + LOG.debug("Keep ports: {}", keepPorts); + LOG.debug("LaunchTimes: {}", supervisorData.getWorkerIdsToLaunchTimes()); + LOG.debug("Ids Ports: {}", supervisorData.getWorkerIdsToPorts()); for (Map.Entry entry : localWorkerStats.entrySet()) { StateHeartbeat stateHeartbeat = entry.getValue(); - if (stateHeartbeat.getState() != State.VALID) { + if ((stateHeartbeat.getState() != State.VALID && stateHeartbeat.getState() != State.NOT_STARTED) || + (stateHeartbeat.getState() == State.NOT_STARTED && isWorkerStartTimeoutExpired(entry.getKey()))) { LOG.info("Shutting down and clearing state for id {}, Current supervisor time: {}, State: {}, Heartbeat: {}", entry.getKey(), now, stateHeartbeat.getState(), stateHeartbeat.getHeartbeat()); killWorker(supervisorData, supervisorData.getWorkerManager(), entry.getKey()); } } // start new workers - Map newWorkerPortToIds = startNewWorkers(newWorkerIds, reassignExecutors); + Map newWorkerIdsToPorts = startNewWorkers(newWorkerIds, reassignExecutors); - Map allWorkerPortToIds = new HashMap<>(); + Map allWorkerIdsToPorts = new HashMap<>(); Map approvedWorkers = localState.getApprovedWorkers(); for (String keeper : keeperWorkerIds) { - allWorkerPortToIds.put(keeper, approvedWorkers.get(keeper)); + allWorkerIdsToPorts.put(keeper, approvedWorkers.get(keeper)); + } + allWorkerIdsToPorts.putAll(newWorkerIdsToPorts); + localState.setApprovedWorkers(allWorkerIdsToPorts); + supervisorData.getWorkerIdsToPorts().putAll(newWorkerIdsToPorts); + for (String workerId : newWorkerIdsToPorts.keySet()) { + supervisorData.getWorkerIdsToLaunchTimes().put(workerId, new Long(Time.currentTimeSecs())); } - allWorkerPortToIds.putAll(newWorkerPortToIds); - localState.setApprovedWorkers(allWorkerPortToIds); - waitForWorkersLaunch(conf, newWorkerPortToIds.keySet()); } catch (Exception e) { LOG.error("Failed Sync Process", e); @@ -146,24 +166,6 @@ public void run() { } - protected void waitForWorkersLaunch(Map conf, Set workerIds) throws Exception { - int startTime = Time.currentTimeSecs(); - int timeOut = (int) conf.get(Config.NIMBUS_SUPERVISOR_TIMEOUT_SECS); - for (String workerId : workerIds) { - LocalState localState = ConfigUtils.workerState(conf, workerId); - while (true) { - LSWorkerHeartbeat hb = localState.getWorkerHeartBeat(); - if (hb != null || (Time.currentTimeSecs() - startTime) > timeOut) - break; - LOG.info("{} still hasn't started", workerId); - Time.sleep(500); - } - if (localState.getWorkerHeartBeat() == null) { - LOG.info("Worker {} failed to start", workerId); - } - } - } - protected Map getReassignExecutors(Map assignExecutors, Set keepPorts) { Map reassignExecutors = new HashMap<>(); reassignExecutors.putAll(assignExecutors); @@ -422,6 +424,8 @@ public void killWorker(SupervisorData supervisorData, IWorkerManager workerManag boolean success = workerManager.cleanupWorker(workerId); if (success){ supervisorData.getDeadWorkers().remove(workerId); + supervisorData.getWorkerIdsToLaunchTimes().remove(workerId); + supervisorData.getWorkerIdsToPorts().remove(workerId); } } }