Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions storm-core/src/clj/backtype/storm/converter.clj
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,8 @@
(.set_owner (:owner storm-base))
(.set_topology_action_options (thriftify-topology-action-options storm-base))
(.set_prev_status (convert-to-status-from-symbol (:prev-status storm-base)))
(.set_component_debug (map-val thriftify-debugoptions (:component->debug storm-base)))))
(.set_component_debug (map-val thriftify-debugoptions (:component->debug storm-base)))
(.set_version (:version storm-base))))

(defn clojurify-storm-base [^StormBase storm-base]
(if storm-base
Expand All @@ -201,7 +202,8 @@
(.get_owner storm-base)
(clojurify-topology-action-options (.get_topology_action_options storm-base))
(convert-to-symbol-from-status (.get_prev_status storm-base))
(map-val clojurify-debugoptions (.get_component_debug storm-base)))))
(map-val clojurify-debugoptions (.get_component_debug storm-base))
(.get_version storm-base))))

(defn thriftify-stats [stats]
(if stats
Expand Down
2 changes: 1 addition & 1 deletion storm-core/src/clj/backtype/storm/daemon/common.clj
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@


;; component->executors is a map from spout/bolt id to number of executors for that component
(defrecord StormBase [storm-name launch-time-secs status num-workers component->executors owner topology-action-options prev-status component->debug])
(defrecord StormBase [storm-name launch-time-secs status num-workers component->executors owner topology-action-options prev-status component->debug version])

(defrecord SupervisorInfo [time-secs hostname assignment-id used-ports meta scheduler-meta uptime-secs version resources-map])

Expand Down
108 changes: 87 additions & 21 deletions storm-core/src/clj/backtype/storm/daemon/nimbus.clj
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@
;; See the License for the specific language governing permissions and
;; limitations under the License.
(ns backtype.storm.daemon.nimbus
(:import [org.apache.thrift.server THsHaServer THsHaServer$Args])
(:import [org.apache.thrift.server THsHaServer THsHaServer$Args]
[backtype.storm.generated UpdateOptions]
[java.util UUID])
(:import [org.apache.thrift.protocol TBinaryProtocol TBinaryProtocol$Factory])
(:import [org.apache.thrift.exception])
(:import [org.apache.thrift.transport TNonblockingServerTransport TNonblockingServerSocket])
Expand Down Expand Up @@ -58,6 +60,7 @@

(defmeter nimbus:num-submitTopologyWithOpts-calls)
(defmeter nimbus:num-submitTopology-calls)
(defmeter nimbus:num-updateTopology-calls)
(defmeter nimbus:num-killTopologyWithOpts-calls)
(defmeter nimbus:num-killTopology-calls)
(defmeter nimbus:num-rebalance-calls)
Expand Down Expand Up @@ -232,11 +235,25 @@
:activate nil
:rebalance (rebalance-transition nimbus storm-id status)
:kill (kill-transition nimbus storm-id)
; :updating :update
}
; :updating {:startup (fn [] (delay-event nimbus
; storm-id
; (-> storm-base
; :topology-action-options
; :delay-secs)
; :do-rebalance)
; nil)
; :kill (kill-transition nimbus storm-id)
; :do-update (fn []
; (do-update nimbus storm-id status storm-base)
; (:type (:prev-status storm-base)))
; }
:inactive {:activate :active
:inactivate nil
:rebalance (rebalance-transition nimbus storm-id status)
:kill (kill-transition nimbus storm-id)
; :update (update-transition nimbus storm-id)
}
:killed {:startup (fn [] (delay-event nimbus
storm-id
Expand Down Expand Up @@ -392,15 +409,25 @@
)))

(defn- setup-storm-code [nimbus conf storm-id tmp-jar-location storm-conf topology]
(let [ tmproot (str (master-tmp-dir conf) file-path-separator storm-id)
stormroot (master-stormdist-root conf storm-id)]
(log-message "nimbus file location:" stormroot)
(FileUtils/forceMkdir (File. tmproot))
(FileUtils/cleanDirectory (File. tmproot))

(setup-jar conf tmp-jar-location tmproot)
(FileUtils/writeByteArrayToFile (File. (master-stormcode-path tmproot)) (Utils/serialize topology))
(FileUtils/writeByteArrayToFile (File. (master-stormconf-path tmproot)) (Utils/toCompressedJsonConf storm-conf))

(FileUtils/deleteDirectory (File. stormroot))
(FileUtils/moveDirectory (File. tmproot) (File. stormroot))

(if (:code-distributor nimbus) (.upload (:code-distributor nimbus) stormroot storm-id))))


(defn- get-topology [nimbus conf storm-id]
(let [stormroot (master-stormdist-root conf storm-id)]
(log-message "nimbus file location:" stormroot)
(FileUtils/forceMkdir (File. stormroot))
(FileUtils/cleanDirectory (File. stormroot))
(setup-jar conf tmp-jar-location stormroot)
(FileUtils/writeByteArrayToFile (File. (master-stormcode-path stormroot)) (Utils/serialize topology))
(FileUtils/writeByteArrayToFile (File. (master-stormconf-path stormroot)) (Utils/toCompressedJsonConf storm-conf))
(if (:code-distributor nimbus) (.upload (:code-distributor nimbus) stormroot storm-id))
))
(Utils/deserialize (FileUtils/readFileToByteArray (File. (master-stormcode-path stormroot))) StormTopology)))

