Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,4 @@ scripts.tar
*.iml
.idea/
gradle.out
*/out
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ public class Client extends Configured implements Tool {
public static final String TOKEN_FILE_LOCATION_ARG = "token_file_location";
public static final String WORKLOAD_REPLAY_ENABLE_ARG = "workload_replay_enable";
public static final String WORKLOAD_INPUT_PATH_ARG = "workload_input_path";
public static final String WORKLOAD_OUTPUT_PATH_ARG = "workload_output_path";
public static final String WORKLOAD_THREADS_PER_MAPPER_ARG = "workload_threads_per_mapper";
public static final String WORKLOAD_START_DELAY_ARG = "workload_start_delay";
public static final String WORKLOAD_RATE_FACTOR_ARG = "workload_rate_factor";
Expand Down Expand Up @@ -205,6 +206,8 @@ public class Client extends Configured implements Tool {
private volatile Job workloadJob;
// The input path for the workload job.
private String workloadInputPath = "";
// The output path for the workload job metric results.
private String workloadOutputPath = "";
// The number of threads to use per mapper for the workload job.
private int workloadThreadsPerMapper;
// The startup delay for the workload job.
Expand Down Expand Up @@ -299,6 +302,7 @@ public Client(String... dependencyJars) {
opts.addOption(WORKLOAD_REPLAY_ENABLE_ARG, false, "If specified, this client will additionally launch the workload "
+ "replay job to replay audit logs against the HDFS cluster which is started.");
opts.addOption(WORKLOAD_INPUT_PATH_ARG, true, "Location of the audit traces to replay (Required for workload)");
opts.addOption(WORKLOAD_OUTPUT_PATH_ARG, true, "Location of the metrics output (Required for workload)");
opts.addOption(WORKLOAD_THREADS_PER_MAPPER_ARG, true, "Number of threads per mapper to use to " +
"replay the workload. (default " + AuditReplayMapper.NUM_THREADS_DEFAULT + ")");
opts.addOption(WORKLOAD_START_DELAY_ARG, true, "Delay between launching the Workload MR job and " +
Expand Down Expand Up @@ -409,6 +413,7 @@ public boolean accept(Path path) {
}
launchWorkloadJob = true;
workloadInputPath = cliParser.getOptionValue(WORKLOAD_INPUT_PATH_ARG);
workloadOutputPath = cliParser.getOptionValue(WORKLOAD_OUTPUT_PATH_ARG);
workloadThreadsPerMapper = Integer.parseInt(cliParser.getOptionValue(WORKLOAD_THREADS_PER_MAPPER_ARG,
String.valueOf(AuditReplayMapper.NUM_THREADS_DEFAULT)));
workloadRateFactor = Double.parseDouble(cliParser.getOptionValue(WORKLOAD_RATE_FACTOR_ARG,
Expand Down Expand Up @@ -912,6 +917,7 @@ private void launchAndMonitorWorkloadDriver(Properties nameNodeProperties) {
long workloadStartTime = System.currentTimeMillis() + workloadStartDelayMs;
Configuration workloadConf = new Configuration(getConf());
workloadConf.set(AuditReplayMapper.INPUT_PATH_KEY, workloadInputPath);
workloadConf.set(AuditReplayMapper.OUTPUT_PATH_KEY, workloadOutputPath);
workloadConf.setInt(AuditReplayMapper.NUM_THREADS_KEY, workloadThreadsPerMapper);
workloadConf.setDouble(AuditReplayMapper.RATE_FACTOR_KEY, workloadRateFactor);
for (Map.Entry<String, String> configPair : workloadExtraConfigs.entrySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ public class TestDynamometerInfra {
private static final String NAMENODE_NODELABEL = "dyno_namenode";
private static final String DATANODE_NODELABEL = "dyno_datanode";

private static final String OUTPUT_PATH = "/tmp/trace_output_direct";

private static MiniDFSCluster miniDFSCluster;
private static MiniYARNCluster miniYARNCluster;
private static YarnClient yarnClient;
Expand Down Expand Up @@ -278,6 +280,7 @@ public void run() {
"-" + AMOptions.SHELL_ENV_ARG, "HADOOP_CONF_DIR=" + getHadoopHomeLocation() + "/etc/hadoop",
"-" + Client.WORKLOAD_REPLAY_ENABLE_ARG,
"-" + Client.WORKLOAD_INPUT_PATH_ARG, fs.makeQualified(new Path("/tmp/audit_trace_direct")).toString(),
"-" + Client.WORKLOAD_OUTPUT_PATH_ARG, fs.makeQualified(new Path(OUTPUT_PATH)).toString(),
"-" + Client.WORKLOAD_THREADS_PER_MAPPER_ARG, "1",
"-" + Client.WORKLOAD_START_DELAY_ARG, "10s",
"-" + AMOptions.NAMENODE_ARGS_ARG, "-Ddfs.namenode.safemode.extension=0"
Expand Down Expand Up @@ -402,6 +405,8 @@ public Boolean get() {
}
}
}, 3000, 60000);

assertTrue(fs.exists(new Path(OUTPUT_PATH)));
}

private static URI getResourcePath(String resourceName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
* in which to create files.</li>
* </ul>
*/
public class CreateFileMapper extends WorkloadMapper<LongWritable, NullWritable> {
public class CreateFileMapper extends WorkloadMapper<LongWritable, NullWritable, NullWritable, NullWritable> {

public static final String FILE_PARENT_PATH_KEY = "createfile.file-parent-path";
public static final String FILE_PARENT_PATH_DEFAULT = "/tmp/createFileMapper";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,7 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
Expand Down Expand Up @@ -120,16 +118,9 @@ public static Job getJobForSubmission(Configuration baseConf, String nnURI, long
conf.setLong(START_TIMESTAMP_MS, startTimestampMs);

Job job = Job.getInstance(conf, "Dynamometer Workload Driver");
job.setOutputFormatClass(NullOutputFormat.class);
job.setJarByClass(mapperClass);
job.setMapperClass(mapperClass);
job.setInputFormatClass(mapperClass.newInstance().getInputFormat(conf));
job.setOutputFormatClass(NullOutputFormat.class);
job.setNumReduceTasks(0);
job.setMapOutputKeyClass(NullWritable.class);
job.setMapOutputValueClass(NullWritable.class);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(NullWritable.class);
mapperClass.newInstance().configureJob(job);

return job;
}
Expand All @@ -153,7 +144,7 @@ private Class<? extends WorkloadMapper> getMapperClass(String className) throws

private String getMapperUsageInfo(String mapperClassName) throws ClassNotFoundException,
InstantiationException, IllegalAccessException {
WorkloadMapper<?, ?> mapper = getMapperClass(mapperClassName).newInstance();
WorkloadMapper<?, ?, ?, ?> mapper = getMapperClass(mapperClassName).newInstance();
StringBuilder builder = new StringBuilder("Usage for ");
builder.append(mapper.getClass().getSimpleName());
builder.append(":\n");
Expand All @@ -167,5 +158,4 @@ private String getMapperUsageInfo(String mapperClassName) throws ClassNotFoundEx

return builder.toString();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,17 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;


/**
* Represents the base class for a generic workload-generating mapper. By default, it will expect to use
* {@link TimedInputFormat} as its {@link InputFormat}. Subclasses expecting a different {@link InputFormat}
* should override the {@link #getInputFormat(Configuration)} method.
* {@link TimedInputFormat} as its {@link InputFormat}. Subclasses requiring a reducer or expecting
* a different {@link InputFormat} should override the {@link #configureJob(Job)} method.
*/
public abstract class WorkloadMapper<KEYIN, VALUEIN> extends Mapper<KEYIN, VALUEIN, NullWritable, NullWritable> {

/**
* Return the input class to be used by this mapper.
*/
public Class<? extends InputFormat> getInputFormat(Configuration conf) {
return TimedInputFormat.class;
}

public abstract class WorkloadMapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
/**
* Get the description of the behavior of this mapper.
*/
Expand All @@ -41,4 +35,15 @@ public Class<? extends InputFormat> getInputFormat(Configuration conf) {
*/
public abstract boolean verifyConfigurations(Configuration conf);

/**
* Setup input and output formats and optional reducer.
*/
public void configureJob(Job job) {
job.setInputFormatClass(TimedInputFormat.class);

job.setNumReduceTasks(0);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(NullWritable.class);
job.setOutputFormatClass(NullOutputFormat.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,19 @@
import java.util.concurrent.DelayQueue;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import static com.linkedin.dynamometer.workloadgenerator.audit.AuditReplayMapper.CommandType.READ;
import static com.linkedin.dynamometer.workloadgenerator.audit.AuditReplayMapper.CommandType.WRITE;
Expand Down Expand Up @@ -55,9 +58,10 @@
* replayed. For example, a rate factor of 2 would make the replay occur twice as fast, and a rate
* factor of 0.5 would make it occur half as fast.
*/
public class AuditReplayMapper extends WorkloadMapper<LongWritable, Text> {
public class AuditReplayMapper extends WorkloadMapper<LongWritable, Text, UserCommandKey, LongWritable> {

public static final String INPUT_PATH_KEY = "auditreplay.input-path";
public static final String OUTPUT_PATH_KEY = "auditreplay.output-path";
public static final String NUM_THREADS_KEY = "auditreplay.num-threads";
public static final int NUM_THREADS_DEFAULT = 1;
public static final String CREATE_BLOCKS_KEY = "auditreplay.create-blocks";
Expand Down Expand Up @@ -141,11 +145,6 @@ public enum CommandType {
private AuditCommandParser commandParser;
private ScheduledThreadPoolExecutor progressExecutor;

@Override
public Class<? extends InputFormat> getInputFormat(Configuration conf) {
return NoSplitTextInputFormat.class;
}

@Override
public String getDescription() {
return "This mapper replays audit log files.";
Expand All @@ -155,6 +154,7 @@ public String getDescription() {
public List<String> getConfigDescriptions() {
return Lists.newArrayList(
INPUT_PATH_KEY + " (required): Path to directory containing input files.",
OUTPUT_PATH_KEY + " (required): Path to destination for output files.",
NUM_THREADS_KEY + " (default " + NUM_THREADS_DEFAULT + "): Number of threads to use per mapper for replay.",
CREATE_BLOCKS_KEY + " (default " + CREATE_BLOCKS_DEFAULT + "): Whether or not to create 1-byte blocks when " +
"performing `create` commands.",
Expand All @@ -166,7 +166,7 @@ public List<String> getConfigDescriptions() {

@Override
public boolean verifyConfigurations(Configuration conf) {
return conf.get(INPUT_PATH_KEY) != null;
return conf.get(INPUT_PATH_KEY) != null && conf.get(OUTPUT_PATH_KEY) != null;
}

@Override
Expand Down Expand Up @@ -226,7 +226,7 @@ public void map(LongWritable lineNum, Text inputLine, Mapper.Context context)
}

@Override
public void cleanup(Mapper.Context context) throws InterruptedException {
public void cleanup(Mapper.Context context) throws InterruptedException, IOException {
for (AuditReplayThread t : threads) {
// Add in an indicator for each thread to shut down after the last real command
t.addToQueue(AuditReplayCommand.getPoisonPill(highestTimestamp + 1));
Expand All @@ -235,6 +235,7 @@ public void cleanup(Mapper.Context context) throws InterruptedException {
for (AuditReplayThread t : threads) {
t.join();
t.drainCounters(context);
t.drainCommandLatencies(context);
if (t.getException() != null) {
threadException = Optional.of(t.getException());
}
Expand All @@ -252,4 +253,19 @@ public void cleanup(Mapper.Context context) throws InterruptedException {
LOG.info("Percentage of invalid ops: " + percentageOfInvalidOps);
}
}

@Override
public void configureJob(Job job) {
job.setMapOutputKeyClass(UserCommandKey.class);
job.setMapOutputValueClass(LongWritable.class);
job.setInputFormatClass(NoSplitTextInputFormat.class);

job.setNumReduceTasks(1);
job.setReducerClass(AuditReplayReducer.class);
job.setOutputKeyClass(UserCommandKey.class);
job.setOutputValueClass(LongWritable.class);
job.setOutputFormatClass(TextOutputFormat.class);

TextOutputFormat.setOutputPath(job, new Path(job.getConfiguration().get(OUTPUT_PATH_KEY)));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright 2019 LinkedIn Corporation. All rights reserved. Licensed under the BSD-2 Clause license.
* See LICENSE in the project root for license information.
*/
package com.linkedin.dynamometer.workloadgenerator.audit;
Comment thread
csgregorian marked this conversation as resolved.

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;


/**
* AuditReplayReducer aggregates the returned latency values from {@link AuditReplayMapper} and sums
* them up by {@link UserCommandKey}, which combines the user's id that ran the command and the type
* of the command (READ/WRITE).
*/
public class AuditReplayReducer extends
Comment thread
xkrogen marked this conversation as resolved.
Reducer<UserCommandKey, LongWritable, UserCommandKey, LongWritable> {

@Override
protected void reduce(UserCommandKey key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
long sum = 0;
Comment thread
xkrogen marked this conversation as resolved.
for (LongWritable v : values) {
sum += v.get();
}
context.write(key, new LongWritable(sum));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.counters.GenericCounter;
Expand Down Expand Up @@ -63,6 +64,7 @@ public class AuditReplayThread extends Thread {
// and merge them all together at the end.
private Map<REPLAYCOUNTERS, Counter> replayCountersMap = new HashMap<>();
private Map<String, Counter> individualCommandsMap = new HashMap<>();
private Map<UserCommandKey, LongWritable> commandLatencyMap = new HashMap<>();

AuditReplayThread(Mapper.Context mapperContext, DelayQueue<AuditReplayCommand> queue,
ConcurrentMap<String, FileSystem> fsCache) throws IOException {
Expand Down Expand Up @@ -99,6 +101,12 @@ void drainCounters(Mapper.Context context) {
}
}

void drainCommandLatencies(Mapper.Context context) throws InterruptedException, IOException {
for (Map.Entry<UserCommandKey, LongWritable> ent : commandLatencyMap.entrySet()) {
context.write(ent.getKey(), ent.getValue());
}
}

/**
* Add a command to this thread's processing queue.
* @param cmd Command to add.
Expand Down Expand Up @@ -254,7 +262,14 @@ public FileSystem run() {
fs.concat(new Path(src), dsts.toArray(new Path[] {}));
break;
}

long latency = System.currentTimeMillis() - startTime;

UserCommandKey userCommandKey = new UserCommandKey(command.getSimpleUgi(), replayCommand.getType().toString());
commandLatencyMap.putIfAbsent(userCommandKey, new LongWritable(0));
LongWritable latencyWritable = commandLatencyMap.get(userCommandKey);
latencyWritable.set(latencyWritable.get() + latency);

switch (replayCommand.getType()) {
case WRITE:
replayCountersMap.get(REPLAYCOUNTERS.TOTALWRITECOMMANDLATENCY).increment(latency);
Expand Down
Loading