From 77ed60a7d0af0e89aa793db5daa3c7df9fc7c81e Mon Sep 17 00:00:00 2001 From: Christopher Gregorian Date: Tue, 12 Feb 2019 15:57:19 -0800 Subject: [PATCH 01/10] Reducer support and per-user aggregation --- .../java/com/linkedin/dynamometer/Client.java | 11 +++- .../workloadgenerator/CreateFileMapper.java | 2 +- .../workloadgenerator/WorkloadDriver.java | 61 ++++++++++++++++--- .../workloadgenerator/WorkloadMapper.java | 2 +- .../workloadgenerator/WorkloadReducer.java | 32 ++++++++++ .../audit/AuditReplayMapper.java | 2 +- .../audit/AuditReplayReducer.java | 46 ++++++++++++++ .../audit/AuditReplayThread.java | 13 ++++ .../audit/AuditTextOutputFormat.java | 26 ++++++++ .../audit/NoSplitTextInputFormat.java | 1 + .../TestWorkloadGenerator.java | 17 ++++-- 11 files changed, 193 insertions(+), 20 deletions(-) create mode 100644 dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/WorkloadReducer.java create mode 100644 dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayReducer.java create mode 100644 dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditTextOutputFormat.java diff --git a/dynamometer-infra/src/main/java/com/linkedin/dynamometer/Client.java b/dynamometer-infra/src/main/java/com/linkedin/dynamometer/Client.java index f41280f0ba..c805d9ef23 100644 --- a/dynamometer-infra/src/main/java/com/linkedin/dynamometer/Client.java +++ b/dynamometer-infra/src/main/java/com/linkedin/dynamometer/Client.java @@ -37,6 +37,8 @@ import java.util.stream.Collectors; import java.util.zip.ZipEntry; import java.util.zip.ZipOutputStream; + +import com.linkedin.dynamometer.workloadgenerator.audit.AuditReplayReducer; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; import org.apache.commons.cli.GnuParser; @@ -145,6 +147,7 @@ public class Client extends Configured implements Tool { public static final String TOKEN_FILE_LOCATION_ARG = "token_file_location"; public static final String WORKLOAD_REPLAY_ENABLE_ARG = "workload_replay_enable"; public static final String WORKLOAD_INPUT_PATH_ARG = "workload_input_path"; + public static final String WORKLOAD_OUTPUT_PATH_ARG = "workload_output_path"; public static final String WORKLOAD_THREADS_PER_MAPPER_ARG = "workload_threads_per_mapper"; public static final String WORKLOAD_START_DELAY_ARG = "workload_start_delay"; public static final String WORKLOAD_RATE_FACTOR_ARG = "workload_rate_factor"; @@ -205,6 +208,8 @@ public class Client extends Configured implements Tool { private volatile Job workloadJob; // The input path for the workload job. private String workloadInputPath = ""; + // The output path for the workload job. + private String workloadOutputPath = ""; // The number of threads to use per mapper for the workload job. private int workloadThreadsPerMapper; // The startup delay for the workload job. @@ -409,6 +414,7 @@ public boolean accept(Path path) { } launchWorkloadJob = true; workloadInputPath = cliParser.getOptionValue(WORKLOAD_INPUT_PATH_ARG); + workloadOutputPath = cliParser.getOptionValue(WORKLOAD_OUTPUT_PATH_ARG); workloadThreadsPerMapper = Integer.parseInt(cliParser.getOptionValue(WORKLOAD_THREADS_PER_MAPPER_ARG, String.valueOf(AuditReplayMapper.NUM_THREADS_DEFAULT))); workloadRateFactor = Double.parseDouble(cliParser.getOptionValue(WORKLOAD_RATE_FACTOR_ARG, @@ -912,13 +918,16 @@ private void launchAndMonitorWorkloadDriver(Properties nameNodeProperties) { long workloadStartTime = System.currentTimeMillis() + workloadStartDelayMs; Configuration workloadConf = new Configuration(getConf()); workloadConf.set(AuditReplayMapper.INPUT_PATH_KEY, workloadInputPath); + if (workloadOutputPath != null) { + workloadConf.set(AuditReplayReducer.OUTPUT_PATH_KEY, workloadOutputPath); + } workloadConf.setInt(AuditReplayMapper.NUM_THREADS_KEY, workloadThreadsPerMapper); workloadConf.setDouble(AuditReplayMapper.RATE_FACTOR_KEY, workloadRateFactor); for (Map.Entry configPair : workloadExtraConfigs.entrySet()) { workloadConf.set(configPair.getKey(), configPair.getValue()); } workloadJob = WorkloadDriver.getJobForSubmission(workloadConf, nameNodeURI.toString(), - workloadStartTime, AuditReplayMapper.class); + workloadStartTime, AuditReplayMapper.class, AuditReplayReducer.class); workloadJob.submit(); while (!isCompleted(infraAppState) && !isCompleted(workloadAppState)) { workloadJob.monitorAndPrintJob(); diff --git a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/CreateFileMapper.java b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/CreateFileMapper.java index b6db85ed4e..4449ea2313 100644 --- a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/CreateFileMapper.java +++ b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/CreateFileMapper.java @@ -32,7 +32,7 @@ * in which to create files. * */ -public class CreateFileMapper extends WorkloadMapper { +public class CreateFileMapper extends WorkloadMapper { public static final String FILE_PARENT_PATH_KEY = "createfile.file-parent-path"; public static final String FILE_PARENT_PATH_DEFAULT = "/tmp/createFileMapper"; diff --git a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/WorkloadDriver.java b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/WorkloadDriver.java index 38f237f572..8b10546f85 100644 --- a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/WorkloadDriver.java +++ b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/WorkloadDriver.java @@ -9,6 +9,8 @@ import java.text.SimpleDateFormat; import java.util.Date; import java.util.concurrent.TimeUnit; + +import com.linkedin.dynamometer.workloadgenerator.audit.AuditReplayReducer; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; import org.apache.commons.cli.HelpFormatter; @@ -17,14 +19,21 @@ import org.apache.commons.cli.OptionGroup; import org.apache.commons.cli.Options; import org.apache.commons.cli.PosixParser; +import org.apache.commons.lang.ObjectUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; @@ -44,6 +53,7 @@ public class WorkloadDriver extends Configured implements Tool { public static final String START_TIME_OFFSET_DEFAULT = "1m"; public static final String NN_URI = "nn_uri"; public static final String MAPPER_CLASS_NAME = "mapper_class_name"; + public static final String REDUCER_CLASS_NAME = "reducer_class_name"; public int run(String[] args) throws Exception { Option helpOption = new Option("h", "help", false, "Shows this message. Additionally specify the " + MAPPER_CLASS_NAME @@ -64,6 +74,11 @@ public int run(String[] args) throws Exception { "1. AuditReplayMapper \n" + "2. CreateFileMapper \nFully specified class names are also supported.") .isRequired().create(MAPPER_CLASS_NAME); options.addOption(mapperClassOption); + Option reducerClassOption = OptionBuilder.withArgName("Reducer ClassName").hasArg().withDescription( + "Class name of the reducer (optional); must be a Reducer subclass. Reducers supported currently: \n" + + "1. AuditReplayReducer \nFully specificed class names are also supported.") + .create(REDUCER_CLASS_NAME); + options.addOption(reducerClassOption); Options helpOptions = new Options(); helpOptions.addOption(helpOption); @@ -102,14 +117,19 @@ public int run(String[] args) throws Exception { return 1; } - Job job = getJobForSubmission(getConf(), nnURI, startTimestampMs, mapperClass); + Class reducerClass = null; + if (cli.getOptionValue(REDUCER_CLASS_NAME) != null) { + reducerClass = getReducerClass(cli.getOptionValue(REDUCER_CLASS_NAME)); + } + + Job job = getJobForSubmission(getConf(), nnURI, startTimestampMs, mapperClass, reducerClass); boolean success = job.waitForCompletion(true); return success ? 0 : 1; } public static Job getJobForSubmission(Configuration baseConf, String nnURI, long startTimestampMs, - Class mapperClass) throws IOException, ClassNotFoundException, + Class mapperClass, Class reducerClass) throws IOException, ClassNotFoundException, InstantiationException, IllegalAccessException { Configuration conf = new Configuration(baseConf); conf.set(NN_URI, nnURI); @@ -120,16 +140,25 @@ public static Job getJobForSubmission(Configuration baseConf, String nnURI, long conf.setLong(START_TIMESTAMP_MS, startTimestampMs); Job job = Job.getInstance(conf, "Dynamometer Workload Driver"); - job.setOutputFormatClass(NullOutputFormat.class); job.setJarByClass(mapperClass); job.setMapperClass(mapperClass); job.setInputFormatClass(mapperClass.newInstance().getInputFormat(conf)); - job.setOutputFormatClass(NullOutputFormat.class); - job.setNumReduceTasks(0); - job.setMapOutputKeyClass(NullWritable.class); - job.setMapOutputValueClass(NullWritable.class); - job.setOutputKeyClass(NullWritable.class); - job.setOutputValueClass(NullWritable.class); + if (reducerClass == null) { + job.setNumReduceTasks(0); + job.setMapOutputKeyClass(NullWritable.class); + job.setMapOutputValueClass(NullWritable.class); + job.setOutputKeyClass(NullWritable.class); + job.setOutputValueClass(NullWritable.class); + job.setOutputFormatClass(NullOutputFormat.class); + } else { + job.setNumReduceTasks(1); + job.setMapOutputKeyClass(Text.class); + job.setMapOutputValueClass(LongWritable.class); + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(LongWritable.class); + job.setOutputFormatClass(reducerClass.newInstance().getOutputFormat(conf)); + job.setReducerClass(reducerClass); + } return job; } @@ -151,9 +180,21 @@ private Class getMapperClass(String className) throws return (Class) mapperClass; } + private Class getReducerClass(String className) throws ClassNotFoundException { + if (!className.contains(".")) { + className = WorkloadDriver.class.getPackage().getName() + "." + className; + } + Class mapperClass = getConf().getClassByName(className); + if (!WorkloadReducer.class.isAssignableFrom(mapperClass)) { + throw new IllegalArgumentException(className + " is not a subclass of " + + WorkloadReducer.class.getCanonicalName()); + } + return (Class) mapperClass; + } + private String getMapperUsageInfo(String mapperClassName) throws ClassNotFoundException, InstantiationException, IllegalAccessException { - WorkloadMapper mapper = getMapperClass(mapperClassName).newInstance(); + WorkloadMapper mapper = getMapperClass(mapperClassName).newInstance(); StringBuilder builder = new StringBuilder("Usage for "); builder.append(mapper.getClass().getSimpleName()); builder.append(":\n"); diff --git a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/WorkloadMapper.java b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/WorkloadMapper.java index 0e4a651f0f..e41e61973c 100644 --- a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/WorkloadMapper.java +++ b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/WorkloadMapper.java @@ -16,7 +16,7 @@ * {@link TimedInputFormat} as its {@link InputFormat}. Subclasses expecting a different {@link InputFormat} * should override the {@link #getInputFormat(Configuration)} method. */ -public abstract class WorkloadMapper extends Mapper { +public abstract class WorkloadMapper extends Mapper { /** * Return the input class to be used by this mapper. diff --git a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/WorkloadReducer.java b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/WorkloadReducer.java new file mode 100644 index 0000000000..71c69881ce --- /dev/null +++ b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/WorkloadReducer.java @@ -0,0 +1,32 @@ +package com.linkedin.dynamometer.workloadgenerator; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.OutputFormat; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; + +import java.util.List; + +public abstract class WorkloadReducer + extends Reducer { + + public Class getOutputFormat(Configuration conf) { + return NullOutputFormat.class; + } + + /** + * Get the description of the behavior of this reducer. + */ + public abstract String getDescription(); + + /** + * Get a list of the description of each configuration that this mapper accepts. + */ + public abstract List getConfigDescriptions(); + + /** + * Verify that the provided configuration contains all configurations + * required by this mapper. + */ + public abstract boolean verifyConfigurations(Configuration conf); +} diff --git a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayMapper.java b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayMapper.java index 57bf6336d8..05372bf6a6 100644 --- a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayMapper.java +++ b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayMapper.java @@ -55,7 +55,7 @@ * replayed. For example, a rate factor of 2 would make the replay occur twice as fast, and a rate * factor of 0.5 would make it occur half as fast. */ -public class AuditReplayMapper extends WorkloadMapper { +public class AuditReplayMapper extends WorkloadMapper { public static final String INPUT_PATH_KEY = "auditreplay.input-path"; public static final String NUM_THREADS_KEY = "auditreplay.num-threads"; diff --git a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayReducer.java b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayReducer.java new file mode 100644 index 0000000000..91638cca7f --- /dev/null +++ b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayReducer.java @@ -0,0 +1,46 @@ +package com.linkedin.dynamometer.workloadgenerator.audit; + +import com.google.common.collect.Lists; +import com.linkedin.dynamometer.workloadgenerator.WorkloadReducer; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.OutputFormat; +import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; + +import java.io.IOException; +import java.util.List; + +public class AuditReplayReducer extends + WorkloadReducer { + public static final String OUTPUT_PATH_KEY = "auditreplay.output-path"; + + @Override + public Class getOutputFormat(Configuration conf) { + return AuditTextOutputFormat.class; + } + + @Override + public String getDescription() { + return "This reducer aggregates latency data."; + } + + @Override + public List getConfigDescriptions() { + return Lists.newArrayList("default config"); + } + + @Override + public boolean verifyConfigurations(Configuration conf) { + return conf.get(OUTPUT_PATH_KEY) != null; + } + + @Override + protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { + long sum = 0; + for (LongWritable v : values) { + sum += v.get(); + } + context.write(key, new LongWritable(sum)); + } +} diff --git a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayThread.java b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayThread.java index db480b9e7b..5e79dd32b9 100644 --- a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayThread.java +++ b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayThread.java @@ -25,6 +25,8 @@ import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Counter; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.counters.GenericCounter; @@ -53,6 +55,7 @@ public class AuditReplayThread extends Thread { private ConcurrentMap fsCache; private URI namenodeUri; private UserGroupInformation loginUser; + private Mapper.Context mapperContext; private Configuration mapperConf; // If any exception is encountered it will be stored here private Exception exception; @@ -69,6 +72,7 @@ public class AuditReplayThread extends Thread { commandQueue = queue; this.fsCache = fsCache; loginUser = UserGroupInformation.getLoginUser(); + this.mapperContext = mapperContext; mapperConf = mapperContext.getConfiguration(); namenodeUri = URI.create(mapperConf.get(WorkloadDriver.NN_URI)); startTimestampMs = mapperConf.getLong(WorkloadDriver.START_TIMESTAMP_MS, -1); @@ -254,7 +258,16 @@ public FileSystem run() { fs.concat(new Path(src), dsts.toArray(new Path[] {})); break; } + + String key = command.getSimpleUgi() + "_" + replayCommand.getType().toString(); long latency = System.currentTimeMillis() - startTime; + +// try { +// mapperContext.write(new Text(key), new LongWritable(latency)); +// } catch (InterruptedException|IOException e) { +// throw new IOException("Error writing to context", e); +// } + switch (replayCommand.getType()) { case WRITE: replayCountersMap.get(REPLAYCOUNTERS.TOTALWRITECOMMANDLATENCY).increment(latency); diff --git a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditTextOutputFormat.java b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditTextOutputFormat.java new file mode 100644 index 0000000000..c8c80d8a7b --- /dev/null +++ b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditTextOutputFormat.java @@ -0,0 +1,26 @@ +/** + * Copyright 2017 LinkedIn Corporation. All rights reserved. Licensed under the BSD-2 Clause license. + * See LICENSE in the project root for license information. + */ +package com.linkedin.dynamometer.workloadgenerator.audit; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.FileAlreadyExistsException; +import org.apache.hadoop.mapred.FileOutputFormat; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; +import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; + +import java.io.IOException; +import java.util.List; + +public class AuditTextOutputFormat extends TextOutputFormat { + @Override + public void checkOutputSpecs(JobContext context) throws IOException { + context.getConfiguration().set("mapreduce.output.fileoutputformat.outputdir", + context.getConfiguration().get(AuditReplayReducer.OUTPUT_PATH_KEY)); + super.checkOutputSpecs(context); + } +} diff --git a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/NoSplitTextInputFormat.java b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/NoSplitTextInputFormat.java index 0938583188..6af2be24d3 100644 --- a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/NoSplitTextInputFormat.java +++ b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/NoSplitTextInputFormat.java @@ -11,6 +11,7 @@ import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; /** diff --git a/dynamometer-workload/src/test/java/com/linkedin/dynamometer/workloadgenerator/TestWorkloadGenerator.java b/dynamometer-workload/src/test/java/com/linkedin/dynamometer/workloadgenerator/TestWorkloadGenerator.java index cbe6db05ce..bf70b584eb 100644 --- a/dynamometer-workload/src/test/java/com/linkedin/dynamometer/workloadgenerator/TestWorkloadGenerator.java +++ b/dynamometer-workload/src/test/java/com/linkedin/dynamometer/workloadgenerator/TestWorkloadGenerator.java @@ -4,10 +4,8 @@ */ package com.linkedin.dynamometer.workloadgenerator; -import com.linkedin.dynamometer.workloadgenerator.audit.AuditCommandParser; -import com.linkedin.dynamometer.workloadgenerator.audit.AuditLogDirectParser; -import com.linkedin.dynamometer.workloadgenerator.audit.AuditLogHiveTableParser; -import com.linkedin.dynamometer.workloadgenerator.audit.AuditReplayMapper; +import com.linkedin.dynamometer.workloadgenerator.audit.*; + import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; @@ -61,17 +59,24 @@ public void tearDown() throws Exception { public void testAuditWorkloadDirectParser() throws Exception { String workloadInputPath = TestWorkloadGenerator.class.getClassLoader().getResource("audit_trace_direct").toString(); + String auditOutputPath = "/reducer_output/trace_output_direct"; conf.set(AuditReplayMapper.INPUT_PATH_KEY, workloadInputPath); + conf.set(AuditReplayReducer.OUTPUT_PATH_KEY, auditOutputPath); conf.setLong(AuditLogDirectParser.AUDIT_START_TIMESTAMP_KEY, 60*1000); testAuditWorkload(); + assertTrue(dfs.exists(new Path(auditOutputPath))); } @Test public void testAuditWorkloadHiveParser() throws Exception { - String workloadInputPath = TestWorkloadGenerator.class.getClassLoader().getResource("audit_trace_hive").toString(); + String workloadInputPath = + TestWorkloadGenerator.class.getClassLoader().getResource("audit_trace_hive").toString(); + String auditOutputPath = "/reducer_output/trace_output_hive"; conf.set(AuditReplayMapper.INPUT_PATH_KEY, workloadInputPath); + conf.set(AuditReplayReducer.OUTPUT_PATH_KEY, auditOutputPath); conf.setClass(AuditReplayMapper.COMMAND_PARSER_KEY, AuditLogHiveTableParser.class, AuditCommandParser.class); testAuditWorkload(); + assertTrue(dfs.exists(new Path(auditOutputPath))); } /** @@ -96,7 +101,7 @@ public void authorize(UserGroupInformation user, String remoteAddress) throws Au private void testAuditWorkload() throws Exception { long workloadStartTime = System.currentTimeMillis() + 10000; Job workloadJob = WorkloadDriver.getJobForSubmission(conf, dfs.getUri().toString(), - workloadStartTime, AuditReplayMapper.class); + workloadStartTime, AuditReplayMapper.class, AuditReplayReducer.class); boolean success = workloadJob.waitForCompletion(true); assertTrue("workload job should succeed", success); Counters counters = workloadJob.getCounters(); From 3bc3998caac5bb6943ae93dda11de6f2f297e3c3 Mon Sep 17 00:00:00 2001 From: Christopher Gregorian Date: Wed, 13 Feb 2019 14:06:59 -0800 Subject: [PATCH 02/10] Clean up --- .../java/com/linkedin/dynamometer/Client.java | 12 ++++---- .../workloadgenerator/WorkloadDriver.java | 14 ++++------ .../workloadgenerator/WorkloadReducer.java | 14 ---------- .../audit/AuditReplayReducer.java | 10 ------- .../audit/AuditReplayThread.java | 10 +++---- .../audit/NoSplitTextInputFormat.java | 1 - .../TestWorkloadGenerator.java | 28 +++++++++++++++---- 7 files changed, 40 insertions(+), 49 deletions(-) diff --git a/dynamometer-infra/src/main/java/com/linkedin/dynamometer/Client.java b/dynamometer-infra/src/main/java/com/linkedin/dynamometer/Client.java index c805d9ef23..294fea47f9 100644 --- a/dynamometer-infra/src/main/java/com/linkedin/dynamometer/Client.java +++ b/dynamometer-infra/src/main/java/com/linkedin/dynamometer/Client.java @@ -12,6 +12,7 @@ import com.google.common.base.StandardSystemProperty; import com.google.common.base.Supplier; import com.google.common.collect.Lists; +import com.linkedin.dynamometer.workloadgenerator.WorkloadReducer; import com.linkedin.dynamometer.workloadgenerator.audit.AuditReplayMapper; import com.linkedin.dynamometer.workloadgenerator.WorkloadDriver; import java.io.File; @@ -208,7 +209,7 @@ public class Client extends Configured implements Tool { private volatile Job workloadJob; // The input path for the workload job. private String workloadInputPath = ""; - // The output path for the workload job. + // The output path for the workload job metric results. private String workloadOutputPath = ""; // The number of threads to use per mapper for the workload job. private int workloadThreadsPerMapper; @@ -918,16 +919,17 @@ private void launchAndMonitorWorkloadDriver(Properties nameNodeProperties) { long workloadStartTime = System.currentTimeMillis() + workloadStartDelayMs; Configuration workloadConf = new Configuration(getConf()); workloadConf.set(AuditReplayMapper.INPUT_PATH_KEY, workloadInputPath); - if (workloadOutputPath != null) { - workloadConf.set(AuditReplayReducer.OUTPUT_PATH_KEY, workloadOutputPath); - } workloadConf.setInt(AuditReplayMapper.NUM_THREADS_KEY, workloadThreadsPerMapper); workloadConf.setDouble(AuditReplayMapper.RATE_FACTOR_KEY, workloadRateFactor); for (Map.Entry configPair : workloadExtraConfigs.entrySet()) { workloadConf.set(configPair.getKey(), configPair.getValue()); } + Class reducerClass = null; + if (workloadOutputPath != null) { + reducerClass = AuditReplayReducer.class; + } workloadJob = WorkloadDriver.getJobForSubmission(workloadConf, nameNodeURI.toString(), - workloadStartTime, AuditReplayMapper.class, AuditReplayReducer.class); + workloadStartTime, AuditReplayMapper.class, reducerClass); workloadJob.submit(); while (!isCompleted(infraAppState) && !isCompleted(workloadAppState)) { workloadJob.monitorAndPrintJob(); diff --git a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/WorkloadDriver.java b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/WorkloadDriver.java index 8b10546f85..863131b0fd 100644 --- a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/WorkloadDriver.java +++ b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/WorkloadDriver.java @@ -10,7 +10,6 @@ import java.util.Date; import java.util.concurrent.TimeUnit; -import com.linkedin.dynamometer.workloadgenerator.audit.AuditReplayReducer; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; import org.apache.commons.cli.HelpFormatter; @@ -19,21 +18,16 @@ import org.apache.commons.cli.OptionGroup; import org.apache.commons.cli.Options; import org.apache.commons.cli.PosixParser; -import org.apache.commons.lang.ObjectUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.MRJobConfig; -import org.apache.hadoop.mapreduce.Reducer; -import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; @@ -76,7 +70,7 @@ public int run(String[] args) throws Exception { options.addOption(mapperClassOption); Option reducerClassOption = OptionBuilder.withArgName("Reducer ClassName").hasArg().withDescription( "Class name of the reducer (optional); must be a Reducer subclass. Reducers supported currently: \n" + - "1. AuditReplayReducer \nFully specificed class names are also supported.") + "1. AuditReplayReducer \nFully specified class names are also supported.") .create(REDUCER_CLASS_NAME); options.addOption(reducerClassOption); @@ -116,10 +110,13 @@ public int run(String[] args) throws Exception { System.err.println(getMapperUsageInfo(cli.getOptionValue(MAPPER_CLASS_NAME))); return 1; } - Class reducerClass = null; if (cli.getOptionValue(REDUCER_CLASS_NAME) != null) { reducerClass = getReducerClass(cli.getOptionValue(REDUCER_CLASS_NAME)); + if (!reducerClass.newInstance().verifyConfigurations(getConf())) { + System.err.println("Incorrect config for " + reducerClass.getName()); + return 1; + } } Job job = getJobForSubmission(getConf(), nnURI, startTimestampMs, mapperClass, reducerClass); @@ -208,5 +205,4 @@ private String getMapperUsageInfo(String mapperClassName) throws ClassNotFoundEx return builder.toString(); } - } diff --git a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/WorkloadReducer.java b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/WorkloadReducer.java index 71c69881ce..81855f3395 100644 --- a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/WorkloadReducer.java +++ b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/WorkloadReducer.java @@ -14,19 +14,5 @@ public Class getOutputFormat(Configuration conf) { return NullOutputFormat.class; } - /** - * Get the description of the behavior of this reducer. - */ - public abstract String getDescription(); - - /** - * Get a list of the description of each configuration that this mapper accepts. - */ - public abstract List getConfigDescriptions(); - - /** - * Verify that the provided configuration contains all configurations - * required by this mapper. - */ public abstract boolean verifyConfigurations(Configuration conf); } diff --git a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayReducer.java b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayReducer.java index 91638cca7f..20f3df5a8f 100644 --- a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayReducer.java +++ b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayReducer.java @@ -20,16 +20,6 @@ public Class getOutputFormat(Configuration conf) { return AuditTextOutputFormat.class; } - @Override - public String getDescription() { - return "This reducer aggregates latency data."; - } - - @Override - public List getConfigDescriptions() { - return Lists.newArrayList("default config"); - } - @Override public boolean verifyConfigurations(Configuration conf) { return conf.get(OUTPUT_PATH_KEY) != null; diff --git a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayThread.java b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayThread.java index 5e79dd32b9..3aa7d51c10 100644 --- a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayThread.java +++ b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayThread.java @@ -262,11 +262,11 @@ public FileSystem run() { String key = command.getSimpleUgi() + "_" + replayCommand.getType().toString(); long latency = System.currentTimeMillis() - startTime; -// try { -// mapperContext.write(new Text(key), new LongWritable(latency)); -// } catch (InterruptedException|IOException e) { -// throw new IOException("Error writing to context", e); -// } + try { + mapperContext.write(new Text(key), new LongWritable(latency)); + } catch (InterruptedException|IOException e) { + throw new IOException("Error writing to context", e); + } switch (replayCommand.getType()) { case WRITE: diff --git a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/NoSplitTextInputFormat.java b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/NoSplitTextInputFormat.java index 6af2be24d3..0938583188 100644 --- a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/NoSplitTextInputFormat.java +++ b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/NoSplitTextInputFormat.java @@ -11,7 +11,6 @@ import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; -import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; /** diff --git a/dynamometer-workload/src/test/java/com/linkedin/dynamometer/workloadgenerator/TestWorkloadGenerator.java b/dynamometer-workload/src/test/java/com/linkedin/dynamometer/workloadgenerator/TestWorkloadGenerator.java index bf70b584eb..a9820ab4ef 100644 --- a/dynamometer-workload/src/test/java/com/linkedin/dynamometer/workloadgenerator/TestWorkloadGenerator.java +++ b/dynamometer-workload/src/test/java/com/linkedin/dynamometer/workloadgenerator/TestWorkloadGenerator.java @@ -57,25 +57,43 @@ public void tearDown() throws Exception { @Test public void testAuditWorkloadDirectParser() throws Exception { + String workloadInputPath = + TestWorkloadGenerator.class.getClassLoader().getResource("audit_trace_direct").toString(); + conf.set(AuditReplayMapper.INPUT_PATH_KEY, workloadInputPath); + conf.setLong(AuditLogDirectParser.AUDIT_START_TIMESTAMP_KEY, 60*1000); + testAuditWorkloadWithReducer(null); + } + + @Test + public void testAuditWorkloadHiveParser() throws Exception { + String workloadInputPath = + TestWorkloadGenerator.class.getClassLoader().getResource("audit_trace_hive").toString(); + conf.set(AuditReplayMapper.INPUT_PATH_KEY, workloadInputPath); + conf.setClass(AuditReplayMapper.COMMAND_PARSER_KEY, AuditLogHiveTableParser.class, AuditCommandParser.class); + testAuditWorkloadWithReducer(null); + } + + @Test + public void testAuditWorkloadDirectParserWithOutput() throws Exception { String workloadInputPath = TestWorkloadGenerator.class.getClassLoader().getResource("audit_trace_direct").toString(); String auditOutputPath = "/reducer_output/trace_output_direct"; conf.set(AuditReplayMapper.INPUT_PATH_KEY, workloadInputPath); conf.set(AuditReplayReducer.OUTPUT_PATH_KEY, auditOutputPath); conf.setLong(AuditLogDirectParser.AUDIT_START_TIMESTAMP_KEY, 60*1000); - testAuditWorkload(); + testAuditWorkloadWithReducer(AuditReplayReducer.class); assertTrue(dfs.exists(new Path(auditOutputPath))); } @Test - public void testAuditWorkloadHiveParser() throws Exception { + public void testAuditWorkloadHiveParserWithOutput() throws Exception { String workloadInputPath = TestWorkloadGenerator.class.getClassLoader().getResource("audit_trace_hive").toString(); String auditOutputPath = "/reducer_output/trace_output_hive"; conf.set(AuditReplayMapper.INPUT_PATH_KEY, workloadInputPath); conf.set(AuditReplayReducer.OUTPUT_PATH_KEY, auditOutputPath); conf.setClass(AuditReplayMapper.COMMAND_PARSER_KEY, AuditLogHiveTableParser.class, AuditCommandParser.class); - testAuditWorkload(); + testAuditWorkloadWithReducer(AuditReplayReducer.class); assertTrue(dfs.exists(new Path(auditOutputPath))); } @@ -98,10 +116,10 @@ public void authorize(UserGroupInformation user, String remoteAddress) throws Au } } - private void testAuditWorkload() throws Exception { + private void testAuditWorkloadWithReducer(Class reducerClass) throws Exception { long workloadStartTime = System.currentTimeMillis() + 10000; Job workloadJob = WorkloadDriver.getJobForSubmission(conf, dfs.getUri().toString(), - workloadStartTime, AuditReplayMapper.class, AuditReplayReducer.class); + workloadStartTime, AuditReplayMapper.class, reducerClass); boolean success = workloadJob.waitForCompletion(true); assertTrue("workload job should succeed", success); Counters counters = workloadJob.getCounters(); From 67873d7fbeffae5e6ba2b6b10b4ce0e8928ac8dd Mon Sep 17 00:00:00 2001 From: Christopher Gregorian Date: Thu, 14 Feb 2019 15:30:43 -0800 Subject: [PATCH 03/10] Refactor and clean up --- .../java/com/linkedin/dynamometer/Client.java | 8 +-- .../workloadgenerator/WorkloadDriver.java | 53 ++----------------- .../workloadgenerator/WorkloadMapper.java | 13 ++++- .../workloadgenerator/WorkloadReducer.java | 18 ------- .../audit/AuditReplayMapper.java | 29 +++++++--- .../audit/AuditReplayReducer.java | 24 +++------ .../audit/AuditReplayThread.java | 8 ++- .../audit/AuditTextOutputFormat.java | 26 --------- .../TestWorkloadGenerator.java | 38 +++++-------- 9 files changed, 63 insertions(+), 154 deletions(-) delete mode 100644 dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/WorkloadReducer.java delete mode 100644 dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditTextOutputFormat.java diff --git a/dynamometer-infra/src/main/java/com/linkedin/dynamometer/Client.java b/dynamometer-infra/src/main/java/com/linkedin/dynamometer/Client.java index 294fea47f9..5b06151e27 100644 --- a/dynamometer-infra/src/main/java/com/linkedin/dynamometer/Client.java +++ b/dynamometer-infra/src/main/java/com/linkedin/dynamometer/Client.java @@ -12,7 +12,6 @@ import com.google.common.base.StandardSystemProperty; import com.google.common.base.Supplier; import com.google.common.collect.Lists; -import com.linkedin.dynamometer.workloadgenerator.WorkloadReducer; import com.linkedin.dynamometer.workloadgenerator.audit.AuditReplayMapper; import com.linkedin.dynamometer.workloadgenerator.WorkloadDriver; import java.io.File; @@ -39,7 +38,6 @@ import java.util.zip.ZipEntry; import java.util.zip.ZipOutputStream; -import com.linkedin.dynamometer.workloadgenerator.audit.AuditReplayReducer; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; import org.apache.commons.cli.GnuParser; @@ -924,12 +922,8 @@ private void launchAndMonitorWorkloadDriver(Properties nameNodeProperties) { for (Map.Entry configPair : workloadExtraConfigs.entrySet()) { workloadConf.set(configPair.getKey(), configPair.getValue()); } - Class reducerClass = null; - if (workloadOutputPath != null) { - reducerClass = AuditReplayReducer.class; - } workloadJob = WorkloadDriver.getJobForSubmission(workloadConf, nameNodeURI.toString(), - workloadStartTime, AuditReplayMapper.class, reducerClass); + workloadStartTime, AuditReplayMapper.class); workloadJob.submit(); while (!isCompleted(infraAppState) && !isCompleted(workloadAppState)) { workloadJob.monitorAndPrintJob(); diff --git a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/WorkloadDriver.java b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/WorkloadDriver.java index 863131b0fd..c88843861d 100644 --- a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/WorkloadDriver.java +++ b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/WorkloadDriver.java @@ -22,11 +22,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.MRJobConfig; -import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; @@ -47,7 +43,6 @@ public class WorkloadDriver extends Configured implements Tool { public static final String START_TIME_OFFSET_DEFAULT = "1m"; public static final String NN_URI = "nn_uri"; public static final String MAPPER_CLASS_NAME = "mapper_class_name"; - public static final String REDUCER_CLASS_NAME = "reducer_class_name"; public int run(String[] args) throws Exception { Option helpOption = new Option("h", "help", false, "Shows this message. Additionally specify the " + MAPPER_CLASS_NAME @@ -68,11 +63,6 @@ public int run(String[] args) throws Exception { "1. AuditReplayMapper \n" + "2. CreateFileMapper \nFully specified class names are also supported.") .isRequired().create(MAPPER_CLASS_NAME); options.addOption(mapperClassOption); - Option reducerClassOption = OptionBuilder.withArgName("Reducer ClassName").hasArg().withDescription( - "Class name of the reducer (optional); must be a Reducer subclass. Reducers supported currently: \n" + - "1. AuditReplayReducer \nFully specified class names are also supported.") - .create(REDUCER_CLASS_NAME); - options.addOption(reducerClassOption); Options helpOptions = new Options(); helpOptions.addOption(helpOption); @@ -110,23 +100,15 @@ public int run(String[] args) throws Exception { System.err.println(getMapperUsageInfo(cli.getOptionValue(MAPPER_CLASS_NAME))); return 1; } - Class reducerClass = null; - if (cli.getOptionValue(REDUCER_CLASS_NAME) != null) { - reducerClass = getReducerClass(cli.getOptionValue(REDUCER_CLASS_NAME)); - if (!reducerClass.newInstance().verifyConfigurations(getConf())) { - System.err.println("Incorrect config for " + reducerClass.getName()); - return 1; - } - } - Job job = getJobForSubmission(getConf(), nnURI, startTimestampMs, mapperClass, reducerClass); + Job job = getJobForSubmission(getConf(), nnURI, startTimestampMs, mapperClass); boolean success = job.waitForCompletion(true); return success ? 0 : 1; } public static Job getJobForSubmission(Configuration baseConf, String nnURI, long startTimestampMs, - Class mapperClass, Class reducerClass) throws IOException, ClassNotFoundException, + Class mapperClass) throws IOException, ClassNotFoundException, InstantiationException, IllegalAccessException { Configuration conf = new Configuration(baseConf); conf.set(NN_URI, nnURI); @@ -139,23 +121,8 @@ public static Job getJobForSubmission(Configuration baseConf, String nnURI, long Job job = Job.getInstance(conf, "Dynamometer Workload Driver"); job.setJarByClass(mapperClass); job.setMapperClass(mapperClass); - job.setInputFormatClass(mapperClass.newInstance().getInputFormat(conf)); - if (reducerClass == null) { - job.setNumReduceTasks(0); - job.setMapOutputKeyClass(NullWritable.class); - job.setMapOutputValueClass(NullWritable.class); - job.setOutputKeyClass(NullWritable.class); - job.setOutputValueClass(NullWritable.class); - job.setOutputFormatClass(NullOutputFormat.class); - } else { - job.setNumReduceTasks(1); - job.setMapOutputKeyClass(Text.class); - job.setMapOutputValueClass(LongWritable.class); - job.setOutputKeyClass(Text.class); - job.setOutputValueClass(LongWritable.class); - job.setOutputFormatClass(reducerClass.newInstance().getOutputFormat(conf)); - job.setReducerClass(reducerClass); - } + + mapperClass.newInstance().configureJob(job); return job; } @@ -177,18 +144,6 @@ private Class getMapperClass(String className) throws return (Class) mapperClass; } - private Class getReducerClass(String className) throws ClassNotFoundException { - if (!className.contains(".")) { - className = WorkloadDriver.class.getPackage().getName() + "." + className; - } - Class mapperClass = getConf().getClassByName(className); - if (!WorkloadReducer.class.isAssignableFrom(mapperClass)) { - throw new IllegalArgumentException(className + " is not a subclass of " + - WorkloadReducer.class.getCanonicalName()); - } - return (Class) mapperClass; - } - private String getMapperUsageInfo(String mapperClassName) throws ClassNotFoundException, InstantiationException, IllegalAccessException { WorkloadMapper mapper = getMapperClass(mapperClassName).newInstance(); diff --git a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/WorkloadMapper.java b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/WorkloadMapper.java index e41e61973c..65817bf200 100644 --- a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/WorkloadMapper.java +++ b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/WorkloadMapper.java @@ -8,7 +8,9 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; /** @@ -21,7 +23,7 @@ public abstract class WorkloadMapper extends M /** * Return the input class to be used by this mapper. */ - public Class getInputFormat(Configuration conf) { + public static Class getInputFormat(Configuration conf) { return TimedInputFormat.class; } @@ -41,4 +43,13 @@ public Class getInputFormat(Configuration conf) { */ public abstract boolean verifyConfigurations(Configuration conf); + /** + * Get the associated Reducer class to run on the outputted kv pairs. + */ + public void configureJob(Job job) { + job.setNumReduceTasks(0); + job.setOutputKeyClass(NullWritable.class); + job.setOutputValueClass(NullWritable.class); + job.setOutputFormatClass(NullOutputFormat.class); + } } diff --git a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/WorkloadReducer.java b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/WorkloadReducer.java deleted file mode 100644 index 81855f3395..0000000000 --- a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/WorkloadReducer.java +++ /dev/null @@ -1,18 +0,0 @@ -package com.linkedin.dynamometer.workloadgenerator; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.mapreduce.OutputFormat; -import org.apache.hadoop.mapreduce.Reducer; -import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; - -import java.util.List; - -public abstract class WorkloadReducer - extends Reducer { - - public Class getOutputFormat(Configuration conf) { - return NullOutputFormat.class; - } - - public abstract boolean verifyConfigurations(Configuration conf); -} diff --git a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayMapper.java b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayMapper.java index 05372bf6a6..2e45454fb8 100644 --- a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayMapper.java +++ b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayMapper.java @@ -18,16 +18,19 @@ import java.util.concurrent.DelayQueue; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import static com.linkedin.dynamometer.workloadgenerator.audit.AuditReplayMapper.CommandType.READ; import static com.linkedin.dynamometer.workloadgenerator.audit.AuditReplayMapper.CommandType.WRITE; @@ -58,6 +61,7 @@ public class AuditReplayMapper extends WorkloadMapper { public static final String INPUT_PATH_KEY = "auditreplay.input-path"; + public static final String OUTPUT_PATH_KEY = "auditreplay.output-path"; public static final String NUM_THREADS_KEY = "auditreplay.num-threads"; public static final int NUM_THREADS_DEFAULT = 1; public static final String CREATE_BLOCKS_KEY = "auditreplay.create-blocks"; @@ -141,11 +145,6 @@ public enum CommandType { private AuditCommandParser commandParser; private ScheduledThreadPoolExecutor progressExecutor; - @Override - public Class getInputFormat(Configuration conf) { - return NoSplitTextInputFormat.class; - } - @Override public String getDescription() { return "This mapper replays audit log files."; @@ -155,6 +154,7 @@ public String getDescription() { public List getConfigDescriptions() { return Lists.newArrayList( INPUT_PATH_KEY + " (required): Path to directory containing input files.", + OUTPUT_PATH_KEY + " (required): Path to destination for output files.", NUM_THREADS_KEY + " (default " + NUM_THREADS_DEFAULT + "): Number of threads to use per mapper for replay.", CREATE_BLOCKS_KEY + " (default " + CREATE_BLOCKS_DEFAULT + "): Whether or not to create 1-byte blocks when " + "performing `create` commands.", @@ -166,7 +166,7 @@ public List getConfigDescriptions() { @Override public boolean verifyConfigurations(Configuration conf) { - return conf.get(INPUT_PATH_KEY) != null; + return conf.get(INPUT_PATH_KEY) != null && conf.get(OUTPUT_PATH_KEY) != null; } @Override @@ -252,4 +252,19 @@ public void cleanup(Mapper.Context context) throws InterruptedException { LOG.info("Percentage of invalid ops: " + percentageOfInvalidOps); } } + + @Override + public void configureJob(Job job) { + job.setMapOutputKeyClass(Text.class); + job.setMapOutputValueClass(LongWritable.class); + job.setInputFormatClass(NoSplitTextInputFormat.class); + + job.setNumReduceTasks(1); + job.setReducerClass(AuditReplayReducer.class); + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(LongWritable.class); + job.setOutputFormatClass(TextOutputFormat.class); + + TextOutputFormat.setOutputPath(job, new Path(job.getConfiguration().get(OUTPUT_PATH_KEY))); + } } diff --git a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayReducer.java b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayReducer.java index 20f3df5a8f..2be1f9c4bb 100644 --- a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayReducer.java +++ b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayReducer.java @@ -1,29 +1,17 @@ +/* + * Copyright 2019 LinkedIn Corporation. All rights reserved. Licensed under the BSD-2 Clause license. + * See LICENSE in the project root for license information. + */ package com.linkedin.dynamometer.workloadgenerator.audit; -import com.google.common.collect.Lists; -import com.linkedin.dynamometer.workloadgenerator.WorkloadReducer; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.OutputFormat; -import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; +import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; -import java.util.List; public class AuditReplayReducer extends - WorkloadReducer { - public static final String OUTPUT_PATH_KEY = "auditreplay.output-path"; - - @Override - public Class getOutputFormat(Configuration conf) { - return AuditTextOutputFormat.class; - } - - @Override - public boolean verifyConfigurations(Configuration conf) { - return conf.get(OUTPUT_PATH_KEY) != null; - } + Reducer { @Override protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { diff --git a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayThread.java b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayThread.java index 3aa7d51c10..215ca3844e 100644 --- a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayThread.java +++ b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayThread.java @@ -62,6 +62,9 @@ public class AuditReplayThread extends Thread { private long startTimestampMs; private boolean createBlocks; + private Text userCommandKey = new Text(); + private LongWritable userCommandLatency = new LongWritable(); + // Counters are not thread-safe so we store a local mapping in our thread // and merge them all together at the end. private Map replayCountersMap = new HashMap<>(); @@ -259,11 +262,12 @@ public FileSystem run() { break; } - String key = command.getSimpleUgi() + "_" + replayCommand.getType().toString(); + userCommandKey.set(command.getSimpleUgi() + "_" + replayCommand.getType().toString()); long latency = System.currentTimeMillis() - startTime; + userCommandLatency.set(latency); try { - mapperContext.write(new Text(key), new LongWritable(latency)); + mapperContext.write(userCommandKey, userCommandLatency); } catch (InterruptedException|IOException e) { throw new IOException("Error writing to context", e); } diff --git a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditTextOutputFormat.java b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditTextOutputFormat.java deleted file mode 100644 index c8c80d8a7b..0000000000 --- a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditTextOutputFormat.java +++ /dev/null @@ -1,26 +0,0 @@ -/** - * Copyright 2017 LinkedIn Corporation. All rights reserved. Licensed under the BSD-2 Clause license. - * See LICENSE in the project root for license information. - */ -package com.linkedin.dynamometer.workloadgenerator.audit; - -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapred.FileAlreadyExistsException; -import org.apache.hadoop.mapred.FileOutputFormat; -import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; -import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; -import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; - -import java.io.IOException; -import java.util.List; - -public class AuditTextOutputFormat extends TextOutputFormat { - @Override - public void checkOutputSpecs(JobContext context) throws IOException { - context.getConfiguration().set("mapreduce.output.fileoutputformat.outputdir", - context.getConfiguration().get(AuditReplayReducer.OUTPUT_PATH_KEY)); - super.checkOutputSpecs(context); - } -} diff --git a/dynamometer-workload/src/test/java/com/linkedin/dynamometer/workloadgenerator/TestWorkloadGenerator.java b/dynamometer-workload/src/test/java/com/linkedin/dynamometer/workloadgenerator/TestWorkloadGenerator.java index a9820ab4ef..a3db248826 100644 --- a/dynamometer-workload/src/test/java/com/linkedin/dynamometer/workloadgenerator/TestWorkloadGenerator.java +++ b/dynamometer-workload/src/test/java/com/linkedin/dynamometer/workloadgenerator/TestWorkloadGenerator.java @@ -4,9 +4,13 @@ */ package com.linkedin.dynamometer.workloadgenerator; -import com.linkedin.dynamometer.workloadgenerator.audit.*; - import java.io.IOException; + +import com.linkedin.dynamometer.workloadgenerator.audit.AuditCommandParser; +import com.linkedin.dynamometer.workloadgenerator.audit.AuditLogDirectParser; +import com.linkedin.dynamometer.workloadgenerator.audit.AuditLogHiveTableParser; +import com.linkedin.dynamometer.workloadgenerator.audit.AuditReplayMapper; +import com.linkedin.dynamometer.workloadgenerator.audit.AuditReplayReducer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; @@ -55,33 +59,15 @@ public void tearDown() throws Exception { } } - @Test - public void testAuditWorkloadDirectParser() throws Exception { - String workloadInputPath = - TestWorkloadGenerator.class.getClassLoader().getResource("audit_trace_direct").toString(); - conf.set(AuditReplayMapper.INPUT_PATH_KEY, workloadInputPath); - conf.setLong(AuditLogDirectParser.AUDIT_START_TIMESTAMP_KEY, 60*1000); - testAuditWorkloadWithReducer(null); - } - - @Test - public void testAuditWorkloadHiveParser() throws Exception { - String workloadInputPath = - TestWorkloadGenerator.class.getClassLoader().getResource("audit_trace_hive").toString(); - conf.set(AuditReplayMapper.INPUT_PATH_KEY, workloadInputPath); - conf.setClass(AuditReplayMapper.COMMAND_PARSER_KEY, AuditLogHiveTableParser.class, AuditCommandParser.class); - testAuditWorkloadWithReducer(null); - } - @Test public void testAuditWorkloadDirectParserWithOutput() throws Exception { String workloadInputPath = TestWorkloadGenerator.class.getClassLoader().getResource("audit_trace_direct").toString(); String auditOutputPath = "/reducer_output/trace_output_direct"; conf.set(AuditReplayMapper.INPUT_PATH_KEY, workloadInputPath); - conf.set(AuditReplayReducer.OUTPUT_PATH_KEY, auditOutputPath); + conf.set(AuditReplayMapper.OUTPUT_PATH_KEY, auditOutputPath); conf.setLong(AuditLogDirectParser.AUDIT_START_TIMESTAMP_KEY, 60*1000); - testAuditWorkloadWithReducer(AuditReplayReducer.class); + testAuditWorkload(); assertTrue(dfs.exists(new Path(auditOutputPath))); } @@ -91,9 +77,9 @@ public void testAuditWorkloadHiveParserWithOutput() throws Exception { TestWorkloadGenerator.class.getClassLoader().getResource("audit_trace_hive").toString(); String auditOutputPath = "/reducer_output/trace_output_hive"; conf.set(AuditReplayMapper.INPUT_PATH_KEY, workloadInputPath); - conf.set(AuditReplayReducer.OUTPUT_PATH_KEY, auditOutputPath); + conf.set(AuditReplayMapper.OUTPUT_PATH_KEY, auditOutputPath); conf.setClass(AuditReplayMapper.COMMAND_PARSER_KEY, AuditLogHiveTableParser.class, AuditCommandParser.class); - testAuditWorkloadWithReducer(AuditReplayReducer.class); + testAuditWorkload(); assertTrue(dfs.exists(new Path(auditOutputPath))); } @@ -116,10 +102,10 @@ public void authorize(UserGroupInformation user, String remoteAddress) throws Au } } - private void testAuditWorkloadWithReducer(Class reducerClass) throws Exception { + private void testAuditWorkload() throws Exception { long workloadStartTime = System.currentTimeMillis() + 10000; Job workloadJob = WorkloadDriver.getJobForSubmission(conf, dfs.getUri().toString(), - workloadStartTime, AuditReplayMapper.class, reducerClass); + workloadStartTime, AuditReplayMapper.class); boolean success = workloadJob.waitForCompletion(true); assertTrue("workload job should succeed", success); Counters counters = workloadJob.getCounters(); From 4c84065baac636d73523dfe1976876945fa8c139 Mon Sep 17 00:00:00 2001 From: Christopher Gregorian Date: Thu, 14 Feb 2019 15:54:42 -0800 Subject: [PATCH 04/10] tests pass! --- .../src/main/java/com/linkedin/dynamometer/Client.java | 2 ++ .../java/com/linkedin/dynamometer/TestDynamometerInfra.java | 1 + .../dynamometer/workloadgenerator/TestWorkloadGenerator.java | 4 ++-- 3 files changed, 5 insertions(+), 2 deletions(-) diff --git a/dynamometer-infra/src/main/java/com/linkedin/dynamometer/Client.java b/dynamometer-infra/src/main/java/com/linkedin/dynamometer/Client.java index 5b06151e27..12f1c1321b 100644 --- a/dynamometer-infra/src/main/java/com/linkedin/dynamometer/Client.java +++ b/dynamometer-infra/src/main/java/com/linkedin/dynamometer/Client.java @@ -303,6 +303,7 @@ public Client(String... dependencyJars) { opts.addOption(WORKLOAD_REPLAY_ENABLE_ARG, false, "If specified, this client will additionally launch the workload " + "replay job to replay audit logs against the HDFS cluster which is started."); opts.addOption(WORKLOAD_INPUT_PATH_ARG, true, "Location of the audit traces to replay (Required for workload)"); + opts.addOption(WORKLOAD_OUTPUT_PATH_ARG, true, "Location of the metrics output (Required for workload)"); opts.addOption(WORKLOAD_THREADS_PER_MAPPER_ARG, true, "Number of threads per mapper to use to " + "replay the workload. (default " + AuditReplayMapper.NUM_THREADS_DEFAULT + ")"); opts.addOption(WORKLOAD_START_DELAY_ARG, true, "Delay between launching the Workload MR job and " + @@ -917,6 +918,7 @@ private void launchAndMonitorWorkloadDriver(Properties nameNodeProperties) { long workloadStartTime = System.currentTimeMillis() + workloadStartDelayMs; Configuration workloadConf = new Configuration(getConf()); workloadConf.set(AuditReplayMapper.INPUT_PATH_KEY, workloadInputPath); + workloadConf.set(AuditReplayMapper.OUTPUT_PATH_KEY, workloadOutputPath); workloadConf.setInt(AuditReplayMapper.NUM_THREADS_KEY, workloadThreadsPerMapper); workloadConf.setDouble(AuditReplayMapper.RATE_FACTOR_KEY, workloadRateFactor); for (Map.Entry configPair : workloadExtraConfigs.entrySet()) { diff --git a/dynamometer-infra/src/test/java/com/linkedin/dynamometer/TestDynamometerInfra.java b/dynamometer-infra/src/test/java/com/linkedin/dynamometer/TestDynamometerInfra.java index 44a108dfb9..3c247a4c94 100644 --- a/dynamometer-infra/src/test/java/com/linkedin/dynamometer/TestDynamometerInfra.java +++ b/dynamometer-infra/src/test/java/com/linkedin/dynamometer/TestDynamometerInfra.java @@ -278,6 +278,7 @@ public void run() { "-" + AMOptions.SHELL_ENV_ARG, "HADOOP_CONF_DIR=" + getHadoopHomeLocation() + "/etc/hadoop", "-" + Client.WORKLOAD_REPLAY_ENABLE_ARG, "-" + Client.WORKLOAD_INPUT_PATH_ARG, fs.makeQualified(new Path("/tmp/audit_trace_direct")).toString(), + "-" + Client.WORKLOAD_OUTPUT_PATH_ARG, fs.makeQualified(new Path("/tmp/trace_output_direct")).toString(), "-" + Client.WORKLOAD_THREADS_PER_MAPPER_ARG, "1", "-" + Client.WORKLOAD_START_DELAY_ARG, "10s", "-" + AMOptions.NAMENODE_ARGS_ARG, "-Ddfs.namenode.safemode.extension=0" diff --git a/dynamometer-workload/src/test/java/com/linkedin/dynamometer/workloadgenerator/TestWorkloadGenerator.java b/dynamometer-workload/src/test/java/com/linkedin/dynamometer/workloadgenerator/TestWorkloadGenerator.java index a3db248826..b9fac8f772 100644 --- a/dynamometer-workload/src/test/java/com/linkedin/dynamometer/workloadgenerator/TestWorkloadGenerator.java +++ b/dynamometer-workload/src/test/java/com/linkedin/dynamometer/workloadgenerator/TestWorkloadGenerator.java @@ -63,7 +63,7 @@ public void tearDown() throws Exception { public void testAuditWorkloadDirectParserWithOutput() throws Exception { String workloadInputPath = TestWorkloadGenerator.class.getClassLoader().getResource("audit_trace_direct").toString(); - String auditOutputPath = "/reducer_output/trace_output_direct"; + String auditOutputPath = "/tmp/trace_output_direct"; conf.set(AuditReplayMapper.INPUT_PATH_KEY, workloadInputPath); conf.set(AuditReplayMapper.OUTPUT_PATH_KEY, auditOutputPath); conf.setLong(AuditLogDirectParser.AUDIT_START_TIMESTAMP_KEY, 60*1000); @@ -75,7 +75,7 @@ public void testAuditWorkloadDirectParserWithOutput() throws Exception { public void testAuditWorkloadHiveParserWithOutput() throws Exception { String workloadInputPath = TestWorkloadGenerator.class.getClassLoader().getResource("audit_trace_hive").toString(); - String auditOutputPath = "/reducer_output/trace_output_hive"; + String auditOutputPath = "/tmp/trace_output_hive"; conf.set(AuditReplayMapper.INPUT_PATH_KEY, workloadInputPath); conf.set(AuditReplayMapper.OUTPUT_PATH_KEY, auditOutputPath); conf.setClass(AuditReplayMapper.COMMAND_PARSER_KEY, AuditLogHiveTableParser.class, AuditCommandParser.class); From e5da74113db998325d8e56f88671beef51681824 Mon Sep 17 00:00:00 2001 From: Christopher Gregorian Date: Fri, 15 Feb 2019 08:35:09 -0800 Subject: [PATCH 05/10] Fix up exception handling --- .../audit/AuditReplayMapper.java | 3 +- .../audit/AuditReplayThread.java | 32 ++++++++++++------- 2 files changed, 23 insertions(+), 12 deletions(-) diff --git a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayMapper.java b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayMapper.java index 2e45454fb8..114d4d2c61 100644 --- a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayMapper.java +++ b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayMapper.java @@ -226,7 +226,7 @@ public void map(LongWritable lineNum, Text inputLine, Mapper.Context context) } @Override - public void cleanup(Mapper.Context context) throws InterruptedException { + public void cleanup(Mapper.Context context) throws InterruptedException, IOException { for (AuditReplayThread t : threads) { // Add in an indicator for each thread to shut down after the last real command t.addToQueue(AuditReplayCommand.getPoisonPill(highestTimestamp + 1)); @@ -235,6 +235,7 @@ public void cleanup(Mapper.Context context) throws InterruptedException { for (AuditReplayThread t : threads) { t.join(); t.drainCounters(context); + t.drainCommandLatencies(context); if (t.getException() != null) { threadException = Optional.of(t.getException()); } diff --git a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayThread.java b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayThread.java index 215ca3844e..92fb137f40 100644 --- a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayThread.java +++ b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayThread.java @@ -7,6 +7,7 @@ import com.google.common.base.Splitter; import com.linkedin.dynamometer.workloadgenerator.WorkloadDriver; import java.io.IOException; +import java.io.InterruptedIOException; import java.net.URI; import java.security.PrivilegedAction; import java.util.ArrayList; @@ -55,27 +56,23 @@ public class AuditReplayThread extends Thread { private ConcurrentMap fsCache; private URI namenodeUri; private UserGroupInformation loginUser; - private Mapper.Context mapperContext; private Configuration mapperConf; // If any exception is encountered it will be stored here private Exception exception; private long startTimestampMs; private boolean createBlocks; - private Text userCommandKey = new Text(); - private LongWritable userCommandLatency = new LongWritable(); - // Counters are not thread-safe so we store a local mapping in our thread // and merge them all together at the end. private Map replayCountersMap = new HashMap<>(); private Map individualCommandsMap = new HashMap<>(); + private Map userCommandMap = new HashMap<>(); AuditReplayThread(Mapper.Context mapperContext, DelayQueue queue, ConcurrentMap fsCache) throws IOException { commandQueue = queue; this.fsCache = fsCache; loginUser = UserGroupInformation.getLoginUser(); - this.mapperContext = mapperContext; mapperConf = mapperContext.getConfiguration(); namenodeUri = URI.create(mapperConf.get(WorkloadDriver.NN_URI)); startTimestampMs = mapperConf.getLong(WorkloadDriver.START_TIMESTAMP_MS, -1); @@ -106,6 +103,20 @@ void drainCounters(Mapper.Context context) { } } + void drainCommandLatencies(Mapper.Context context) throws IOException { + Text outputKey = new Text(); + LongWritable outputValue = new LongWritable(); + for (Map.Entry ent : userCommandMap.entrySet()) { + outputKey.set(ent.getKey()); + outputValue.set(ent.getValue()); + try { + context.write(outputKey, outputValue); + } catch (IOException|InterruptedException e) { + throw new IOException("Error writing to context", e); + } + } + } + /** * Add a command to this thread's processing queue. * @param cmd Command to add. @@ -262,14 +273,13 @@ public FileSystem run() { break; } - userCommandKey.set(command.getSimpleUgi() + "_" + replayCommand.getType().toString()); + String userCommandKey = command.getSimpleUgi() + "_" + replayCommand.getType().toString(); long latency = System.currentTimeMillis() - startTime; - userCommandLatency.set(latency); - try { - mapperContext.write(userCommandKey, userCommandLatency); - } catch (InterruptedException|IOException e) { - throw new IOException("Error writing to context", e); + if (userCommandMap.containsKey(userCommandKey)) { + userCommandMap.put(userCommandKey, userCommandMap.get(userCommandKey) + latency); + } else { + userCommandMap.put(userCommandKey, latency); } switch (replayCommand.getType()) { From 4ddcc07206b6a109ed4a95a51fa99c7ad78c948a Mon Sep 17 00:00:00 2001 From: Christopher Gregorian Date: Fri, 15 Feb 2019 09:49:36 -0800 Subject: [PATCH 06/10] Add UserCommandKey type --- .../audit/AuditReplayMapper.java | 6 +- .../audit/AuditReplayReducer.java | 5 +- .../audit/AuditReplayThread.java | 22 ++--- .../audit/UserCommandKey.java | 91 +++++++++++++++++++ .../TestWorkloadGenerator.java | 1 - 5 files changed, 103 insertions(+), 22 deletions(-) create mode 100644 dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/UserCommandKey.java diff --git a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayMapper.java b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayMapper.java index 114d4d2c61..b2487a685a 100644 --- a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayMapper.java +++ b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayMapper.java @@ -58,7 +58,7 @@ * replayed. For example, a rate factor of 2 would make the replay occur twice as fast, and a rate * factor of 0.5 would make it occur half as fast. */ -public class AuditReplayMapper extends WorkloadMapper { +public class AuditReplayMapper extends WorkloadMapper { public static final String INPUT_PATH_KEY = "auditreplay.input-path"; public static final String OUTPUT_PATH_KEY = "auditreplay.output-path"; @@ -256,13 +256,13 @@ public void cleanup(Mapper.Context context) throws InterruptedException, IOExcep @Override public void configureJob(Job job) { - job.setMapOutputKeyClass(Text.class); + job.setMapOutputKeyClass(UserCommandKey.class); job.setMapOutputValueClass(LongWritable.class); job.setInputFormatClass(NoSplitTextInputFormat.class); job.setNumReduceTasks(1); job.setReducerClass(AuditReplayReducer.class); - job.setOutputKeyClass(Text.class); + job.setOutputKeyClass(UserCommandKey.class); job.setOutputValueClass(LongWritable.class); job.setOutputFormatClass(TextOutputFormat.class); diff --git a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayReducer.java b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayReducer.java index 2be1f9c4bb..14122b4134 100644 --- a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayReducer.java +++ b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayReducer.java @@ -5,16 +5,15 @@ package com.linkedin.dynamometer.workloadgenerator.audit; import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; public class AuditReplayReducer extends - Reducer { + Reducer { @Override - protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { + protected void reduce(UserCommandKey key, Iterable values, Context context) throws IOException, InterruptedException { long sum = 0; for (LongWritable v : values) { sum += v.get(); diff --git a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayThread.java b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayThread.java index 92fb137f40..2d44ad6701 100644 --- a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayThread.java +++ b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayThread.java @@ -7,7 +7,6 @@ import com.google.common.base.Splitter; import com.linkedin.dynamometer.workloadgenerator.WorkloadDriver; import java.io.IOException; -import java.io.InterruptedIOException; import java.net.URI; import java.security.PrivilegedAction; import java.util.ArrayList; @@ -27,7 +26,6 @@ import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Counter; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.counters.GenericCounter; @@ -66,7 +64,7 @@ public class AuditReplayThread extends Thread { // and merge them all together at the end. private Map replayCountersMap = new HashMap<>(); private Map individualCommandsMap = new HashMap<>(); - private Map userCommandMap = new HashMap<>(); + private Map commandLatencyMap = new HashMap<>(); AuditReplayThread(Mapper.Context mapperContext, DelayQueue queue, ConcurrentMap fsCache) throws IOException { @@ -104,13 +102,9 @@ void drainCounters(Mapper.Context context) { } void drainCommandLatencies(Mapper.Context context) throws IOException { - Text outputKey = new Text(); - LongWritable outputValue = new LongWritable(); - for (Map.Entry ent : userCommandMap.entrySet()) { - outputKey.set(ent.getKey()); - outputValue.set(ent.getValue()); + for (Map.Entry ent : commandLatencyMap.entrySet()) { try { - context.write(outputKey, outputValue); + context.write(ent.getKey(), ent.getValue()); } catch (IOException|InterruptedException e) { throw new IOException("Error writing to context", e); } @@ -273,14 +267,12 @@ public FileSystem run() { break; } - String userCommandKey = command.getSimpleUgi() + "_" + replayCommand.getType().toString(); long latency = System.currentTimeMillis() - startTime; - if (userCommandMap.containsKey(userCommandKey)) { - userCommandMap.put(userCommandKey, userCommandMap.get(userCommandKey) + latency); - } else { - userCommandMap.put(userCommandKey, latency); - } + UserCommandKey userCommandKey = new UserCommandKey(command.getSimpleUgi(), replayCommand.getType().toString()); + commandLatencyMap.putIfAbsent(userCommandKey, new LongWritable(0)); + LongWritable latencyWritable = commandLatencyMap.get(userCommandKey); + latencyWritable.set(latencyWritable.get() + latency); switch (replayCommand.getType()) { case WRITE: diff --git a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/UserCommandKey.java b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/UserCommandKey.java new file mode 100644 index 0000000000..28b0657ad3 --- /dev/null +++ b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/UserCommandKey.java @@ -0,0 +1,91 @@ +/** + * Copyright 2019 LinkedIn Corporation. All rights reserved. Licensed under the BSD-2 Clause license. + * See LICENSE in the project root for license information. + */ +package com.linkedin.dynamometer.workloadgenerator.audit; + +import com.sun.istack.NotNull; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.Objects; +import javax.annotation.Nonnull; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + + +public class UserCommandKey implements WritableComparable { + private Text user; + private Text command; + + public UserCommandKey() { + user = new Text(); + command = new Text(); + } + + public UserCommandKey(Text user, Text command) { + this.user = user; + this.command = command; + } + + public UserCommandKey(String user, String command) { + this.user = new Text(user); + this.command = new Text(command); + } + + public String getUser() { + return user.toString(); + } + + public void setUser(String user) { + this.user.set(user); + } + + public String getCommand() { + return command.toString(); + } + + public void setCommand(String command) { + this.command.set(command); + } + + @Override + public void write(DataOutput out) throws IOException { + user.write(out); + command.write(out); + } + + @Override + public void readFields(DataInput in) throws IOException { + user.readFields(in); + command.readFields(in); + } + + @Override + public int compareTo(@Nonnull Object o) { + return Integer.compare(hashCode(), o.hashCode()); + } + + @Override + public String toString() { + return getUser() + "," + getCommand(); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + UserCommandKey that = (UserCommandKey) o; + return getUser().equals(that.getUser()) && getCommand().equals(that.getCommand()); + } + + @Override + public int hashCode() { + return Objects.hash(getUser(), getCommand()); + } +} diff --git a/dynamometer-workload/src/test/java/com/linkedin/dynamometer/workloadgenerator/TestWorkloadGenerator.java b/dynamometer-workload/src/test/java/com/linkedin/dynamometer/workloadgenerator/TestWorkloadGenerator.java index b9fac8f772..27aa28ac7f 100644 --- a/dynamometer-workload/src/test/java/com/linkedin/dynamometer/workloadgenerator/TestWorkloadGenerator.java +++ b/dynamometer-workload/src/test/java/com/linkedin/dynamometer/workloadgenerator/TestWorkloadGenerator.java @@ -10,7 +10,6 @@ import com.linkedin.dynamometer.workloadgenerator.audit.AuditLogDirectParser; import com.linkedin.dynamometer.workloadgenerator.audit.AuditLogHiveTableParser; import com.linkedin.dynamometer.workloadgenerator.audit.AuditReplayMapper; -import com.linkedin.dynamometer.workloadgenerator.audit.AuditReplayReducer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; From 75de589a7365e6da60080d6eb4c272b2e666df67 Mon Sep 17 00:00:00 2001 From: Christopher Gregorian Date: Fri, 15 Feb 2019 10:06:36 -0800 Subject: [PATCH 07/10] More clean up --- .../main/java/com/linkedin/dynamometer/Client.java | 1 - .../workloadgenerator/CreateFileMapper.java | 1 + .../workloadgenerator/WorkloadDriver.java | 2 -- .../workloadgenerator/WorkloadMapper.java | 12 +++--------- .../workloadgenerator/audit/UserCommandKey.java | 2 -- .../workloadgenerator/TestWorkloadGenerator.java | 3 +-- 6 files changed, 5 insertions(+), 16 deletions(-) diff --git a/dynamometer-infra/src/main/java/com/linkedin/dynamometer/Client.java b/dynamometer-infra/src/main/java/com/linkedin/dynamometer/Client.java index 12f1c1321b..770fb354bb 100644 --- a/dynamometer-infra/src/main/java/com/linkedin/dynamometer/Client.java +++ b/dynamometer-infra/src/main/java/com/linkedin/dynamometer/Client.java @@ -37,7 +37,6 @@ import java.util.stream.Collectors; import java.util.zip.ZipEntry; import java.util.zip.ZipOutputStream; - import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; import org.apache.commons.cli.GnuParser; diff --git a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/CreateFileMapper.java b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/CreateFileMapper.java index 4449ea2313..befc7fe3f6 100644 --- a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/CreateFileMapper.java +++ b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/CreateFileMapper.java @@ -13,6 +13,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/WorkloadDriver.java b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/WorkloadDriver.java index c88843861d..461afaf4d1 100644 --- a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/WorkloadDriver.java +++ b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/WorkloadDriver.java @@ -9,7 +9,6 @@ import java.text.SimpleDateFormat; import java.util.Date; import java.util.concurrent.TimeUnit; - import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; import org.apache.commons.cli.HelpFormatter; @@ -121,7 +120,6 @@ public static Job getJobForSubmission(Configuration baseConf, String nnURI, long Job job = Job.getInstance(conf, "Dynamometer Workload Driver"); job.setJarByClass(mapperClass); job.setMapperClass(mapperClass); - mapperClass.newInstance().configureJob(job); return job; diff --git a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/WorkloadMapper.java b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/WorkloadMapper.java index 65817bf200..e153793504 100644 --- a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/WorkloadMapper.java +++ b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/WorkloadMapper.java @@ -19,14 +19,6 @@ * should override the {@link #getInputFormat(Configuration)} method. */ public abstract class WorkloadMapper extends Mapper { - - /** - * Return the input class to be used by this mapper. - */ - public static Class getInputFormat(Configuration conf) { - return TimedInputFormat.class; - } - /** * Get the description of the behavior of this mapper. */ @@ -44,9 +36,11 @@ public static Class getInputFormat(Configuration conf) { public abstract boolean verifyConfigurations(Configuration conf); /** - * Get the associated Reducer class to run on the outputted kv pairs. + * Setup input and output formats and optional reducer. */ public void configureJob(Job job) { + job.setInputFormatClass(TimedInputFormat.class); + job.setNumReduceTasks(0); job.setOutputKeyClass(NullWritable.class); job.setOutputValueClass(NullWritable.class); diff --git a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/UserCommandKey.java b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/UserCommandKey.java index 28b0657ad3..fdb52d36c7 100644 --- a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/UserCommandKey.java +++ b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/UserCommandKey.java @@ -4,14 +4,12 @@ */ package com.linkedin.dynamometer.workloadgenerator.audit; -import com.sun.istack.NotNull; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.util.Objects; import javax.annotation.Nonnull; import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; diff --git a/dynamometer-workload/src/test/java/com/linkedin/dynamometer/workloadgenerator/TestWorkloadGenerator.java b/dynamometer-workload/src/test/java/com/linkedin/dynamometer/workloadgenerator/TestWorkloadGenerator.java index 27aa28ac7f..36d136ff13 100644 --- a/dynamometer-workload/src/test/java/com/linkedin/dynamometer/workloadgenerator/TestWorkloadGenerator.java +++ b/dynamometer-workload/src/test/java/com/linkedin/dynamometer/workloadgenerator/TestWorkloadGenerator.java @@ -4,12 +4,11 @@ */ package com.linkedin.dynamometer.workloadgenerator; -import java.io.IOException; - import com.linkedin.dynamometer.workloadgenerator.audit.AuditCommandParser; import com.linkedin.dynamometer.workloadgenerator.audit.AuditLogDirectParser; import com.linkedin.dynamometer.workloadgenerator.audit.AuditLogHiveTableParser; import com.linkedin.dynamometer.workloadgenerator.audit.AuditReplayMapper; +import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; From 21b65667fd359336054a9fd58f1e5b123f1b16e3 Mon Sep 17 00:00:00 2001 From: Christopher Gregorian Date: Fri, 15 Feb 2019 11:31:27 -0800 Subject: [PATCH 08/10] Add output check to unit test --- .../workloadgenerator/TestWorkloadGenerator.java | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/dynamometer-workload/src/test/java/com/linkedin/dynamometer/workloadgenerator/TestWorkloadGenerator.java b/dynamometer-workload/src/test/java/com/linkedin/dynamometer/workloadgenerator/TestWorkloadGenerator.java index 36d136ff13..57d75565be 100644 --- a/dynamometer-workload/src/test/java/com/linkedin/dynamometer/workloadgenerator/TestWorkloadGenerator.java +++ b/dynamometer-workload/src/test/java/com/linkedin/dynamometer/workloadgenerator/TestWorkloadGenerator.java @@ -12,6 +12,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsAction; @@ -65,8 +66,7 @@ public void testAuditWorkloadDirectParserWithOutput() throws Exception { conf.set(AuditReplayMapper.INPUT_PATH_KEY, workloadInputPath); conf.set(AuditReplayMapper.OUTPUT_PATH_KEY, auditOutputPath); conf.setLong(AuditLogDirectParser.AUDIT_START_TIMESTAMP_KEY, 60*1000); - testAuditWorkload(); - assertTrue(dfs.exists(new Path(auditOutputPath))); + testAuditWorkloadWithOutput(auditOutputPath); } @Test @@ -77,8 +77,7 @@ public void testAuditWorkloadHiveParserWithOutput() throws Exception { conf.set(AuditReplayMapper.INPUT_PATH_KEY, workloadInputPath); conf.set(AuditReplayMapper.OUTPUT_PATH_KEY, auditOutputPath); conf.setClass(AuditReplayMapper.COMMAND_PARSER_KEY, AuditLogHiveTableParser.class, AuditCommandParser.class); - testAuditWorkload(); - assertTrue(dfs.exists(new Path(auditOutputPath))); + testAuditWorkloadWithOutput(auditOutputPath); } /** @@ -100,7 +99,7 @@ public void authorize(UserGroupInformation user, String remoteAddress) throws Au } } - private void testAuditWorkload() throws Exception { + private void testAuditWorkloadWithOutput(String auditOutputPath) throws Exception { long workloadStartTime = System.currentTimeMillis() + 10000; Job workloadJob = WorkloadDriver.getJobForSubmission(conf, dfs.getUri().toString(), workloadStartTime, AuditReplayMapper.class); @@ -112,5 +111,12 @@ private void testAuditWorkload() throws Exception { assertTrue(dfs.getFileStatus(new Path("/tmp/test1")).isFile()); assertTrue(dfs.getFileStatus(new Path("/tmp/testDirRenamed")).isDirectory()); assertFalse(dfs.exists(new Path("/denied"))); + + assertTrue(dfs.exists(new Path(auditOutputPath))); + FSDataInputStream auditOutput = dfs.open(new Path(auditOutputPath + "/part-r-00000")); + byte[] buf = new byte[auditOutput.available()]; + assertTrue(auditOutput.read(buf) > 0); + System.out.println(new String(buf)); + assertTrue(new String(buf).matches(".*hdfs,WRITE\\t[0-9]+\\n.*")); } } From 6538122913b18b187c628e9c15ab841d66a26f4c Mon Sep 17 00:00:00 2001 From: Christopher Gregorian Date: Fri, 15 Feb 2019 15:15:20 -0800 Subject: [PATCH 09/10] Last fixes and javadoc --- .gitignore | 1 + .../dynamometer/TestDynamometerInfra.java | 6 +++++- .../workloadgenerator/CreateFileMapper.java | 1 - .../workloadgenerator/WorkloadMapper.java | 4 ++-- .../audit/AuditReplayReducer.java | 6 ++++++ .../audit/AuditReplayThread.java | 8 ++------ .../workloadgenerator/audit/UserCommandKey.java | 16 ++++++---------- .../workloadgenerator/TestWorkloadGenerator.java | 14 ++++++-------- 8 files changed, 28 insertions(+), 28 deletions(-) diff --git a/.gitignore b/.gitignore index 4944af77e4..20f639de91 100644 --- a/.gitignore +++ b/.gitignore @@ -14,3 +14,4 @@ scripts.tar *.iml .idea/ gradle.out +*/out diff --git a/dynamometer-infra/src/test/java/com/linkedin/dynamometer/TestDynamometerInfra.java b/dynamometer-infra/src/test/java/com/linkedin/dynamometer/TestDynamometerInfra.java index 3c247a4c94..0daf55a045 100644 --- a/dynamometer-infra/src/test/java/com/linkedin/dynamometer/TestDynamometerInfra.java +++ b/dynamometer-infra/src/test/java/com/linkedin/dynamometer/TestDynamometerInfra.java @@ -107,6 +107,8 @@ public class TestDynamometerInfra { private static final String NAMENODE_NODELABEL = "dyno_namenode"; private static final String DATANODE_NODELABEL = "dyno_datanode"; + private static final String OUTPUT_PATH = "/tmp/trace_output_direct"; + private static MiniDFSCluster miniDFSCluster; private static MiniYARNCluster miniYARNCluster; private static YarnClient yarnClient; @@ -278,7 +280,7 @@ public void run() { "-" + AMOptions.SHELL_ENV_ARG, "HADOOP_CONF_DIR=" + getHadoopHomeLocation() + "/etc/hadoop", "-" + Client.WORKLOAD_REPLAY_ENABLE_ARG, "-" + Client.WORKLOAD_INPUT_PATH_ARG, fs.makeQualified(new Path("/tmp/audit_trace_direct")).toString(), - "-" + Client.WORKLOAD_OUTPUT_PATH_ARG, fs.makeQualified(new Path("/tmp/trace_output_direct")).toString(), + "-" + Client.WORKLOAD_OUTPUT_PATH_ARG, fs.makeQualified(new Path(OUTPUT_PATH)).toString(), "-" + Client.WORKLOAD_THREADS_PER_MAPPER_ARG, "1", "-" + Client.WORKLOAD_START_DELAY_ARG, "10s", "-" + AMOptions.NAMENODE_ARGS_ARG, "-Ddfs.namenode.safemode.extension=0" @@ -403,6 +405,8 @@ public Boolean get() { } } }, 3000, 60000); + + assertTrue(fs.exists(new Path(OUTPUT_PATH))); } private static URI getResourcePath(String resourceName) { diff --git a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/CreateFileMapper.java b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/CreateFileMapper.java index befc7fe3f6..4449ea2313 100644 --- a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/CreateFileMapper.java +++ b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/CreateFileMapper.java @@ -13,7 +13,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/WorkloadMapper.java b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/WorkloadMapper.java index e153793504..effd73761d 100644 --- a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/WorkloadMapper.java +++ b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/WorkloadMapper.java @@ -15,8 +15,8 @@ /** * Represents the base class for a generic workload-generating mapper. By default, it will expect to use - * {@link TimedInputFormat} as its {@link InputFormat}. Subclasses expecting a different {@link InputFormat} - * should override the {@link #getInputFormat(Configuration)} method. + * {@link TimedInputFormat} as its {@link InputFormat}. Subclasses requiring a reducer or expecting + * a different {@link InputFormat} should override the {@link #configureJob(Job)} method. */ public abstract class WorkloadMapper extends Mapper { /** diff --git a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayReducer.java b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayReducer.java index 14122b4134..e9f075592a 100644 --- a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayReducer.java +++ b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayReducer.java @@ -9,6 +9,12 @@ import java.io.IOException; + +/** + *

AuditReplayReducer aggregates the returned latency values from {@link AuditReplayMapper} and sums + * them up by {@link UserCommandKey}, which combines the user's id that ran the command and the type + * of the command (READ/WRITE). + */ public class AuditReplayReducer extends Reducer { diff --git a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayThread.java b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayThread.java index 2d44ad6701..80b2b62768 100644 --- a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayThread.java +++ b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayThread.java @@ -101,13 +101,9 @@ void drainCounters(Mapper.Context context) { } } - void drainCommandLatencies(Mapper.Context context) throws IOException { + void drainCommandLatencies(Mapper.Context context) throws InterruptedException, IOException { for (Map.Entry ent : commandLatencyMap.entrySet()) { - try { - context.write(ent.getKey(), ent.getValue()); - } catch (IOException|InterruptedException e) { - throw new IOException("Error writing to context", e); - } + context.write(ent.getKey(), ent.getValue()); } } diff --git a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/UserCommandKey.java b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/UserCommandKey.java index fdb52d36c7..cd182fed87 100644 --- a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/UserCommandKey.java +++ b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/UserCommandKey.java @@ -12,7 +12,11 @@ import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableComparable; - +/** + *

UserCommandKey is a {@link Writable} used as a composite key combining the user id and + * type of a replayed command. It is used as the output key for AuditReplayMapper and the + * keys for AuditReplayReducer. + */ public class UserCommandKey implements WritableComparable { private Text user; private Text command; @@ -36,18 +40,10 @@ public String getUser() { return user.toString(); } - public void setUser(String user) { - this.user.set(user); - } - public String getCommand() { return command.toString(); } - public void setCommand(String command) { - this.command.set(command); - } - @Override public void write(DataOutput out) throws IOException { user.write(out); @@ -62,7 +58,7 @@ public void readFields(DataInput in) throws IOException { @Override public int compareTo(@Nonnull Object o) { - return Integer.compare(hashCode(), o.hashCode()); + return toString().compareTo(o.toString()); } @Override diff --git a/dynamometer-workload/src/test/java/com/linkedin/dynamometer/workloadgenerator/TestWorkloadGenerator.java b/dynamometer-workload/src/test/java/com/linkedin/dynamometer/workloadgenerator/TestWorkloadGenerator.java index 57d75565be..5fd1548855 100644 --- a/dynamometer-workload/src/test/java/com/linkedin/dynamometer/workloadgenerator/TestWorkloadGenerator.java +++ b/dynamometer-workload/src/test/java/com/linkedin/dynamometer/workloadgenerator/TestWorkloadGenerator.java @@ -9,6 +9,7 @@ import com.linkedin.dynamometer.workloadgenerator.audit.AuditLogHiveTableParser; import com.linkedin.dynamometer.workloadgenerator.audit.AuditReplayMapper; import java.io.IOException; +import org.apache.commons.io.IOUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; @@ -27,9 +28,7 @@ import org.junit.Before; import org.junit.Test; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.*; public class TestWorkloadGenerator { @@ -113,10 +112,9 @@ private void testAuditWorkloadWithOutput(String auditOutputPath) throws Exceptio assertFalse(dfs.exists(new Path("/denied"))); assertTrue(dfs.exists(new Path(auditOutputPath))); - FSDataInputStream auditOutput = dfs.open(new Path(auditOutputPath + "/part-r-00000")); - byte[] buf = new byte[auditOutput.available()]; - assertTrue(auditOutput.read(buf) > 0); - System.out.println(new String(buf)); - assertTrue(new String(buf).matches(".*hdfs,WRITE\\t[0-9]+\\n.*")); + try (FSDataInputStream auditOutputFile = dfs.open(new Path(auditOutputPath + "/part-r-00000"))) { + String auditOutput = IOUtils.toString(auditOutputFile); + assertTrue(auditOutput.matches(".*hdfs,WRITE\\t[0-9]+\\n.*")); + } } } From 500b5cffe7ccafcc3dfb43fe6743b604f9c79b34 Mon Sep 17 00:00:00 2001 From: Christopher Gregorian Date: Fri, 15 Feb 2019 16:02:08 -0800 Subject: [PATCH 10/10] Style --- .../dynamometer/workloadgenerator/audit/AuditReplayReducer.java | 2 +- .../dynamometer/workloadgenerator/audit/UserCommandKey.java | 2 +- .../dynamometer/workloadgenerator/TestWorkloadGenerator.java | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayReducer.java b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayReducer.java index e9f075592a..27e2775f44 100644 --- a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayReducer.java +++ b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayReducer.java @@ -11,7 +11,7 @@ /** - *

AuditReplayReducer aggregates the returned latency values from {@link AuditReplayMapper} and sums + * AuditReplayReducer aggregates the returned latency values from {@link AuditReplayMapper} and sums * them up by {@link UserCommandKey}, which combines the user's id that ran the command and the type * of the command (READ/WRITE). */ diff --git a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/UserCommandKey.java b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/UserCommandKey.java index cd182fed87..7126bd83f4 100644 --- a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/UserCommandKey.java +++ b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/UserCommandKey.java @@ -13,7 +13,7 @@ import org.apache.hadoop.io.WritableComparable; /** - *

UserCommandKey is a {@link Writable} used as a composite key combining the user id and + * UserCommandKey is a {@link Writable} used as a composite key combining the user id and * type of a replayed command. It is used as the output key for AuditReplayMapper and the * keys for AuditReplayReducer. */ diff --git a/dynamometer-workload/src/test/java/com/linkedin/dynamometer/workloadgenerator/TestWorkloadGenerator.java b/dynamometer-workload/src/test/java/com/linkedin/dynamometer/workloadgenerator/TestWorkloadGenerator.java index 5fd1548855..a2c891aff4 100644 --- a/dynamometer-workload/src/test/java/com/linkedin/dynamometer/workloadgenerator/TestWorkloadGenerator.java +++ b/dynamometer-workload/src/test/java/com/linkedin/dynamometer/workloadgenerator/TestWorkloadGenerator.java @@ -112,7 +112,7 @@ private void testAuditWorkloadWithOutput(String auditOutputPath) throws Exceptio assertFalse(dfs.exists(new Path("/denied"))); assertTrue(dfs.exists(new Path(auditOutputPath))); - try (FSDataInputStream auditOutputFile = dfs.open(new Path(auditOutputPath + "/part-r-00000"))) { + try (FSDataInputStream auditOutputFile = dfs.open(new Path(auditOutputPath, "part-r-00000"))) { String auditOutput = IOUtils.toString(auditOutputFile); assertTrue(auditOutput.matches(".*hdfs,WRITE\\t[0-9]+\\n.*")); }