(defn- wait-for-desired-code-replication [nimbus conf storm-id]
(let [min-replication-count (conf TOPOLOGY-MIN-REPLICATION-COUNT)
Expand Down Expand Up @@ -878,7 +905,7 @@
storm-conf (read-storm-conf conf storm-id)
topology (system-topology! storm-conf (read-storm-topology conf storm-id))
num-executors (->> (all-components topology) (map-val num-start-executors))]
(log-message "Activating " storm-name ": " storm-id)
(log-message "Activating " storm-name ": " storm-id " with initial status of " topology-initial-status)
(.activate-storm! storm-cluster-state
storm-id
(StormBase. storm-name
Expand All @@ -889,8 +916,9 @@
(storm-conf TOPOLOGY-SUBMITTER-USER)
nil
nil
{}))
(notify-topology-action-listener nimbus storm-name "activate")))
{}
0))
(notify-topology-action-listener nimbus storm-name (if (= :active topology-initial-status) "activate" "deactivate"))))

;; Master:
;; job submit:
Expand Down Expand Up @@ -1210,6 +1238,17 @@
(.set_reset_log_level_timeout_epoch log-config (coerce/to-long timeout))
(.unset_reset_log_level_timeout_epoch log-config))))

(defn- validate-conf-and-topology [nimbus storm-name serializedConf topology]
(let [topo-conf (from-json serializedConf)]
(try
(validate-configs-with-schemas topo-conf)
(catch IllegalArgumentException ex
(throw (InvalidTopologyException. (.getMessage ex)))))
(.validate ^backtype.storm.nimbus.ITopologyValidator (:validator nimbus)
storm-name
topo-conf
topology)))

