Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,12 @@
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
Expand Down Expand Up @@ -44,7 +47,9 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.security.Credentials;
Expand Down Expand Up @@ -583,48 +588,61 @@ private List<String> getAMCommand() {
* with the correct resource settings.
* @throws IOException
*/
private void setupRemoteResource(String srcPathString, ApplicationId appId,
private void setupRemoteResource(String srcPath, ApplicationId appId,
DynoResource resource, Map<String, String> env) throws IOException {

Path srcPath = new Path(srcPathString);

FileStatus remoteFileStatus;
Path dstPath;

FileSystem localFS = FileSystem.getLocal(getConf());
// Qualify relative paths against the local FS
srcPath = srcPath.makeQualified(localFS.getUri(), localFS.getWorkingDirectory());

FileSystem srcFS = srcPath.getFileSystem(getConf());

if (srcFS.getUri().equals(localFS.getUri())) {

if (resource.getType() == LocalResourceType.ARCHIVE && localFS.getFileStatus(srcPath).isDirectory()) {
File tempZip = File.createTempFile(srcPath.getName(), ".zip", new File(srcPath.getParent().toUri()));
ZipOutputStream zout = new ZipOutputStream(new FileOutputStream(tempZip));
File srcFile = new File(srcPath.toUri());
addFileToZipRecursively(srcFile, srcFile, zout);
zout.close();
tempZip.deleteOnExit();
srcPath = new Path(tempZip.toURI());
URI srcURI;
try {
srcURI = new URI(srcPath);
} catch (URISyntaxException e) {
throw new IOException(e);
}
if (srcURI.getScheme() == null || srcURI.getScheme().equals(FileSystem.getLocal(getConf()).getScheme()) ||
srcURI.getScheme().equals("jar")) {
// Need to upload this resource to remote storage
File srcFile = new File(srcURI.getSchemeSpecificPart());
dstPath = new Path(getRemoteStoragePath(getConf(), appId), srcFile.getName());
if (resource.getType() == LocalResourceType.ARCHIVE && srcFile.isDirectory()) {
if ("jar".equals(srcURI.getScheme())) {
throw new IllegalArgumentException(String.format(
"Resources in JARs can't be auto-zipped; resource %s is ARCHIVE and src is a directory: %s",
resource.getResourcePath(), srcPath));
}
dstPath = dstPath.suffix(".zip");
}
Path dstPath = new Path(getRemoteStoragePath(getConf(), appId), srcPath.getName());
LOG.info("Uploading resource " + resource + " from " + srcPath + " to " + dstPath);
FileSystem remoteFS = dstPath.getFileSystem(getConf());
remoteFS.copyFromLocalFile(false, true, srcPath, dstPath);
LOG.info("Uploading resource " + resource + " from " + srcPath + " to " + dstPath);
try (OutputStream outputStream = remoteFS.create(dstPath, true)) {
if ("jar".equals(srcURI.getScheme())) {
try (InputStream inputStream = new URL(srcPath).openStream()) {
IOUtils.copyBytes(inputStream, outputStream, getConf());
}
} else if (resource.getType() == LocalResourceType.ARCHIVE && srcFile.isDirectory()) {
ZipOutputStream zout = new ZipOutputStream(outputStream);
addFileToZipRecursively(srcFile, srcFile, zout);
zout.close();
} else {
try (InputStream inputStream = new FileInputStream(srcFile)){
IOUtils.copyBytes(inputStream, outputStream, getConf());
}
}
}
remoteFileStatus = remoteFS.getFileStatus(dstPath);
srcFS = remoteFS;
} else {
LOG.info("Using resource " + resource + " directly from current location: " + srcPath);
dstPath = new Path(srcPath);
// non-local file system; we can just use it directly from where it is
remoteFileStatus = srcFS.getFileStatus(srcPath);
remoteFileStatus = FileSystem.get(dstPath.toUri(), getConf()).getFileStatus(dstPath);
if (remoteFileStatus.isDirectory()) {
throw new IllegalArgumentException("If resource is on remote filesystem, " +
"must be a file: " + srcPath);
throw new IllegalArgumentException("If resource is on remote filesystem, must be a file: " + srcPath);
}
}
env.put(resource.getLocationEnvVar(), srcFS.makeQualified(remoteFileStatus.getPath()).toString());
env.put(resource.getTimestampEnvVar(), remoteFileStatus.getModificationTime() + "");
env.put(resource.getLengthEnvVar(), remoteFileStatus.getLen() + "");
env.put(resource.getLocationEnvVar(), dstPath.toString());
env.put(resource.getTimestampEnvVar(), String.valueOf(remoteFileStatus.getModificationTime()));
env.put(resource.getLengthEnvVar(), String.valueOf(remoteFileStatus.getLen()));
}

/**
Expand All @@ -643,10 +661,7 @@ private static Path getRemoteStoragePath(Configuration conf, ApplicationId appId
DynoConstants.DYNAMOMETER_STORAGE_DIR + "/" + appId));
}

private static final byte[] COPY_BUFFER = new byte[1024];

private static void addFileToZipRecursively(File root, File file, ZipOutputStream out)
throws IOException {
private void addFileToZipRecursively(File root, File file, ZipOutputStream out) throws IOException {

File[] files = file.listFiles();
if (files == null) { // Not a directory
Expand All @@ -655,10 +670,7 @@ private static void addFileToZipRecursively(File root, File file, ZipOutputStrea
try {
FileInputStream in = new FileInputStream(file.getAbsolutePath());
out.putNextEntry(new ZipEntry(relativePath));
int len;
while ((len = in.read(COPY_BUFFER)) > 0) {
out.write(COPY_BUFFER, 0, len);
}
IOUtils.copyBytes(in, out, getConf(), false);
out.closeEntry();
in.close();
} catch (FileNotFoundException fnfe) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,16 @@
import com.linkedin.dynamometer.workloadgenerator.audit.AuditLogDirectParser;
import com.linkedin.dynamometer.workloadgenerator.audit.AuditReplayMapper;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;
import java.util.Properties;
import java.util.UUID;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;
import org.apache.commons.io.FileUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
Expand All @@ -27,6 +31,7 @@
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.util.JarFinder;
Expand Down Expand Up @@ -94,6 +99,8 @@ public class TestDynamometerInfra {
private static Path fsImageTmpPath;
private static Path fsVersionTmpPath;
private static Path blockImageOutputDir;
private static Path auditTraceDir;
private static Path confZip;
private static File testBaseDir;
private static File hadoopTarballPath;
private static File hadoopUnpackedDir;
Expand Down Expand Up @@ -163,6 +170,8 @@ public static void setupClass() throws Exception {
fsImageTmpPath = fs.makeQualified(new Path("/tmp/" + FSIMAGE_FILENAME));
fsVersionTmpPath = fs.makeQualified(new Path("/tmp/" + VERSION_FILENAME));
blockImageOutputDir = fs.makeQualified(new Path("/tmp/blocks"));
auditTraceDir = fs.makeQualified(new Path("/tmp/audit_trace_direct"));
confZip = fs.makeQualified(new Path("/tmp/conf.zip"));

uploadFsimageResourcesToHDFS(hadoopBinVersion);

Expand Down Expand Up @@ -213,7 +222,7 @@ public void run() {
try {
client.run(new String[] {
"-" + Client.MASTER_MEMORY_MB_ARG, "128",
"-" + Client.CONF_PATH_ARG, getResourcePath("conf").toString(),
"-" + Client.CONF_PATH_ARG, confZip.toString(),
"-" + Client.BLOCK_LIST_PATH_ARG, blockImageOutputDir.toString(),
"-" + Client.FS_IMAGE_DIR_ARG, fsImageTmpPath.getParent().toString(),
"-" + Client.HADOOP_BINARY_PATH_ARG, hadoopTarballPath.getAbsolutePath(),
Expand Down Expand Up @@ -379,8 +388,26 @@ private static void uploadFsimageResourcesToHDFS(String hadoopBinVersion) throws
fs.copyFromLocalFile(new Path(getResourcePath(fsImageResourcePath)), fsImageTmpPath);
fs.copyFromLocalFile(new Path(getResourcePath(fsImageResourcePath + ".md5")), fsImageTmpPath.suffix(".md5"));
fs.copyFromLocalFile(new Path(getResourcePath(hadoopResourcesPath + "/" + VERSION_FILENAME)), fsVersionTmpPath);
fs.copyFromLocalFile(new Path(getResourcePath("blocks")), new Path("/tmp/"));
fs.copyFromLocalFile(new Path(getResourcePath("audit_trace_direct")), new Path("/tmp/"));
fs.mkdirs(auditTraceDir);
IOUtils.copyBytes(TestDynamometerInfra.class.getClassLoader().getResourceAsStream("audit_trace_direct/audit0"),
fs.create(new Path(auditTraceDir, "audit0")), conf, true);
fs.mkdirs(blockImageOutputDir);
for (String blockFile : new String[] { "dn0-a-0-r-00000", "dn1-a-0-r-00001", "dn2-a-0-r-00002" }) {
IOUtils.copyBytes(TestDynamometerInfra.class.getClassLoader().getResourceAsStream("blocks/" + blockFile),
fs.create(new Path(blockImageOutputDir, blockFile)), conf, true);
}
File tempConfZip = new File(testBaseDir, "conf.zip");
ZipOutputStream zos = new ZipOutputStream(new FileOutputStream(tempConfZip));
for (String file : new String[] { "core-site.xml", "hdfs-site.xml", "log4j.properties" }) {
zos.putNextEntry(new ZipEntry("etc/hadoop/" + file));
InputStream is = TestDynamometerInfra.class.getClassLoader().getResourceAsStream("conf/etc/hadoop/" + file);
IOUtils.copyBytes(is, zos, conf, false);
is.close();
zos.closeEntry();
}
zos.close();
fs.copyFromLocalFile(new Path(tempConfZip.toURI()), confZip);
tempConfZip.delete();
}

}