diff --git a/build.gradle b/build.gradle index bcf7539816..17e93bafd3 100644 --- a/build.gradle +++ b/build.gradle @@ -51,7 +51,7 @@ subprojects { apply plugin: 'idea' apply plugin: 'license' - sourceCompatibility = 1.7 + sourceCompatibility = 1.8 license { header rootProject.file('license_header') diff --git a/dynamometer-infra/src/main/java/com/linkedin/dynamometer/ApplicationMaster.java b/dynamometer-infra/src/main/java/com/linkedin/dynamometer/ApplicationMaster.java index 13bee9a850..700f95cfd2 100644 --- a/dynamometer-infra/src/main/java/com/linkedin/dynamometer/ApplicationMaster.java +++ b/dynamometer-infra/src/main/java/com/linkedin/dynamometer/ApplicationMaster.java @@ -689,7 +689,7 @@ private Map getLocalResources() throws IOException { addAsLocalResourceFromEnv(DynoConstants.START_SCRIPT, localResources, envs); addAsLocalResourceFromEnv(DynoConstants.HADOOP_BINARY, localResources, envs); addAsLocalResourceFromEnv(DynoConstants.VERSION, localResources, envs); - addAsLocalResourceFromEnv(DynoConstants.DYNO_JAR, localResources, envs); + addAsLocalResourceFromEnv(DynoConstants.DYNO_DEPENDENCIES, localResources, envs); if (isNameNodeLauncher) { addAsLocalResourceFromEnv(DynoConstants.FS_IMAGE, localResources, envs); addAsLocalResourceFromEnv(DynoConstants.FS_IMAGE_MD5, localResources, envs); diff --git a/dynamometer-infra/src/main/java/com/linkedin/dynamometer/Client.java b/dynamometer-infra/src/main/java/com/linkedin/dynamometer/Client.java index 5a02c7580b..f41280f0ba 100644 --- a/dynamometer-infra/src/main/java/com/linkedin/dynamometer/Client.java +++ b/dynamometer-infra/src/main/java/com/linkedin/dynamometer/Client.java @@ -7,7 +7,9 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Joiner; import com.google.common.base.Optional; +import com.google.common.base.Preconditions; import com.google.common.base.Splitter; +import com.google.common.base.StandardSystemProperty; import com.google.common.base.Supplier; import com.google.common.collect.Lists; import com.linkedin.dynamometer.workloadgenerator.audit.AuditReplayMapper; @@ -19,18 +21,20 @@ import java.io.InputStream; import java.io.OutputStream; import java.net.URI; -import java.net.URISyntaxException; import java.net.URL; import java.nio.ByteBuffer; import java.nio.file.Files; import java.nio.file.Paths; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import java.util.zip.ZipEntry; import java.util.zip.ZipOutputStream; import org.apache.commons.cli.CommandLine; @@ -51,7 +55,6 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; -import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.mapreduce.Job; @@ -85,6 +88,7 @@ import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.Records; + /** * Client for submitting a Dynamometer YARN application, and optionally, a workload MapReduce job. * This client uploads resources to HDFS as necessary for them to be accessed by the @@ -147,6 +151,8 @@ public class Client extends Configured implements Tool { public static final String WORKLOAD_RATE_FACTOR_DEFAULT = "1.0"; public static final String WORKLOAD_CONFIG_ARG = "workload_config"; + private static final String[] ARCHIVE_FILE_TYPES = { ".zip", ".tar", ".tgz", ".tar.gz" }; + private static final String START_SCRIPT_LOCATION = Client.class.getClassLoader().getResource(DynoConstants.START_SCRIPT.getResourcePath()).toString(); @@ -160,8 +166,8 @@ public class Client extends Configured implements Tool { // Amt. of virtual core resource to request for to run the App Master private int amVCores = 1; - // Application master jar file - private String appMasterJar = ""; + // Dependency JARs. Should include, at minimum, the JAR for the App Master + private final String[] dependencyJars; private String hadoopBinary = ""; // Location of DN conf zip @@ -248,8 +254,10 @@ public int run(String[] args) { return 2; } - public Client(String appMasterJar) { - this.appMasterJar = appMasterJar; + public Client(String... dependencyJars) { + Preconditions.checkArgument(dependencyJars != null && dependencyJars.length > 0, + "Must specify at least one dependency JAR for the ApplicationMaster"); + this.dependencyJars = dependencyJars; opts = new Options(); opts.addOption(APPNAME_ARG, true, "Application Name. (default '" + APPNAME_DEFAULT + "')"); opts.addOption(QUEUE_ARG, true, "RM Queue in which this application is to be submitted (default '" + @@ -480,10 +488,10 @@ public boolean run() throws IOException, YarnException { // we explicitly add it here Map localResources = new HashMap<>(); LocalResource scRsrc = LocalResource.newInstance( - ConverterUtils.getYarnUrlFromPath(DynoConstants.DYNO_JAR.getPath(env)), - LocalResourceType.FILE, LocalResourceVisibility.APPLICATION, - DynoConstants.DYNO_JAR.getLength(env), DynoConstants.DYNO_JAR.getTimestamp(env)); - localResources.put(DynoConstants.DYNO_JAR.getResourcePath(), scRsrc); + ConverterUtils.getYarnUrlFromPath(DynoConstants.DYNO_DEPENDENCIES.getPath(env)), + LocalResourceType.ARCHIVE, LocalResourceVisibility.APPLICATION, + DynoConstants.DYNO_DEPENDENCIES.getLength(env), DynoConstants.DYNO_DEPENDENCIES.getTimestamp(env)); + localResources.put(DynoConstants.DYNO_DEPENDENCIES.getResourcePath(), scRsrc); // Set local resource info into app master container launch context amContainer.setLocalResources(localResources); @@ -550,16 +558,16 @@ private Map setupRemoteResourcesGetEnv() throws IOException { // by containers. We do not need to set them as local resources here as // the AM does not need them. if (launchNameNode) { - setupRemoteResource(fsImagePath, infraAppId, DynoConstants.FS_IMAGE, env); - setupRemoteResource(fsImageMD5Path, infraAppId, DynoConstants.FS_IMAGE_MD5, env); + setupRemoteResource(infraAppId, DynoConstants.FS_IMAGE, env, fsImagePath); + setupRemoteResource(infraAppId, DynoConstants.FS_IMAGE_MD5, env, fsImageMD5Path); } else { env.put(DynoConstants.REMOTE_NN_RPC_ADDR_ENV, remoteNameNodeRpcAddress); } - setupRemoteResource(versionFilePath, infraAppId, DynoConstants.VERSION, env); - setupRemoteResource(confPath, infraAppId, DynoConstants.CONF_ZIP, env); - setupRemoteResource(START_SCRIPT_LOCATION, infraAppId, DynoConstants.START_SCRIPT, env); - setupRemoteResource(hadoopBinary, infraAppId, DynoConstants.HADOOP_BINARY, env); - setupRemoteResource(appMasterJar, infraAppId, DynoConstants.DYNO_JAR, env); + setupRemoteResource(infraAppId, DynoConstants.VERSION, env, versionFilePath); + setupRemoteResource(infraAppId, DynoConstants.CONF_ZIP, env, confPath); + setupRemoteResource(infraAppId, DynoConstants.START_SCRIPT, env, START_SCRIPT_LOCATION); + setupRemoteResource(infraAppId, DynoConstants.HADOOP_BINARY, env, hadoopBinary); + setupRemoteResource(infraAppId, DynoConstants.DYNO_DEPENDENCIES, env, dependencyJars); env.put(DynoConstants.BLOCK_LIST_PATH_ENV, blockListPath); env.put(DynoConstants.JOB_ACL_VIEW_ENV, @@ -567,7 +575,7 @@ private Map setupRemoteResourcesGetEnv() throws IOException { env.put(DynoConstants.REMOTE_STORAGE_PATH_ENV, getRemoteStoragePath(getConf(), infraAppId).toString()); - env.put("CLASSPATH", getAMClassPathEnv()); + env.put(Environment.CLASSPATH.key(), getAMClassPathEnv()); return env; } @@ -579,18 +587,21 @@ private String getAMClassPathEnv() { // For now setting all required classpaths including // the classpath to "." for the application jar StringBuilder classPathEnv = new StringBuilder(Environment.CLASSPATH.$()) - .append(File.pathSeparatorChar).append("./*"); + .append(ApplicationConstants.CLASS_PATH_SEPARATOR) + .append("./") + .append(DynoConstants.DYNO_DEPENDENCIES.getResourcePath()) + .append("/*"); for (String c : getConf().getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH, YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) { - classPathEnv.append(File.pathSeparatorChar); + classPathEnv.append(ApplicationConstants.CLASS_PATH_SEPARATOR); classPathEnv.append(c.trim()); } - classPathEnv.append(File.pathSeparatorChar).append("./log4j.properties"); + classPathEnv.append(ApplicationConstants.CLASS_PATH_SEPARATOR).append("./log4j.properties"); // add the runtime classpath needed for tests to work if (getConf().getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) { - classPathEnv.append(':'); - classPathEnv.append(System.getProperty("java.class.path")); + classPathEnv.append(ApplicationConstants.CLASS_PATH_SEPARATOR); + classPathEnv.append(StandardSystemProperty.JAVA_CLASS_PATH.value()); } return classPathEnv.toString(); } @@ -623,56 +634,85 @@ private List getAMCommand() { * with the correct resource settings. * @throws IOException */ - private void setupRemoteResource(String srcPath, ApplicationId appId, - DynoResource resource, Map env) throws IOException { + private void setupRemoteResource(ApplicationId appId, DynoResource resource, Map env, + String... srcPaths) throws IOException { FileStatus remoteFileStatus; Path dstPath; - URI srcURI; - try { - srcURI = new URI(srcPath); - } catch (URISyntaxException e) { - throw new IOException(e); - } - if (srcURI.getScheme() == null || srcURI.getScheme().equals(FileSystem.getLocal(getConf()).getScheme()) || - srcURI.getScheme().equals("jar")) { + Preconditions.checkArgument(srcPaths.length > 0, "Must supply at least one source path"); + Preconditions.checkArgument(resource.getType() == LocalResourceType.ARCHIVE || srcPaths.length == 1, + "Can only specify multiple source paths if using an ARCHIVE type"); + + List srcURIs = Arrays.stream(srcPaths).map(URI::create).collect(Collectors.toList()); + Set srcSchemes = srcURIs.stream().map(URI::getScheme).collect(Collectors.toSet()); + Preconditions.checkArgument(srcSchemes.size() == 1, "All source paths must have the same scheme"); + String srcScheme = srcSchemes.iterator().next(); + + String srcPathString = "[" + Joiner.on(",").join(srcPaths) + "]"; + + if (srcScheme == null || srcScheme.equals(FileSystem.getLocal(getConf()).getScheme()) || srcScheme.equals("jar")) { // Need to upload this resource to remote storage - File srcFile = new File(srcURI.getSchemeSpecificPart()); - dstPath = new Path(getRemoteStoragePath(getConf(), appId), srcFile.getName()); - if (resource.getType() == LocalResourceType.ARCHIVE && srcFile.isDirectory()) { - if ("jar".equals(srcURI.getScheme())) { + List srcFiles = srcURIs.stream() + .map(URI::getSchemeSpecificPart).map(File::new) + .collect(Collectors.toList()); + Path dstPathBase = getRemoteStoragePath(getConf(), appId); + boolean shouldArchive = srcFiles.size() > 1 || srcFiles.get(0).isDirectory() + || (resource.getType() == LocalResourceType.ARCHIVE + && Arrays.stream(ARCHIVE_FILE_TYPES).noneMatch(suffix -> srcFiles.get(0).getName().endsWith(suffix))); + if (shouldArchive) { + if ("jar".equals(srcScheme)) { throw new IllegalArgumentException(String.format( - "Resources in JARs can't be auto-zipped; resource %s is ARCHIVE and src is a directory: %s", - resource.getResourcePath(), srcPath)); + "Resources in JARs can't be auto-zipped; resource %s is ARCHIVE and src is: %s", + resource.getResourcePath(), srcPathString)); + } else if (resource.getType() != LocalResourceType.ARCHIVE) { + throw new IllegalArgumentException(String.format( + "Resource type is %s but srcPaths were: %s", resource.getType(), srcPathString)); } - dstPath = dstPath.suffix(".zip"); + dstPath = new Path(dstPathBase, resource.getResourcePath()).suffix(".zip"); + } else { + dstPath = new Path(dstPathBase, srcFiles.get(0).getName()); } FileSystem remoteFS = dstPath.getFileSystem(getConf()); - LOG.info("Uploading resource " + resource + " from " + srcPath + " to " + dstPath); + LOG.info("Uploading resource " + resource + " from " + srcPathString + " to " + dstPath); try (OutputStream outputStream = remoteFS.create(dstPath, true)) { - if ("jar".equals(srcURI.getScheme())) { - try (InputStream inputStream = new URL(srcPath).openStream()) { + if ("jar".equals(srcScheme)) { + try (InputStream inputStream = new URL(srcPaths[0]).openStream()) { IOUtils.copyBytes(inputStream, outputStream, getConf()); } - } else if (resource.getType() == LocalResourceType.ARCHIVE && srcFile.isDirectory()) { + } else if (shouldArchive) { + List filesToZip; + if (srcFiles.size() == 1 && srcFiles.get(0).isDirectory()) { + File[] childFiles = srcFiles.get(0).listFiles(); + if (childFiles == null || childFiles.length == 0) { + throw new IllegalArgumentException("Specified a directory to archive with no contents"); + } + filesToZip = Lists.newArrayList(childFiles); + } else { + filesToZip = srcFiles; + } ZipOutputStream zout = new ZipOutputStream(outputStream); - addFileToZipRecursively(srcFile, srcFile, zout); + for (File fileToZip : filesToZip) { + addFileToZipRecursively(fileToZip.getParentFile(), fileToZip, zout); + } zout.close(); } else { - try (InputStream inputStream = new FileInputStream(srcFile)){ + try (InputStream inputStream = new FileInputStream(srcFiles.get(0))) { IOUtils.copyBytes(inputStream, outputStream, getConf()); } } } remoteFileStatus = remoteFS.getFileStatus(dstPath); } else { - LOG.info("Using resource " + resource + " directly from current location: " + srcPath); - dstPath = new Path(srcPath); + if (srcPaths.length > 1) { + throw new IllegalArgumentException("If resource is on remote, must be a single file: " + srcPathString); + } + LOG.info("Using resource " + resource + " directly from current location: " + srcPaths[0]); + dstPath = new Path(srcPaths[0]); // non-local file system; we can just use it directly from where it is remoteFileStatus = FileSystem.get(dstPath.toUri(), getConf()).getFileStatus(dstPath); if (remoteFileStatus.isDirectory()) { - throw new IllegalArgumentException("If resource is on remote filesystem, must be a file: " + srcPath); + throw new IllegalArgumentException("If resource is on remote filesystem, must be a file: " + srcPaths[0]); } } env.put(resource.getLocationEnvVar(), dstPath.toString()); diff --git a/dynamometer-infra/src/main/java/com/linkedin/dynamometer/DynoConstants.java b/dynamometer-infra/src/main/java/com/linkedin/dynamometer/DynoConstants.java index d6b7f0da6d..021716b4b8 100644 --- a/dynamometer-infra/src/main/java/com/linkedin/dynamometer/DynoConstants.java +++ b/dynamometer-infra/src/main/java/com/linkedin/dynamometer/DynoConstants.java @@ -39,8 +39,8 @@ public class DynoConstants { public static final DynoResource FS_IMAGE_MD5 = new DynoResource("FS_IMAGE_MD5", FILE, null); // Resource for the VERSION file accompanying the file system image public static final DynoResource VERSION = new DynoResource("VERSION", FILE, "VERSION"); - // Resource for the JAR file containing all of the Dynamometer Java code - public static final DynoResource DYNO_JAR = new DynoResource("DYNO_JAR", FILE, "dynamometer.jar"); + // Resource for the archive containing all dependencies + public static final DynoResource DYNO_DEPENDENCIES = new DynoResource("DYNO_DEPS", ARCHIVE, "dependencies"); // Environment variable which will contain the location of the directory // which holds all of the block files for the DataNodes diff --git a/dynamometer-infra/src/main/resources/start-component.sh b/dynamometer-infra/src/main/resources/start-component.sh index b5de763da6..62c292d19e 100755 --- a/dynamometer-infra/src/main/resources/start-component.sh +++ b/dynamometer-infra/src/main/resources/start-component.sh @@ -96,7 +96,7 @@ export HADOOP_CONF_DIR=${confDir} export YARN_CONF_DIR=${confDir} export HADOOP_LOG_DIR=${logDir} export HADOOP_PID_DIR=${pidDir} -export HADOOP_CLASSPATH="$extraClasspathDir" +export HADOOP_CLASSPATH="`pwd`/dependencies/*:$extraClasspathDir" echo "Environment variables are set as:" echo "(note that this doesn't include changes made by hadoop-env.sh)" printenv @@ -179,10 +179,9 @@ if [ "$component" = "datanode" ]; then EOF echo "Executing the following:" - printf "${HADOOP_HOME}/bin/hadoop jar dynamometer.jar com.linkedin.dynamometer.SimulatedDataNodes " + printf "${HADOOP_HOME}/bin/hadoop com.linkedin.dynamometer.SimulatedDataNodes " printf "$DN_ADDITIONAL_ARGS $datanodeClusterConfigs\n" - ${HADOOP_HOME}/bin/hadoop jar dynamometer.jar com.linkedin.dynamometer.SimulatedDataNodes \ - $DN_ADDITIONAL_ARGS $datanodeClusterConfigs & + ${HADOOP_HOME}/bin/hadoop com.linkedin.dynamometer.SimulatedDataNodes $DN_ADDITIONAL_ARGS $datanodeClusterConfigs & launchSuccess="$?" componentPID="$!" if [[ ${launchSuccess} -ne 0 ]]; then @@ -246,9 +245,6 @@ EOF ln -snf "`pwd`/VERSION" "$nameDir/current/VERSION" chmod 700 "$nameDir/current/"* - # To be able to use the custom block placement policy and the AllowAllImpersonationProvider - export HADOOP_CLASSPATH="`pwd`/dynamometer.jar:$HADOOP_CLASSPATH" - read -r -d '' namenodeConfigs <