(defserverfn service-handler [conf inimbus]
(.prepare inimbus conf (master-inimbus-dir conf))
(log-message "Starting Nimbus with conf " conf)
Expand Down Expand Up @@ -1325,15 +1364,7 @@
(validate-topology-name! storm-name)
(check-authorization! nimbus storm-name nil "submitTopology")
(check-storm-active! nimbus storm-name false)
(let [topo-conf (from-json serializedConf)]
(try
(validate-configs-with-schemas topo-conf)
(catch IllegalArgumentException ex
(throw (InvalidTopologyException. (.getMessage ex)))))
(.validate ^backtype.storm.nimbus.ITopologyValidator (:validator nimbus)
storm-name
topo-conf
topology))
(validate-conf-and-topology nimbus storm-name serializedConf topology)
(swap! (:submitted-count nimbus) inc)
(let [storm-id (str storm-name "-" @(:submitted-count nimbus) "-" (current-time-secs))
credentials (.get_creds submitOptions)
Expand Down Expand Up @@ -1398,6 +1429,41 @@
(mark! nimbus:num-submitTopology-calls)
(.submitTopologyWithOpts this storm-name uploadedJarLocation serializedConf topology
(SubmitOptions. TopologyInitialStatus/ACTIVE)))

(^void updateTopology
[this ^String storm-name ^UpdateOptions updateOptions]
(mark! nimbus:num-updateTopology-calls)
(check-authorization! nimbus storm-name nil "updateTopology")
(is-leader nimbus)
(locking (:submit-lock nimbus)
(check-storm-active! nimbus storm-name true)
(let [storm-cluster-state (:storm-cluster-state nimbus)
storm-id (get-storm-id storm-cluster-state storm-name)
storm-root-dir (master-stormdist-root conf storm-id)
storm-base (.storm-base storm-cluster-state storm-id nil)
topo-conf (merge (try-read-storm-conf-from-name conf storm-name nimbus)
(if (.is_set_jsonConf updateOptions) (from-json (.get_jsonConf updateOptions)) {}))
total-storm-conf (merge conf topo-conf)
topology (if (.is_set_topology updateOptions) (.get_topology updateOptions) (get-topology nimbus conf storm-id))
topology (normalize-topology topo-conf topology)
jar-location (if (.is_set_uploadedJarLocation updateOptions) (.get_uploadedJarLocation updateOptions) (master-stormjar-path storm-root-dir))
version (inc (:version storm-base))]
(validate-conf-and-topology nimbus storm-name (to-json topo-conf) topology)
(system-topology! topo-conf topology)
(validate-topology-size topo-conf conf topology)
(setup-storm-code nimbus conf storm-id jar-location topo-conf topology)
(.setup-code-distributor! storm-cluster-state storm-id (:nimbus-host-port-info nimbus))
(wait-for-desired-code-replication nimbus total-storm-conf storm-id)
(notify-topology-action-listener nimbus storm-name "updateTopology")

;;update the storm base with the new version.
(.update-storm! storm-cluster-state storm-id {:version version})

;;TODO: Re-assignment will mean all the supervisor may end up killing their workers and restarting,
;;What we really want is each supervisor should kill one worker at a time and restart it.

(when (.is_set_topology updateOptions) (mk-assignments nimbus :scratch-topology-id storm-id))
)))

(^void killTopology [this ^String name]
(mark! nimbus:num-killTopology-calls)
Expand Down
60 changes: 51 additions & 9 deletions storm-core/src/clj/backtype/storm/daemon/supervisor.clj
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
;; See the License for the specific language governing permissions and
;; limitations under the License.
(ns backtype.storm.daemon.supervisor
(:import [java.io OutputStreamWriter BufferedWriter IOException])
(:import [java.io OutputStreamWriter BufferedWriter IOException]
[backtype.storm.generated LSSupervisorAssignments])
(:import [backtype.storm.scheduler ISupervisor]
[backtype.storm.utils LocalState Time Utils]
[backtype.storm.daemon Shutdownable]
Expand All @@ -26,7 +27,7 @@
(:use [backtype.storm config util log timer local-state])
(:import [backtype.storm.utils VersionInfo])
(:import [backtype.storm Config])
(:import [backtype.storm.generated WorkerResources ProfileAction])
(:import [backtype.storm.generated WorkerResources ProfileAction LSWorkerHeartbeat])
(:use [backtype.storm.daemon common])
(:require [backtype.storm.command [healthcheck :as healthcheck]])
(:require [backtype.storm.daemon [worker :as worker]]
Expand Down Expand Up @@ -157,13 +158,19 @@
(> (- now (:time-secs hb))
(conf SUPERVISOR-WORKER-TIMEOUT-SECS)))


(defn get-latest-topology-version-from-zk [cluster-state storm-id]
(:version (.storm-base cluster-state storm-id nil)))

(defn read-allocated-workers
"Returns map from worker id to worker heartbeat. if the heartbeat is nil, then the worker is dead (timed out or never wrote heartbeat)"
[supervisor assigned-executors now]
(let [conf (:conf supervisor)
^LocalState local-state (:local-state supervisor)
id->heartbeat (read-worker-heartbeats conf)
approved-ids (set (keys (ls-approved-workers local-state)))]
approved-ids (set (keys (ls-approved-workers local-state)))
worker-set-to-be-updated (atom false)
]
(into
{}
(dofor [[id hb] id->heartbeat]
Expand All @@ -173,6 +180,14 @@
(or (not (contains? approved-ids id))
(not (matches-an-assignment? hb assigned-executors)))
:disallowed
(let [worker-topo-version (:topology-version hb)
latest-topo-version (get-latest-topology-version-from-zk (:storm-cluster-state supervisor) (:storm-id hb))
needs-update (not (= worker-topo-version latest-topo-version))]
(when needs-update
(log-message "Worker Process " id " with outdated topology-version " (:topology-version hb)
" needs to be updated to latest-version " latest-topo-version)
needs-update))
:update
(or
(when (get (get-dead-workers) id)
(log-message "Worker Process " id " has died!")
Expand Down Expand Up @@ -333,6 +348,14 @@
:stormid->profiler-actions (atom {})
})

;(defn get-latest-topology-versions-from-zk [cluster-state supervisor-assignments]
; "Returns a map of storm-id/topology-id -> latest version of topology from zookeeper"
; (let [topology-ids (set (map-val (fn[^LocalAssignment assignment](.get_topology_id assignment)) supervisor-assignments))]
; (into {}
; (for [topology-id topology-ids])
; [topology-id (:version (.storm-base cluster-state topology-id nil))])))


(defn sync-processes [supervisor]
(let [conf (:conf supervisor)
download-lock (:download-lock supervisor)
Expand All @@ -345,14 +368,17 @@
(fn [[state _]] (= state :valid))
allocated)
keep-ports (set (for [[id [_ hb]] keepers] (:port hb)))
storm-ids-needing-update (into #{}
(map-val (fn [[state hb]] (:storm-id hb))
(filter-val (fn [[state hb]] (= state :update)) allocated)))
reassign-executors (select-keys-pred (complement keep-ports) assigned-executors)
new-worker-ids (into
{}
(for [port (keys reassign-executors)]
[port (uuid)]))
]
;; 1. to kill are those in allocated that are dead or disallowed
;; 2. kill the ones that should be dead
;; 2. kill the ones that should be dead or needs topology-version-updates
;; - read pids, kill -9 and individually remove file
;; - rmr heartbeat dir, rmdir pid dir, rmdir id dir (catch exception and log)
;; 3. of the rest, figure out what assignments aren't yet satisfied
Expand All @@ -362,8 +388,11 @@
;; 6. wait for workers launch

(log-debug "Syncing processes")
(log-debug "Reassign-executors: " reassign-executors )
(log-debug "Assigned executors: " assigned-executors)
(log-debug "Storm ids needing update: " storm-ids-needing-update)
(log-debug "Allocated: " allocated)

(doseq [[id [state heartbeat]] allocated]
(when (not= :valid state)
(log-message
Expand Down Expand Up @@ -394,13 +423,15 @@
assignment-info (if (and (not-nil? cached-assignment-info) (contains? cached-assignment-info storm-id ))
(get cached-assignment-info storm-id)
(.assignment-info-with-version storm-cluster-state storm-id nil))
storm-code-map (read-storm-code-locations assignment-info)
storm-code-map (read-storm-code-locations assignment-info)
master-code-dir (if (contains? storm-code-map :data) (storm-code-map :data))
stormroot (supervisor-stormdist-root conf storm-id)]
(if-not (or (contains? downloaded-storm-ids storm-id) (.exists (File. stormroot)) (nil? master-code-dir))
(if-not (and (or (contains? downloaded-storm-ids storm-id) (.exists (File. stormroot)) (nil? master-code-dir))
(not (contains? storm-ids-needing-update storm-id)))
(download-storm-code conf storm-id master-code-dir supervisor download-lock))
))


(wait-for-workers-launch
conf
(dofor [[port assignment] reassign-executors]
Expand All @@ -422,7 +453,8 @@
(:storm-id assignment)
port
id
mem-onheap)
mem-onheap
(get-latest-topology-version-from-zk storm-cluster-state (:storm-id assignment)));;TODO
(mark! supervisor:num-workers-launched)
(catch java.io.FileNotFoundException e
(log-message "Unable to launch worker due to "
Expand Down Expand Up @@ -454,6 +486,15 @@
(shutdown-worker supervisor id))
))


;(defn update-topology[conf cluster-state]
; (doseq [storm-id (.active-storms cluster-state )]
; (let [storm-base (.storm-base cluster-state storm-id (fn[] (update-topology conf cluster-state)))
; topology-version-in-zk (.get_topology_version storm-base)
; worker-hb (read-worker-heartbeat conf storm-id)
; ])
; ))

(defn mk-synchronize-supervisor [supervisor sync-processes event-manager processes-event-manager]
(fn this []
(let [conf (:conf supervisor)
Expand Down Expand Up @@ -822,7 +863,7 @@
(create-symlink! worker-dir topo-dir "artifacts" port))))

(defmethod launch-worker
:distributed [supervisor storm-id port worker-id mem-onheap]
:distributed [supervisor storm-id port worker-id mem-onheap topology-version]
(let [conf (:conf supervisor)
run-worker-as-user (conf SUPERVISOR-RUN-WORKER-AS-USER)
storm-home (System/getProperty "storm.home")
Expand Down Expand Up @@ -901,7 +942,8 @@
storm-id
(:assignment-id supervisor)
port
worker-id])
worker-id
topology-version])
command (->> command (map str) (filter (complement empty?)))]
(log-message "Launching worker with command: " (shell-cmd command))
(write-log-metadata! storm-conf user worker-id storm-id port conf)
Expand Down
Loading