diff --git a/dynamometer-infra/src/main/java/com/linkedin/dynamometer/Client.java b/dynamometer-infra/src/main/java/com/linkedin/dynamometer/Client.java index 5fc8117855..b2bd610e41 100644 --- a/dynamometer-infra/src/main/java/com/linkedin/dynamometer/Client.java +++ b/dynamometer-infra/src/main/java/com/linkedin/dynamometer/Client.java @@ -14,9 +14,12 @@ import java.io.File; import java.io.FileInputStream; import java.io.FileNotFoundException; -import java.io.FileOutputStream; import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; import java.net.URI; +import java.net.URISyntaxException; +import java.net.URL; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.HashMap; @@ -44,7 +47,9 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.JobStatus; import org.apache.hadoop.security.Credentials; @@ -583,48 +588,61 @@ private List getAMCommand() { * with the correct resource settings. * @throws IOException */ - private void setupRemoteResource(String srcPathString, ApplicationId appId, + private void setupRemoteResource(String srcPath, ApplicationId appId, DynoResource resource, Map env) throws IOException { - Path srcPath = new Path(srcPathString); - FileStatus remoteFileStatus; + Path dstPath; - FileSystem localFS = FileSystem.getLocal(getConf()); - // Qualify relative paths against the local FS - srcPath = srcPath.makeQualified(localFS.getUri(), localFS.getWorkingDirectory()); - - FileSystem srcFS = srcPath.getFileSystem(getConf()); - - if (srcFS.getUri().equals(localFS.getUri())) { - - if (resource.getType() == LocalResourceType.ARCHIVE && localFS.getFileStatus(srcPath).isDirectory()) { - File tempZip = File.createTempFile(srcPath.getName(), ".zip", new File(srcPath.getParent().toUri())); - ZipOutputStream zout = new ZipOutputStream(new FileOutputStream(tempZip)); - File srcFile = new File(srcPath.toUri()); - addFileToZipRecursively(srcFile, srcFile, zout); - zout.close(); - tempZip.deleteOnExit(); - srcPath = new Path(tempZip.toURI()); + URI srcURI; + try { + srcURI = new URI(srcPath); + } catch (URISyntaxException e) { + throw new IOException(e); + } + if (srcURI.getScheme() == null || srcURI.getScheme().equals(FileSystem.getLocal(getConf()).getScheme()) || + srcURI.getScheme().equals("jar")) { + // Need to upload this resource to remote storage + File srcFile = new File(srcURI.getSchemeSpecificPart()); + dstPath = new Path(getRemoteStoragePath(getConf(), appId), srcFile.getName()); + if (resource.getType() == LocalResourceType.ARCHIVE && srcFile.isDirectory()) { + if ("jar".equals(srcURI.getScheme())) { + throw new IllegalArgumentException(String.format( + "Resources in JARs can't be auto-zipped; resource %s is ARCHIVE and src is a directory: %s", + resource.getResourcePath(), srcPath)); + } + dstPath = dstPath.suffix(".zip"); } - Path dstPath = new Path(getRemoteStoragePath(getConf(), appId), srcPath.getName()); - LOG.info("Uploading resource " + resource + " from " + srcPath + " to " + dstPath); FileSystem remoteFS = dstPath.getFileSystem(getConf()); - remoteFS.copyFromLocalFile(false, true, srcPath, dstPath); + LOG.info("Uploading resource " + resource + " from " + srcPath + " to " + dstPath); + try (OutputStream outputStream = remoteFS.create(dstPath, true)) { + if ("jar".equals(srcURI.getScheme())) { + try (InputStream inputStream = new URL(srcPath).openStream()) { + IOUtils.copyBytes(inputStream, outputStream, getConf()); + } + } else if (resource.getType() == LocalResourceType.ARCHIVE && srcFile.isDirectory()) { + ZipOutputStream zout = new ZipOutputStream(outputStream); + addFileToZipRecursively(srcFile, srcFile, zout); + zout.close(); + } else { + try (InputStream inputStream = new FileInputStream(srcFile)){ + IOUtils.copyBytes(inputStream, outputStream, getConf()); + } + } + } remoteFileStatus = remoteFS.getFileStatus(dstPath); - srcFS = remoteFS; } else { LOG.info("Using resource " + resource + " directly from current location: " + srcPath); + dstPath = new Path(srcPath); // non-local file system; we can just use it directly from where it is - remoteFileStatus = srcFS.getFileStatus(srcPath); + remoteFileStatus = FileSystem.get(dstPath.toUri(), getConf()).getFileStatus(dstPath); if (remoteFileStatus.isDirectory()) { - throw new IllegalArgumentException("If resource is on remote filesystem, " + - "must be a file: " + srcPath); + throw new IllegalArgumentException("If resource is on remote filesystem, must be a file: " + srcPath); } } - env.put(resource.getLocationEnvVar(), srcFS.makeQualified(remoteFileStatus.getPath()).toString()); - env.put(resource.getTimestampEnvVar(), remoteFileStatus.getModificationTime() + ""); - env.put(resource.getLengthEnvVar(), remoteFileStatus.getLen() + ""); + env.put(resource.getLocationEnvVar(), dstPath.toString()); + env.put(resource.getTimestampEnvVar(), String.valueOf(remoteFileStatus.getModificationTime())); + env.put(resource.getLengthEnvVar(), String.valueOf(remoteFileStatus.getLen())); } /** @@ -643,10 +661,7 @@ private static Path getRemoteStoragePath(Configuration conf, ApplicationId appId DynoConstants.DYNAMOMETER_STORAGE_DIR + "/" + appId)); } - private static final byte[] COPY_BUFFER = new byte[1024]; - - private static void addFileToZipRecursively(File root, File file, ZipOutputStream out) - throws IOException { + private void addFileToZipRecursively(File root, File file, ZipOutputStream out) throws IOException { File[] files = file.listFiles(); if (files == null) { // Not a directory @@ -655,10 +670,7 @@ private static void addFileToZipRecursively(File root, File file, ZipOutputStrea try { FileInputStream in = new FileInputStream(file.getAbsolutePath()); out.putNextEntry(new ZipEntry(relativePath)); - int len; - while ((len = in.read(COPY_BUFFER)) > 0) { - out.write(COPY_BUFFER, 0, len); - } + IOUtils.copyBytes(in, out, getConf(), false); out.closeEntry(); in.close(); } catch (FileNotFoundException fnfe) { diff --git a/dynamometer-infra/src/test/java/com/linkedin/dynamometer/TestDynamometerInfra.java b/dynamometer-infra/src/test/java/com/linkedin/dynamometer/TestDynamometerInfra.java index b45a48a18b..df291798e1 100644 --- a/dynamometer-infra/src/test/java/com/linkedin/dynamometer/TestDynamometerInfra.java +++ b/dynamometer-infra/src/test/java/com/linkedin/dynamometer/TestDynamometerInfra.java @@ -9,12 +9,16 @@ import com.linkedin.dynamometer.workloadgenerator.audit.AuditLogDirectParser; import com.linkedin.dynamometer.workloadgenerator.audit.AuditReplayMapper; import java.io.File; +import java.io.FileOutputStream; import java.io.IOException; +import java.io.InputStream; import java.net.URI; import java.net.URISyntaxException; import java.util.List; import java.util.Properties; import java.util.UUID; +import java.util.zip.ZipEntry; +import java.util.zip.ZipOutputStream; import org.apache.commons.io.FileUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -27,6 +31,7 @@ import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.util.JarFinder; @@ -94,6 +99,8 @@ public class TestDynamometerInfra { private static Path fsImageTmpPath; private static Path fsVersionTmpPath; private static Path blockImageOutputDir; + private static Path auditTraceDir; + private static Path confZip; private static File testBaseDir; private static File hadoopTarballPath; private static File hadoopUnpackedDir; @@ -163,6 +170,8 @@ public static void setupClass() throws Exception { fsImageTmpPath = fs.makeQualified(new Path("/tmp/" + FSIMAGE_FILENAME)); fsVersionTmpPath = fs.makeQualified(new Path("/tmp/" + VERSION_FILENAME)); blockImageOutputDir = fs.makeQualified(new Path("/tmp/blocks")); + auditTraceDir = fs.makeQualified(new Path("/tmp/audit_trace_direct")); + confZip = fs.makeQualified(new Path("/tmp/conf.zip")); uploadFsimageResourcesToHDFS(hadoopBinVersion); @@ -213,7 +222,7 @@ public void run() { try { client.run(new String[] { "-" + Client.MASTER_MEMORY_MB_ARG, "128", - "-" + Client.CONF_PATH_ARG, getResourcePath("conf").toString(), + "-" + Client.CONF_PATH_ARG, confZip.toString(), "-" + Client.BLOCK_LIST_PATH_ARG, blockImageOutputDir.toString(), "-" + Client.FS_IMAGE_DIR_ARG, fsImageTmpPath.getParent().toString(), "-" + Client.HADOOP_BINARY_PATH_ARG, hadoopTarballPath.getAbsolutePath(), @@ -379,8 +388,26 @@ private static void uploadFsimageResourcesToHDFS(String hadoopBinVersion) throws fs.copyFromLocalFile(new Path(getResourcePath(fsImageResourcePath)), fsImageTmpPath); fs.copyFromLocalFile(new Path(getResourcePath(fsImageResourcePath + ".md5")), fsImageTmpPath.suffix(".md5")); fs.copyFromLocalFile(new Path(getResourcePath(hadoopResourcesPath + "/" + VERSION_FILENAME)), fsVersionTmpPath); - fs.copyFromLocalFile(new Path(getResourcePath("blocks")), new Path("/tmp/")); - fs.copyFromLocalFile(new Path(getResourcePath("audit_trace_direct")), new Path("/tmp/")); + fs.mkdirs(auditTraceDir); + IOUtils.copyBytes(TestDynamometerInfra.class.getClassLoader().getResourceAsStream("audit_trace_direct/audit0"), + fs.create(new Path(auditTraceDir, "audit0")), conf, true); + fs.mkdirs(blockImageOutputDir); + for (String blockFile : new String[] { "dn0-a-0-r-00000", "dn1-a-0-r-00001", "dn2-a-0-r-00002" }) { + IOUtils.copyBytes(TestDynamometerInfra.class.getClassLoader().getResourceAsStream("blocks/" + blockFile), + fs.create(new Path(blockImageOutputDir, blockFile)), conf, true); + } + File tempConfZip = new File(testBaseDir, "conf.zip"); + ZipOutputStream zos = new ZipOutputStream(new FileOutputStream(tempConfZip)); + for (String file : new String[] { "core-site.xml", "hdfs-site.xml", "log4j.properties" }) { + zos.putNextEntry(new ZipEntry("etc/hadoop/" + file)); + InputStream is = TestDynamometerInfra.class.getClassLoader().getResourceAsStream("conf/etc/hadoop/" + file); + IOUtils.copyBytes(is, zos, conf, false); + is.close(); + zos.closeEntry(); + } + zos.close(); + fs.copyFromLocalFile(new Path(tempConfZip.toURI()), confZip); + tempConfZip.delete(); } }