diff --git a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj index e70ba9bf16..a256f63550 100644 --- a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj @@ -27,7 +27,7 @@ [org.apache.commons.io FileUtils]) (:use [org.apache.storm config util log converter local-state-converter]) (:import [org.apache.storm.generated AuthorizationException KeyNotFoundException WorkerResources]) - (:import [org.apache.storm.utils NimbusLeaderNotFoundException VersionInfo]) + (:import [org.apache.storm.utils NimbusLeaderNotFoundException VersionInfo Zipper]) (:import [java.nio.file Files StandardCopyOption]) (:import [org.apache.storm.generated WorkerResources ProfileAction LocalAssignment]) (:import [org.apache.storm Config ProcessSimulator]) @@ -393,12 +393,12 @@ (defn required-topo-files-exist? [conf storm-id] (let [stormroot (ConfigUtils/supervisorStormDistRoot conf storm-id) - stormjarpath (ConfigUtils/supervisorStormJarPath stormroot) + storm-jar-zip-path (ConfigUtils/supervisorStormJarZipPath stormroot) stormcodepath (ConfigUtils/supervisorStormCodePath stormroot) stormconfpath (ConfigUtils/supervisorStormConfPath stormroot)] (and (every? #(Utils/checkFileExists %) [stormroot stormconfpath stormcodepath]) (or (ConfigUtils/isLocalMode conf) - (Utils/checkFileExists stormjarpath))))) + (Utils/checkFileExists storm-jar-zip-path))))) (defn get-worker-assignment-helper-msg [assignment supervisor port id] @@ -1053,13 +1053,14 @@ (if (conf SUPERVISOR-RUN-WORKER-AS-USER) (throw (RuntimeException. (str "ERROR: Windows doesn't implement setting the correct permissions"))))) (Utils/downloadResourcesAsSupervisor (ConfigUtils/masterStormJarKey storm-id) - (ConfigUtils/supervisorStormJarPath tmproot) blobstore) + (ConfigUtils/supervisorStormJarZipPath tmproot) blobstore) (Utils/downloadResourcesAsSupervisor (ConfigUtils/masterStormCodeKey storm-id) (ConfigUtils/supervisorStormCodePath tmproot) blobstore) (Utils/downloadResourcesAsSupervisor (ConfigUtils/masterStormConfKey storm-id) (ConfigUtils/supervisorStormConfPath tmproot) blobstore) (.shutdown blobstore) - (Utils/extractDirFromJar (ConfigUtils/supervisorStormJarPath tmproot) ConfigUtils/RESOURCES_SUBDIR tmproot) + (Zipper/unzip (ConfigUtils/supervisorStormJarZipPath tmproot) tmproot) + ; (Utils/extractDirFromJar (ConfigUtils/supervisorStormJarPath tmproot) ConfigUtils/RESOURCES_SUBDIR tmproot) (download-blobs-for-topology! conf (ConfigUtils/supervisorStormConfPath tmproot) localizer tmproot) (if (download-blobs-for-topology-succeed? (ConfigUtils/supervisorStormConfPath tmproot) tmproot) @@ -1176,13 +1177,13 @@ (str storm-home Utils/FILE_PATH_SEPARATOR "log4j2")) stormroot (ConfigUtils/supervisorStormDistRoot conf storm-id) jlp (jlp stormroot conf) - stormjar (ConfigUtils/supervisorStormJarPath stormroot) + stormjarlist (Utils/getFullJars (ConfigUtils/concatIfNotNull stormroot)) storm-conf (clojurify-structure (ConfigUtils/readSupervisorStormConf conf storm-id)) topo-classpath (if-let [cp (storm-conf TOPOLOGY-CLASSPATH)] [cp] []) classpath (-> (Utils/workerClasspath) - (Utils/addToClasspath [stormjar]) + (Utils/addToClasspath stormjarlist) (Utils/addToClasspath topo-classpath)) top-gc-opts (storm-conf TOPOLOGY-WORKER-GC-CHILDOPTS) diff --git a/storm-core/src/jvm/org/apache/storm/StormSubmitter.java b/storm-core/src/jvm/org/apache/storm/StormSubmitter.java index c199f97944..2da5fad134 100644 --- a/storm-core/src/jvm/org/apache/storm/StormSubmitter.java +++ b/storm-core/src/jvm/org/apache/storm/StormSubmitter.java @@ -17,27 +17,40 @@ */ package org.apache.storm; -import java.io.File; -import java.nio.ByteBuffer; -import java.util.regex.Matcher; -import java.util.regex.Pattern; -import java.util.HashMap; -import java.util.Map; - +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.storm.generated.AlreadyAliveException; +import org.apache.storm.generated.AuthorizationException; +import org.apache.storm.generated.ClusterSummary; +import org.apache.storm.generated.Credentials; +import org.apache.storm.generated.InvalidTopologyException; +import org.apache.storm.generated.NotAliveException; +import org.apache.storm.generated.StormTopology; +import org.apache.storm.generated.SubmitOptions; +import org.apache.storm.generated.TopologyInfo; +import org.apache.storm.generated.TopologyInitialStatus; +import org.apache.storm.generated.TopologySummary; import org.apache.storm.scheduler.resource.ResourceUtils; +import org.apache.storm.security.auth.AuthUtils; +import org.apache.storm.security.auth.IAutoCredentials; +import org.apache.storm.utils.BufferFileInputStream; +import org.apache.storm.utils.NimbusClient; +import org.apache.storm.utils.Utils; +import org.apache.storm.utils.Zipper; import org.apache.storm.validation.ConfigValidation; -import org.apache.commons.lang.StringUtils; import org.apache.thrift.TException; import org.json.simple.JSONValue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.storm.security.auth.IAutoCredentials; -import org.apache.storm.security.auth.AuthUtils; -import org.apache.storm.generated.*; -import org.apache.storm.utils.BufferFileInputStream; -import org.apache.storm.utils.NimbusClient; -import org.apache.storm.utils.Utils; +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; /** * Use this class to submit topologies to run on the Storm cluster. You should run your program @@ -382,15 +395,33 @@ public static String submitJarAs(Map conf, String localJar, ProgressListener lis throw new RuntimeException("Must submit topologies using the 'storm' client script so that StormSubmitter knows which jar to upload."); } + String[] jars = localJar.split(":"); + File tmpDir = FileUtils.getTempDirectory(); + String zipFilePath = new File(tmpDir, "stormjars.zip").getAbsolutePath(); + LOG.info("Zipping " + localJar + " to " + zipFilePath); + try { + Zipper.zip(Arrays.asList(jars), zipFilePath); + } catch (IOException e) { + throw new RuntimeException("Failed to zip jars " + localJar + " into zip " + zipFilePath, e); + } + + return submitFileAs(conf, zipFilePath, listener, asUser); + } + + private static String submitFileAs(Map conf, String localFile, ProgressListener listener, String asUser) { + if (localFile == null) { + throw new RuntimeException("Must submit topologies using the 'storm' client script so that StormSubmitter knows which jar to upload."); + } + NimbusClient client = NimbusClient.getConfiguredClientAs(conf, asUser); try { String uploadLocation = client.getClient().beginFileUpload(); - LOG.info("Uploading topology jar " + localJar + " to assigned location: " + uploadLocation); - BufferFileInputStream is = new BufferFileInputStream(localJar, THRIFT_CHUNK_SIZE_BYTES); + LOG.info("Uploading local file " + localFile + " to assigned location: " + uploadLocation); + BufferFileInputStream is = new BufferFileInputStream(localFile, THRIFT_CHUNK_SIZE_BYTES); - long totalSize = new File(localJar).length(); + long totalSize = new File(localFile).length(); if (listener != null) { - listener.onStart(localJar, uploadLocation, totalSize); + listener.onStart(localFile, uploadLocation, totalSize); } long bytesUploaded = 0; @@ -398,7 +429,7 @@ public static String submitJarAs(Map conf, String localJar, ProgressListener lis byte[] toSubmit = is.read(); bytesUploaded += toSubmit.length; if (listener != null) { - listener.onProgress(localJar, uploadLocation, bytesUploaded, totalSize); + listener.onProgress(localFile, uploadLocation, bytesUploaded, totalSize); } if(toSubmit.length==0) break; @@ -407,10 +438,10 @@ public static String submitJarAs(Map conf, String localJar, ProgressListener lis client.getClient().finishFileUpload(uploadLocation); if (listener != null) { - listener.onCompleted(localJar, uploadLocation, totalSize); + listener.onCompleted(localFile, uploadLocation, totalSize); } - LOG.info("Successfully uploaded topology jar to assigned location: " + uploadLocation); + LOG.info("Successfully uploaded " + localFile + " to assigned location: " + uploadLocation); return uploadLocation; } catch(Exception e) { throw new RuntimeException(e); diff --git a/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java b/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java index 0f53343e54..241ab784a1 100644 --- a/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java +++ b/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java @@ -216,7 +216,7 @@ public static String masterLocalDir(Map conf) throws IOException { } public static String masterStormJarKey(String topologyId) { - return (topologyId + "-stormjar.jar"); + return (topologyId + "-stormjar.zip"); } public static String masterStormCodeKey(String topologyId) { @@ -253,7 +253,7 @@ public static Map readSupervisorStormConfGivenPath(Map conf, String stormConfPat } public static String masterStormJarPath(String stormRoot) { - return (stormRoot + FILE_SEPARATOR + "stormjar.jar"); + return (stormRoot + FILE_SEPARATOR + "stormjar.zip"); } public static String masterInbox(Map conf) throws IOException { @@ -308,8 +308,8 @@ public static String concatIfNotNull(String dir) { return ret; } - public static String supervisorStormJarPath(String stormRoot) { - return (concatIfNotNull(stormRoot) + FILE_SEPARATOR + "stormjar.jar"); + public static String supervisorStormJarZipPath(String stormRoot) { + return (concatIfNotNull(stormRoot) + FILE_SEPARATOR + "stormjar.zip"); } public static String supervisorStormCodePath(String stormRoot) { @@ -396,7 +396,7 @@ public static String getWorkerUser(Map conf, String workerId) { public static String getIdFromBlobKey(String key) { if (key == null) return null; - final String STORM_JAR_SUFFIX = "-stormjar.jar"; + final String STORM_JAR_SUFFIX = "-stormjar.zip"; final String STORM_CODE_SUFFIX = "-stormcode.ser"; final String STORM_CONF_SUFFIX = "-stormconf.ser"; diff --git a/storm-core/src/jvm/org/apache/storm/utils/Utils.java b/storm-core/src/jvm/org/apache/storm/utils/Utils.java index 93f4aa694c..b99d115158 100644 --- a/storm-core/src/jvm/org/apache/storm/utils/Utils.java +++ b/storm-core/src/jvm/org/apache/storm/utils/Utils.java @@ -2032,7 +2032,7 @@ public String currentClasspathImpl() { * @param dir the directory to search * @return the jar file names */ - private static List getFullJars(String dir) { + public static List getFullJars(String dir) { File[] files = new File(dir).listFiles(jarFilter); if(files == null) { diff --git a/storm-core/src/jvm/org/apache/storm/utils/Zipper.java b/storm-core/src/jvm/org/apache/storm/utils/Zipper.java new file mode 100644 index 0000000000..9762b18a52 --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/utils/Zipper.java @@ -0,0 +1,82 @@ +package org.apache.storm.utils; + +import org.apache.commons.io.FileUtils; + +import java.io.BufferedInputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.file.Paths; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.zip.ZipEntry; +import java.util.zip.ZipInputStream; +import java.util.zip.ZipOutputStream; + +public class Zipper { + + public static void zip(List paths, String zipFilePath) throws IOException { + File zipFile = new File(zipFilePath); + if (!zipFile.exists()) { + zipFile.delete(); + } + zipFile.createNewFile(); + + ZipOutputStream zipOutputStream = new ZipOutputStream(new FileOutputStream(zipFile)); + Map zipEntryFiles = new HashMap<>(); + for (String path : paths) { + File file = new File(path); + String parentPath = file.getParent(); + if (file.isDirectory()) { + Collection nestedFiles = FileUtils.listFiles(file, null, true); + for (File nestedFile : nestedFiles) { + String relativePath = Paths.get(parentPath).relativize(Paths.get(nestedFile.getPath())).toString(); + zipEntryFiles.put(relativePath, nestedFile); + } + } else { + zipEntryFiles.put(file.getName(), file); + } + } + for (Map.Entry zipEntryFile : zipEntryFiles.entrySet()) { + File fileToBeZipped = zipEntryFile.getValue(); + zipOutputStream.putNextEntry(new ZipEntry(zipEntryFile.getKey())); + copyContents(zipOutputStream, new FileInputStream(fileToBeZipped)); + zipOutputStream.closeEntry(); + } + zipOutputStream.close(); + } + + public static void unzip(String zipFile, String localDir) throws IOException { + File file = new File(localDir); + if (!file.exists()) { + file.mkdir(); + } + + ZipInputStream zipInputStream = new ZipInputStream(new FileInputStream(zipFile)); + ZipEntry entry = zipInputStream.getNextEntry(); + while (null != entry) { + File outputFile = new File(localDir, entry.getName()); + outputFile.getParentFile().mkdirs(); + FileOutputStream fileOutputStream = new FileOutputStream(outputFile); + copyContents(fileOutputStream, zipInputStream); + zipInputStream.closeEntry(); + entry = zipInputStream.getNextEntry(); + } + zipInputStream.close(); + } + + private static void copyContents(OutputStream out, InputStream inputStream) throws IOException { + BufferedInputStream in = new BufferedInputStream(inputStream); + byte[] buffer = new byte[1024]; + int len; + while ((len = in.read(buffer)) >= 0) { + out.write(buffer, 0, len); + } + } + +}