Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -135,6 +137,7 @@ public class Client extends Configured implements Tool {
public static final String MASTER_VCORES_DEFAULT = "1";
public static final String MASTER_MEMORY_MB_ARG = "master_memory_mb";
public static final String MASTER_MEMORY_MB_DEFAULT = "2048";
public static final String TOKEN_FILE_LOCATION_ARG = "token_file_location";
public static final String WORKLOAD_REPLAY_ENABLE_ARG = "workload_replay_enable";
public static final String WORKLOAD_INPUT_PATH_ARG = "workload_input_path";
public static final String WORKLOAD_THREADS_PER_MAPPER_ARG = "workload_threads_per_mapper";
Expand Down Expand Up @@ -174,6 +177,9 @@ public class Client extends Configured implements Tool {
private String remoteNameNodeRpcAddress = "";
// True iff the NameNode should be launched within YARN
private boolean launchNameNode;
// The path to the file which contains the delegation tokens to be used for the launched
// containers (may be null)
private String tokenFileLocation;

// Holds all of the options which are passed to the AM
private AMOptions amOptions;
Expand Down Expand Up @@ -277,6 +283,8 @@ public Client(String appMasterJar) {
opts.addOptionGroup(hadoopBinaryGroup);
opts.addOption(NAMENODE_SERVICERPC_ADDR_ARG, true, "Specify this option to run the NameNode " +
"external to YARN. This is the service RPC address of the NameNode, e.g. localhost:9020.");
opts.addOption(TOKEN_FILE_LOCATION_ARG, true, "If specified, this file will be used as the delegation token(s) " +
"for the launched containers. Otherwise, the delegation token(s) for the default FileSystem will be used.");
AMOptions.setOptions(opts);

opts.addOption(WORKLOAD_REPLAY_ENABLE_ARG, false, "If specified, this client will additionally launch the workload "
Expand Down Expand Up @@ -375,6 +383,7 @@ public boolean accept(Path path) {
}
this.amOptions = AMOptions.initFromParser(cliParser);
this.clientTimeout = Integer.parseInt(cliParser.getOptionValue(TIMEOUT_ARG, TIMEOUT_DEFAULT));
this.tokenFileLocation = cliParser.getOptionValue(TOKEN_FILE_LOCATION_ARG);

amOptions.verify();

Expand Down Expand Up @@ -488,22 +497,27 @@ public boolean run() throws IOException, YarnException {

// Setup security tokens
if (UserGroupInformation.isSecurityEnabled()) {
Credentials credentials = new Credentials();
String tokenRenewer = getConf().get(YarnConfiguration.RM_PRINCIPAL);
if (tokenRenewer == null || tokenRenewer.length() == 0) {
throw new IOException("Can't get Master Kerberos principal for the RM to use as renewer");
}
ByteBuffer fsTokens;
if (tokenFileLocation != null) {
fsTokens = ByteBuffer.wrap(Files.readAllBytes(Paths.get(tokenFileLocation)));
} else {
Credentials credentials = new Credentials();
String tokenRenewer = getConf().get(YarnConfiguration.RM_PRINCIPAL);
if (tokenRenewer == null || tokenRenewer.length() == 0) {
throw new IOException("Can't get Master Kerberos principal for the RM to use as renewer");
}

// For now, only getting tokens for the default file-system.
final Token<?> tokens[] = fs.addDelegationTokens(tokenRenewer, credentials);
if (tokens != null) {
for (Token<?> token : tokens) {
LOG.info("Got dt for " + fs.getUri() + "; " + token);
// For now, only getting tokens for the default file-system.
final Token<?>[] tokens = fs.addDelegationTokens(tokenRenewer, credentials);
if (tokens != null) {
for (Token<?> token : tokens) {
LOG.info("Got dt for " + fs.getUri() + "; " + token);
}
}
DataOutputBuffer dob = new DataOutputBuffer();
credentials.writeTokenStorageToStream(dob);
fsTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
}
DataOutputBuffer dob = new DataOutputBuffer();
credentials.writeTokenStorageToStream(dob);
ByteBuffer fsTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
amContainer.setTokens(fsTokens);
}

Expand Down Expand Up @@ -884,7 +898,7 @@ private void launchAndMonitorWorkloadDriver(Properties nameNodeProperties) {
/**
* Best-effort attempt to clean up any remaining applications (infrastructure or workload).
*/
private void attemptCleanup() {
public void attemptCleanup() {
LOG.info("Attempting to clean up remaining running applications.");
if (workloadJob != null) {
try {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/**
* Copyright 2017 LinkedIn Corporation. All rights reserved. Licensed under the BSD-2 Clause license.
* See LICENSE in the project root for license information.
*/
package com.linkedin.dynamometer.azkaban;

import com.linkedin.dynamometer.ApplicationMaster;
import com.linkedin.dynamometer.Client;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.ClassUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;


/**
* A class used to be able to run Dynamometer via Azkaban. It is expected
* to be run as a HadoopJavaJob type. Any Azkaban property prefixed with
* `{@code dyno.}` will be used as an argument to the Dynamometer infrastructure
* client. For example, setting the property `{@code dyno.foo}` to `{@code bar}`
* would result in the client receiving the arguments `{@code -foo bar}`.
* For arguments which do not expect an argument, specify them as
* `{@code dyno.flag.foo}`; setting a value of true will include the flag
* `{@code -foo}` and setting a value of false will do nothing.
*/
public class DynamometerAzkabanRunner extends Configured {

public static final String DYNO_PROPERTY_PREFIX = "dyno.";
public static final String DYNO_FLAG_PREFIX = "flag.";

private static final Log LOG = LogFactory.getLog(DynamometerAzkabanRunner.class);
private final String name;
private final Properties properties;

private Client dynoClient;

/**
* The constructor expected by Azkaban.
* @param name The name of the application.
* @param properties The properties to be used by this application.
*/
public DynamometerAzkabanRunner(String name, Properties properties) {
super(new YarnConfiguration());
this.name = name;
this.properties = properties;
}

/**
* Expected by Azkaban; this is the main entrypoint.
*/
public void run() throws Exception {
dynoClient = new Client(ClassUtil.findContainingJar(ApplicationMaster.class));
dynoClient.setConf(getConf());

List<String> argList = new ArrayList<>();
argList.add("-" + Client.APPNAME_ARG);
argList.add(name);
String tokenFileLocation = System.getenv("HADOOP_TOKEN_FILE_LOCATION");
if (tokenFileLocation != null) {
argList.add("-" + Client.TOKEN_FILE_LOCATION_ARG);
argList.add(tokenFileLocation);
}
for (Map.Entry<Object, Object> property : properties.entrySet()) {
String fullKey = (String) property.getKey();
if (!fullKey.startsWith(DYNO_PROPERTY_PREFIX)) {
continue;
}
String key = fullKey.substring(DYNO_PROPERTY_PREFIX.length());
if (key.startsWith(DYNO_FLAG_PREFIX)) {
String flag = key.substring(DYNO_FLAG_PREFIX.length());
if (Boolean.valueOf((String) property.getValue())) {
argList.add("-" + flag);
}
continue;
}
argList.add("-" + key);
argList.add((String) property.getValue());
}

int returnCode = dynoClient.run(argList.toArray(new String[0]));
if (returnCode != 0) {
String message = "Dynamometer failed with return code: " + returnCode;
LOG.error(message);
throw new RuntimeException(message);
}
}

/**
* Expected by Azkaban to provide a way to kill the job.
*/
public void cancel() throws Exception {
if (dynoClient == null) {
LOG.info("No Dynamometer client was found; exiting without any action.");
} else {
dynoClient.attemptCleanup();
}
}

}