diff --git a/dynamometer-infra/src/main/java/com/linkedin/dynamometer/Client.java b/dynamometer-infra/src/main/java/com/linkedin/dynamometer/Client.java index dd3f7a4489..0f0dbea9bf 100644 --- a/dynamometer-infra/src/main/java/com/linkedin/dynamometer/Client.java +++ b/dynamometer-infra/src/main/java/com/linkedin/dynamometer/Client.java @@ -22,6 +22,8 @@ import java.net.URISyntaxException; import java.net.URL; import java.nio.ByteBuffer; +import java.nio.file.Files; +import java.nio.file.Paths; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -135,6 +137,7 @@ public class Client extends Configured implements Tool { public static final String MASTER_VCORES_DEFAULT = "1"; public static final String MASTER_MEMORY_MB_ARG = "master_memory_mb"; public static final String MASTER_MEMORY_MB_DEFAULT = "2048"; + public static final String TOKEN_FILE_LOCATION_ARG = "token_file_location"; public static final String WORKLOAD_REPLAY_ENABLE_ARG = "workload_replay_enable"; public static final String WORKLOAD_INPUT_PATH_ARG = "workload_input_path"; public static final String WORKLOAD_THREADS_PER_MAPPER_ARG = "workload_threads_per_mapper"; @@ -174,6 +177,9 @@ public class Client extends Configured implements Tool { private String remoteNameNodeRpcAddress = ""; // True iff the NameNode should be launched within YARN private boolean launchNameNode; + // The path to the file which contains the delegation tokens to be used for the launched + // containers (may be null) + private String tokenFileLocation; // Holds all of the options which are passed to the AM private AMOptions amOptions; @@ -277,6 +283,8 @@ public Client(String appMasterJar) { opts.addOptionGroup(hadoopBinaryGroup); opts.addOption(NAMENODE_SERVICERPC_ADDR_ARG, true, "Specify this option to run the NameNode " + "external to YARN. This is the service RPC address of the NameNode, e.g. localhost:9020."); + opts.addOption(TOKEN_FILE_LOCATION_ARG, true, "If specified, this file will be used as the delegation token(s) " + + "for the launched containers. Otherwise, the delegation token(s) for the default FileSystem will be used."); AMOptions.setOptions(opts); opts.addOption(WORKLOAD_REPLAY_ENABLE_ARG, false, "If specified, this client will additionally launch the workload " @@ -375,6 +383,7 @@ public boolean accept(Path path) { } this.amOptions = AMOptions.initFromParser(cliParser); this.clientTimeout = Integer.parseInt(cliParser.getOptionValue(TIMEOUT_ARG, TIMEOUT_DEFAULT)); + this.tokenFileLocation = cliParser.getOptionValue(TOKEN_FILE_LOCATION_ARG); amOptions.verify(); @@ -488,22 +497,27 @@ public boolean run() throws IOException, YarnException { // Setup security tokens if (UserGroupInformation.isSecurityEnabled()) { - Credentials credentials = new Credentials(); - String tokenRenewer = getConf().get(YarnConfiguration.RM_PRINCIPAL); - if (tokenRenewer == null || tokenRenewer.length() == 0) { - throw new IOException("Can't get Master Kerberos principal for the RM to use as renewer"); - } + ByteBuffer fsTokens; + if (tokenFileLocation != null) { + fsTokens = ByteBuffer.wrap(Files.readAllBytes(Paths.get(tokenFileLocation))); + } else { + Credentials credentials = new Credentials(); + String tokenRenewer = getConf().get(YarnConfiguration.RM_PRINCIPAL); + if (tokenRenewer == null || tokenRenewer.length() == 0) { + throw new IOException("Can't get Master Kerberos principal for the RM to use as renewer"); + } - // For now, only getting tokens for the default file-system. - final Token tokens[] = fs.addDelegationTokens(tokenRenewer, credentials); - if (tokens != null) { - for (Token token : tokens) { - LOG.info("Got dt for " + fs.getUri() + "; " + token); + // For now, only getting tokens for the default file-system. + final Token[] tokens = fs.addDelegationTokens(tokenRenewer, credentials); + if (tokens != null) { + for (Token token : tokens) { + LOG.info("Got dt for " + fs.getUri() + "; " + token); + } } + DataOutputBuffer dob = new DataOutputBuffer(); + credentials.writeTokenStorageToStream(dob); + fsTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); } - DataOutputBuffer dob = new DataOutputBuffer(); - credentials.writeTokenStorageToStream(dob); - ByteBuffer fsTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); amContainer.setTokens(fsTokens); } @@ -884,7 +898,7 @@ private void launchAndMonitorWorkloadDriver(Properties nameNodeProperties) { /** * Best-effort attempt to clean up any remaining applications (infrastructure or workload). */ - private void attemptCleanup() { + public void attemptCleanup() { LOG.info("Attempting to clean up remaining running applications."); if (workloadJob != null) { try { diff --git a/dynamometer-infra/src/main/java/com/linkedin/dynamometer/azkaban/DynamometerAzkabanRunner.java b/dynamometer-infra/src/main/java/com/linkedin/dynamometer/azkaban/DynamometerAzkabanRunner.java new file mode 100644 index 0000000000..e46d7e6d7a --- /dev/null +++ b/dynamometer-infra/src/main/java/com/linkedin/dynamometer/azkaban/DynamometerAzkabanRunner.java @@ -0,0 +1,103 @@ +/** + * Copyright 2017 LinkedIn Corporation. All rights reserved. Licensed under the BSD-2 Clause license. + * See LICENSE in the project root for license information. + */ +package com.linkedin.dynamometer.azkaban; + +import com.linkedin.dynamometer.ApplicationMaster; +import com.linkedin.dynamometer.Client; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.util.ClassUtil; +import org.apache.hadoop.yarn.conf.YarnConfiguration; + + +/** + * A class used to be able to run Dynamometer via Azkaban. It is expected + * to be run as a HadoopJavaJob type. Any Azkaban property prefixed with + * `{@code dyno.}` will be used as an argument to the Dynamometer infrastructure + * client. For example, setting the property `{@code dyno.foo}` to `{@code bar}` + * would result in the client receiving the arguments `{@code -foo bar}`. + * For arguments which do not expect an argument, specify them as + * `{@code dyno.flag.foo}`; setting a value of true will include the flag + * `{@code -foo}` and setting a value of false will do nothing. + */ +public class DynamometerAzkabanRunner extends Configured { + + public static final String DYNO_PROPERTY_PREFIX = "dyno."; + public static final String DYNO_FLAG_PREFIX = "flag."; + + private static final Log LOG = LogFactory.getLog(DynamometerAzkabanRunner.class); + private final String name; + private final Properties properties; + + private Client dynoClient; + + /** + * The constructor expected by Azkaban. + * @param name The name of the application. + * @param properties The properties to be used by this application. + */ + public DynamometerAzkabanRunner(String name, Properties properties) { + super(new YarnConfiguration()); + this.name = name; + this.properties = properties; + } + + /** + * Expected by Azkaban; this is the main entrypoint. + */ + public void run() throws Exception { + dynoClient = new Client(ClassUtil.findContainingJar(ApplicationMaster.class)); + dynoClient.setConf(getConf()); + + List argList = new ArrayList<>(); + argList.add("-" + Client.APPNAME_ARG); + argList.add(name); + String tokenFileLocation = System.getenv("HADOOP_TOKEN_FILE_LOCATION"); + if (tokenFileLocation != null) { + argList.add("-" + Client.TOKEN_FILE_LOCATION_ARG); + argList.add(tokenFileLocation); + } + for (Map.Entry property : properties.entrySet()) { + String fullKey = (String) property.getKey(); + if (!fullKey.startsWith(DYNO_PROPERTY_PREFIX)) { + continue; + } + String key = fullKey.substring(DYNO_PROPERTY_PREFIX.length()); + if (key.startsWith(DYNO_FLAG_PREFIX)) { + String flag = key.substring(DYNO_FLAG_PREFIX.length()); + if (Boolean.valueOf((String) property.getValue())) { + argList.add("-" + flag); + } + continue; + } + argList.add("-" + key); + argList.add((String) property.getValue()); + } + + int returnCode = dynoClient.run(argList.toArray(new String[0])); + if (returnCode != 0) { + String message = "Dynamometer failed with return code: " + returnCode; + LOG.error(message); + throw new RuntimeException(message); + } + } + + /** + * Expected by Azkaban to provide a way to kill the job. + */ + public void cancel() throws Exception { + if (dynoClient == null) { + LOG.info("No Dynamometer client was found; exiting without any action."); + } else { + dynoClient.attemptCleanup(); + } + } + +}