Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ subprojects {
apply plugin: 'idea'
apply plugin: 'license'

sourceCompatibility = 1.7
sourceCompatibility = 1.8

license {
header rootProject.file('license_header')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -689,7 +689,7 @@ private Map<String, LocalResource> getLocalResources() throws IOException {
addAsLocalResourceFromEnv(DynoConstants.START_SCRIPT, localResources, envs);
addAsLocalResourceFromEnv(DynoConstants.HADOOP_BINARY, localResources, envs);
addAsLocalResourceFromEnv(DynoConstants.VERSION, localResources, envs);
addAsLocalResourceFromEnv(DynoConstants.DYNO_JAR, localResources, envs);
addAsLocalResourceFromEnv(DynoConstants.DYNO_DEPENDENCIES, localResources, envs);
if (isNameNodeLauncher) {
addAsLocalResourceFromEnv(DynoConstants.FS_IMAGE, localResources, envs);
addAsLocalResourceFromEnv(DynoConstants.FS_IMAGE_MD5, localResources, envs);
Expand Down
138 changes: 89 additions & 49 deletions dynamometer-infra/src/main/java/com/linkedin/dynamometer/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
import com.google.common.base.StandardSystemProperty;
import com.google.common.base.Supplier;
import com.google.common.collect.Lists;
import com.linkedin.dynamometer.workloadgenerator.audit.AuditReplayMapper;
Expand All @@ -19,18 +21,20 @@
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;
import org.apache.commons.cli.CommandLine;
Expand All @@ -51,7 +55,6 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.mapreduce.Job;
Expand Down Expand Up @@ -85,6 +88,7 @@
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;


/**
* Client for submitting a Dynamometer YARN application, and optionally, a workload MapReduce job.
* This client uploads resources to HDFS as necessary for them to be accessed by the
Expand Down Expand Up @@ -147,6 +151,8 @@ public class Client extends Configured implements Tool {
public static final String WORKLOAD_RATE_FACTOR_DEFAULT = "1.0";
public static final String WORKLOAD_CONFIG_ARG = "workload_config";

private static final String[] ARCHIVE_FILE_TYPES = { ".zip", ".tar", ".tgz", ".tar.gz" };

private static final String START_SCRIPT_LOCATION =
Client.class.getClassLoader().getResource(DynoConstants.START_SCRIPT.getResourcePath()).toString();

Expand All @@ -160,8 +166,8 @@ public class Client extends Configured implements Tool {
// Amount of virtual core resources to request to run the App Master
private int amVCores = 1;

// Application master jar file
private String appMasterJar = "";
// Dependency JARs. Should include, at minimum, the JAR for the App Master
private final String[] dependencyJars;

private String hadoopBinary = "";
// Location of DN conf zip
Expand Down Expand Up @@ -248,8 +254,10 @@ public int run(String[] args) {
return 2;
}

public Client(String appMasterJar) {
this.appMasterJar = appMasterJar;
public Client(String... dependencyJars) {
Preconditions.checkArgument(dependencyJars != null && dependencyJars.length > 0,
"Must specify at least one dependency JAR for the ApplicationMaster");
this.dependencyJars = dependencyJars;
opts = new Options();
opts.addOption(APPNAME_ARG, true, "Application Name. (default '" + APPNAME_DEFAULT + "')");
opts.addOption(QUEUE_ARG, true, "RM Queue in which this application is to be submitted (default '" +
Expand Down Expand Up @@ -480,10 +488,10 @@ public boolean run() throws IOException, YarnException {
// we explicitly add it here
Map<String, LocalResource> localResources = new HashMap<>();
LocalResource scRsrc = LocalResource.newInstance(
ConverterUtils.getYarnUrlFromPath(DynoConstants.DYNO_JAR.getPath(env)),
LocalResourceType.FILE, LocalResourceVisibility.APPLICATION,
DynoConstants.DYNO_JAR.getLength(env), DynoConstants.DYNO_JAR.getTimestamp(env));
localResources.put(DynoConstants.DYNO_JAR.getResourcePath(), scRsrc);
ConverterUtils.getYarnUrlFromPath(DynoConstants.DYNO_DEPENDENCIES.getPath(env)),
LocalResourceType.ARCHIVE, LocalResourceVisibility.APPLICATION,
DynoConstants.DYNO_DEPENDENCIES.getLength(env), DynoConstants.DYNO_DEPENDENCIES.getTimestamp(env));
localResources.put(DynoConstants.DYNO_DEPENDENCIES.getResourcePath(), scRsrc);
// Set local resource info into app master container launch context
amContainer.setLocalResources(localResources);

Expand Down Expand Up @@ -550,24 +558,24 @@ private Map<String, String> setupRemoteResourcesGetEnv() throws IOException {
// by containers. We do not need to set them as local resources here as
// the AM does not need them.
if (launchNameNode) {
setupRemoteResource(fsImagePath, infraAppId, DynoConstants.FS_IMAGE, env);
setupRemoteResource(fsImageMD5Path, infraAppId, DynoConstants.FS_IMAGE_MD5, env);
setupRemoteResource(infraAppId, DynoConstants.FS_IMAGE, env, fsImagePath);
setupRemoteResource(infraAppId, DynoConstants.FS_IMAGE_MD5, env, fsImageMD5Path);
} else {
env.put(DynoConstants.REMOTE_NN_RPC_ADDR_ENV, remoteNameNodeRpcAddress);
}
setupRemoteResource(versionFilePath, infraAppId, DynoConstants.VERSION, env);
setupRemoteResource(confPath, infraAppId, DynoConstants.CONF_ZIP, env);
setupRemoteResource(START_SCRIPT_LOCATION, infraAppId, DynoConstants.START_SCRIPT, env);
setupRemoteResource(hadoopBinary, infraAppId, DynoConstants.HADOOP_BINARY, env);
setupRemoteResource(appMasterJar, infraAppId, DynoConstants.DYNO_JAR, env);
setupRemoteResource(infraAppId, DynoConstants.VERSION, env, versionFilePath);
setupRemoteResource(infraAppId, DynoConstants.CONF_ZIP, env, confPath);
setupRemoteResource(infraAppId, DynoConstants.START_SCRIPT, env, START_SCRIPT_LOCATION);
setupRemoteResource(infraAppId, DynoConstants.HADOOP_BINARY, env, hadoopBinary);
setupRemoteResource(infraAppId, DynoConstants.DYNO_DEPENDENCIES, env, dependencyJars);

env.put(DynoConstants.BLOCK_LIST_PATH_ENV, blockListPath);
env.put(DynoConstants.JOB_ACL_VIEW_ENV,
getConf().get(MRJobConfig.JOB_ACL_VIEW_JOB, MRJobConfig.DEFAULT_JOB_ACL_VIEW_JOB));

env.put(DynoConstants.REMOTE_STORAGE_PATH_ENV, getRemoteStoragePath(getConf(), infraAppId).toString());

env.put("CLASSPATH", getAMClassPathEnv());
env.put(Environment.CLASSPATH.key(), getAMClassPathEnv());
return env;
}

Expand All @@ -579,18 +587,21 @@ private String getAMClassPathEnv() {
// For now, set all required classpaths, including the entry relative to "."
// (the working directory) that holds the application JAR(s)
StringBuilder classPathEnv = new StringBuilder(Environment.CLASSPATH.$())
.append(File.pathSeparatorChar).append("./*");
.append(ApplicationConstants.CLASS_PATH_SEPARATOR)
.append("./")
.append(DynoConstants.DYNO_DEPENDENCIES.getResourcePath())
.append("/*");
for (String c : getConf().getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH,
YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
classPathEnv.append(File.pathSeparatorChar);
classPathEnv.append(ApplicationConstants.CLASS_PATH_SEPARATOR);
classPathEnv.append(c.trim());
}
classPathEnv.append(File.pathSeparatorChar).append("./log4j.properties");
classPathEnv.append(ApplicationConstants.CLASS_PATH_SEPARATOR).append("./log4j.properties");

// add the runtime classpath needed for tests to work
if (getConf().getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) {
classPathEnv.append(':');
classPathEnv.append(System.getProperty("java.class.path"));
classPathEnv.append(ApplicationConstants.CLASS_PATH_SEPARATOR);
classPathEnv.append(StandardSystemProperty.JAVA_CLASS_PATH.value());
}
return classPathEnv.toString();
}
Expand Down Expand Up @@ -623,56 +634,85 @@ private List<String> getAMCommand() {
* with the correct resource settings.
* @throws IOException
*/
private void setupRemoteResource(String srcPath, ApplicationId appId,
DynoResource resource, Map<String, String> env) throws IOException {
private void setupRemoteResource(ApplicationId appId, DynoResource resource, Map<String, String> env,
String... srcPaths) throws IOException {

FileStatus remoteFileStatus;
Path dstPath;

URI srcURI;
try {
srcURI = new URI(srcPath);
} catch (URISyntaxException e) {
throw new IOException(e);
}
if (srcURI.getScheme() == null || srcURI.getScheme().equals(FileSystem.getLocal(getConf()).getScheme()) ||
srcURI.getScheme().equals("jar")) {
Preconditions.checkArgument(srcPaths.length > 0, "Must supply at least one source path");
Preconditions.checkArgument(resource.getType() == LocalResourceType.ARCHIVE || srcPaths.length == 1,
"Can only specify multiple source paths if using an ARCHIVE type");

List<URI> srcURIs = Arrays.stream(srcPaths).map(URI::create).collect(Collectors.toList());
Set<String> srcSchemes = srcURIs.stream().map(URI::getScheme).collect(Collectors.toSet());
Preconditions.checkArgument(srcSchemes.size() == 1, "All source paths must have the same scheme");
String srcScheme = srcSchemes.iterator().next();

String srcPathString = "[" + Joiner.on(",").join(srcPaths) + "]";

if (srcScheme == null || srcScheme.equals(FileSystem.getLocal(getConf()).getScheme()) || srcScheme.equals("jar")) {
// Need to upload this resource to remote storage
File srcFile = new File(srcURI.getSchemeSpecificPart());
dstPath = new Path(getRemoteStoragePath(getConf(), appId), srcFile.getName());
if (resource.getType() == LocalResourceType.ARCHIVE && srcFile.isDirectory()) {
if ("jar".equals(srcURI.getScheme())) {
List<File> srcFiles = srcURIs.stream()
.map(URI::getSchemeSpecificPart).map(File::new)
.collect(Collectors.toList());
Path dstPathBase = getRemoteStoragePath(getConf(), appId);
boolean shouldArchive = srcFiles.size() > 1 || srcFiles.get(0).isDirectory()
|| (resource.getType() == LocalResourceType.ARCHIVE
&& Arrays.stream(ARCHIVE_FILE_TYPES).noneMatch(suffix -> srcFiles.get(0).getName().endsWith(suffix)));
if (shouldArchive) {
if ("jar".equals(srcScheme)) {
throw new IllegalArgumentException(String.format(
"Resources in JARs can't be auto-zipped; resource %s is ARCHIVE and src is a directory: %s",
resource.getResourcePath(), srcPath));
"Resources in JARs can't be auto-zipped; resource %s is ARCHIVE and src is: %s",
resource.getResourcePath(), srcPathString));
} else if (resource.getType() != LocalResourceType.ARCHIVE) {
throw new IllegalArgumentException(String.format(
"Resource type is %s but srcPaths were: %s", resource.getType(), srcPathString));
}
dstPath = dstPath.suffix(".zip");
dstPath = new Path(dstPathBase, resource.getResourcePath()).suffix(".zip");
} else {
dstPath = new Path(dstPathBase, srcFiles.get(0).getName());
}
FileSystem remoteFS = dstPath.getFileSystem(getConf());
LOG.info("Uploading resource " + resource + " from " + srcPath + " to " + dstPath);
LOG.info("Uploading resource " + resource + " from " + srcPathString + " to " + dstPath);
try (OutputStream outputStream = remoteFS.create(dstPath, true)) {
if ("jar".equals(srcURI.getScheme())) {
try (InputStream inputStream = new URL(srcPath).openStream()) {
if ("jar".equals(srcScheme)) {
try (InputStream inputStream = new URL(srcPaths[0]).openStream()) {
IOUtils.copyBytes(inputStream, outputStream, getConf());
}
} else if (resource.getType() == LocalResourceType.ARCHIVE && srcFile.isDirectory()) {
} else if (shouldArchive) {
List<File> filesToZip;
if (srcFiles.size() == 1 && srcFiles.get(0).isDirectory()) {
File[] childFiles = srcFiles.get(0).listFiles();
if (childFiles == null || childFiles.length == 0) {
throw new IllegalArgumentException("Specified a directory to archive with no contents");
}
filesToZip = Lists.newArrayList(childFiles);
} else {
filesToZip = srcFiles;
}
ZipOutputStream zout = new ZipOutputStream(outputStream);
addFileToZipRecursively(srcFile, srcFile, zout);
for (File fileToZip : filesToZip) {
addFileToZipRecursively(fileToZip.getParentFile(), fileToZip, zout);
}
zout.close();
} else {
try (InputStream inputStream = new FileInputStream(srcFile)){
try (InputStream inputStream = new FileInputStream(srcFiles.get(0))) {
IOUtils.copyBytes(inputStream, outputStream, getConf());
}
}
}
remoteFileStatus = remoteFS.getFileStatus(dstPath);
} else {
LOG.info("Using resource " + resource + " directly from current location: " + srcPath);
dstPath = new Path(srcPath);
if (srcPaths.length > 1) {
throw new IllegalArgumentException("If resource is on remote, must be a single file: " + srcPathString);
}
LOG.info("Using resource " + resource + " directly from current location: " + srcPaths[0]);
dstPath = new Path(srcPaths[0]);
// non-local file system; we can just use it directly from where it is
remoteFileStatus = FileSystem.get(dstPath.toUri(), getConf()).getFileStatus(dstPath);
if (remoteFileStatus.isDirectory()) {
throw new IllegalArgumentException("If resource is on remote filesystem, must be a file: " + srcPath);
throw new IllegalArgumentException("If resource is on remote filesystem, must be a file: " + srcPaths[0]);
}
}
env.put(resource.getLocationEnvVar(), dstPath.toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ public class DynoConstants {
public static final DynoResource FS_IMAGE_MD5 = new DynoResource("FS_IMAGE_MD5", FILE, null);
// Resource for the VERSION file accompanying the file system image
public static final DynoResource VERSION = new DynoResource("VERSION", FILE, "VERSION");
// Resource for the JAR file containing all of the Dynamometer Java code
public static final DynoResource DYNO_JAR = new DynoResource("DYNO_JAR", FILE, "dynamometer.jar");
// Resource for the archive containing all dependencies
public static final DynoResource DYNO_DEPENDENCIES = new DynoResource("DYNO_DEPS", ARCHIVE, "dependencies");

// Environment variable which will contain the location of the directory
// which holds all of the block files for the DataNodes
Expand Down
10 changes: 3 additions & 7 deletions dynamometer-infra/src/main/resources/start-component.sh
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ export HADOOP_CONF_DIR=${confDir}
export YARN_CONF_DIR=${confDir}
export HADOOP_LOG_DIR=${logDir}
export HADOOP_PID_DIR=${pidDir}
export HADOOP_CLASSPATH="$extraClasspathDir"
export HADOOP_CLASSPATH="`pwd`/dependencies/*:$extraClasspathDir"
echo "Environment variables are set as:"
echo "(note that this doesn't include changes made by hadoop-env.sh)"
printenv
Expand Down Expand Up @@ -179,10 +179,9 @@ if [ "$component" = "datanode" ]; then
EOF

echo "Executing the following:"
printf "${HADOOP_HOME}/bin/hadoop jar dynamometer.jar com.linkedin.dynamometer.SimulatedDataNodes "
printf "${HADOOP_HOME}/bin/hadoop com.linkedin.dynamometer.SimulatedDataNodes "
printf "$DN_ADDITIONAL_ARGS $datanodeClusterConfigs\n"
${HADOOP_HOME}/bin/hadoop jar dynamometer.jar com.linkedin.dynamometer.SimulatedDataNodes \
$DN_ADDITIONAL_ARGS $datanodeClusterConfigs &
${HADOOP_HOME}/bin/hadoop com.linkedin.dynamometer.SimulatedDataNodes $DN_ADDITIONAL_ARGS $datanodeClusterConfigs &
launchSuccess="$?"
componentPID="$!"
if [[ ${launchSuccess} -ne 0 ]]; then
Expand Down Expand Up @@ -246,9 +245,6 @@ EOF
ln -snf "`pwd`/VERSION" "$nameDir/current/VERSION"
chmod 700 "$nameDir/current/"*

# To be able to use the custom block placement policy and the AllowAllImpersonationProvider
export HADOOP_CLASSPATH="`pwd`/dynamometer.jar:$HADOOP_CLASSPATH"

read -r -d '' namenodeConfigs <<EOF
-D fs.defaultFS=hdfs://${nnHostname}:${nnRpcPort}
-D dfs.namenode.rpc-address=${nnHostname}:${nnRpcPort}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

Expand Down