diff --git a/.gitignore b/.gitignore index 4944af77e4..20f639de91 100644 --- a/.gitignore +++ b/.gitignore @@ -14,3 +14,4 @@ scripts.tar *.iml .idea/ gradle.out +*/out diff --git a/dynamometer-infra/src/main/java/com/linkedin/dynamometer/Client.java b/dynamometer-infra/src/main/java/com/linkedin/dynamometer/Client.java index f41280f0ba..770fb354bb 100644 --- a/dynamometer-infra/src/main/java/com/linkedin/dynamometer/Client.java +++ b/dynamometer-infra/src/main/java/com/linkedin/dynamometer/Client.java @@ -145,6 +145,7 @@ public class Client extends Configured implements Tool { public static final String TOKEN_FILE_LOCATION_ARG = "token_file_location"; public static final String WORKLOAD_REPLAY_ENABLE_ARG = "workload_replay_enable"; public static final String WORKLOAD_INPUT_PATH_ARG = "workload_input_path"; + public static final String WORKLOAD_OUTPUT_PATH_ARG = "workload_output_path"; public static final String WORKLOAD_THREADS_PER_MAPPER_ARG = "workload_threads_per_mapper"; public static final String WORKLOAD_START_DELAY_ARG = "workload_start_delay"; public static final String WORKLOAD_RATE_FACTOR_ARG = "workload_rate_factor"; @@ -205,6 +206,8 @@ public class Client extends Configured implements Tool { private volatile Job workloadJob; // The input path for the workload job. private String workloadInputPath = ""; + // The output path for the workload job metric results. + private String workloadOutputPath = ""; // The number of threads to use per mapper for the workload job. private int workloadThreadsPerMapper; // The startup delay for the workload job. @@ -299,6 +302,7 @@ public Client(String... dependencyJars) { opts.addOption(WORKLOAD_REPLAY_ENABLE_ARG, false, "If specified, this client will additionally launch the workload " + "replay job to replay audit logs against the HDFS cluster which is started."); opts.addOption(WORKLOAD_INPUT_PATH_ARG, true, "Location of the audit traces to replay (Required for workload)"); + opts.addOption(WORKLOAD_OUTPUT_PATH_ARG, true, "Location of the metrics output (Required for workload)"); opts.addOption(WORKLOAD_THREADS_PER_MAPPER_ARG, true, "Number of threads per mapper to use to " + "replay the workload. (default " + AuditReplayMapper.NUM_THREADS_DEFAULT + ")"); opts.addOption(WORKLOAD_START_DELAY_ARG, true, "Delay between launching the Workload MR job and " + @@ -409,6 +413,7 @@ public boolean accept(Path path) { } launchWorkloadJob = true; workloadInputPath = cliParser.getOptionValue(WORKLOAD_INPUT_PATH_ARG); + workloadOutputPath = cliParser.getOptionValue(WORKLOAD_OUTPUT_PATH_ARG); workloadThreadsPerMapper = Integer.parseInt(cliParser.getOptionValue(WORKLOAD_THREADS_PER_MAPPER_ARG, String.valueOf(AuditReplayMapper.NUM_THREADS_DEFAULT))); workloadRateFactor = Double.parseDouble(cliParser.getOptionValue(WORKLOAD_RATE_FACTOR_ARG, @@ -912,6 +917,7 @@ private void launchAndMonitorWorkloadDriver(Properties nameNodeProperties) { long workloadStartTime = System.currentTimeMillis() + workloadStartDelayMs; Configuration workloadConf = new Configuration(getConf()); workloadConf.set(AuditReplayMapper.INPUT_PATH_KEY, workloadInputPath); + workloadConf.set(AuditReplayMapper.OUTPUT_PATH_KEY, workloadOutputPath); workloadConf.setInt(AuditReplayMapper.NUM_THREADS_KEY, workloadThreadsPerMapper); workloadConf.setDouble(AuditReplayMapper.RATE_FACTOR_KEY, workloadRateFactor); for (Map.Entry configPair : workloadExtraConfigs.entrySet()) { diff --git a/dynamometer-infra/src/test/java/com/linkedin/dynamometer/TestDynamometerInfra.java b/dynamometer-infra/src/test/java/com/linkedin/dynamometer/TestDynamometerInfra.java index 44a108dfb9..0daf55a045 100644 --- a/dynamometer-infra/src/test/java/com/linkedin/dynamometer/TestDynamometerInfra.java +++ b/dynamometer-infra/src/test/java/com/linkedin/dynamometer/TestDynamometerInfra.java @@ -107,6 +107,8 @@ public class TestDynamometerInfra { private static final String NAMENODE_NODELABEL = "dyno_namenode"; private static final String DATANODE_NODELABEL = "dyno_datanode"; + private static final String OUTPUT_PATH = "/tmp/trace_output_direct"; + private static MiniDFSCluster miniDFSCluster; private static MiniYARNCluster miniYARNCluster; private static YarnClient yarnClient; @@ -278,6 +280,7 @@ public void run() { "-" + AMOptions.SHELL_ENV_ARG, "HADOOP_CONF_DIR=" + getHadoopHomeLocation() + "/etc/hadoop", "-" + Client.WORKLOAD_REPLAY_ENABLE_ARG, "-" + Client.WORKLOAD_INPUT_PATH_ARG, fs.makeQualified(new Path("/tmp/audit_trace_direct")).toString(), + "-" + Client.WORKLOAD_OUTPUT_PATH_ARG, fs.makeQualified(new Path(OUTPUT_PATH)).toString(), "-" + Client.WORKLOAD_THREADS_PER_MAPPER_ARG, "1", "-" + Client.WORKLOAD_START_DELAY_ARG, "10s", "-" + AMOptions.NAMENODE_ARGS_ARG, "-Ddfs.namenode.safemode.extension=0" @@ -402,6 +405,8 @@ public Boolean get() { } } }, 3000, 60000); + + assertTrue(fs.exists(new Path(OUTPUT_PATH))); } private static URI getResourcePath(String resourceName) { diff --git a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/CreateFileMapper.java b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/CreateFileMapper.java index b6db85ed4e..4449ea2313 100644 --- a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/CreateFileMapper.java +++ b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/CreateFileMapper.java @@ -32,7 +32,7 @@ * in which to create files. * */ -public class CreateFileMapper extends WorkloadMapper { +public class CreateFileMapper extends WorkloadMapper { public static final String FILE_PARENT_PATH_KEY = "createfile.file-parent-path"; public static final String FILE_PARENT_PATH_DEFAULT = "/tmp/createFileMapper"; diff --git a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/WorkloadDriver.java b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/WorkloadDriver.java index 38f237f572..461afaf4d1 100644 --- a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/WorkloadDriver.java +++ b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/WorkloadDriver.java @@ -21,9 +21,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; -import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.MRJobConfig; -import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; @@ -120,16 +118,9 @@ public static Job getJobForSubmission(Configuration baseConf, String nnURI, long conf.setLong(START_TIMESTAMP_MS, startTimestampMs); Job job = Job.getInstance(conf, "Dynamometer Workload Driver"); - job.setOutputFormatClass(NullOutputFormat.class); job.setJarByClass(mapperClass); job.setMapperClass(mapperClass); - job.setInputFormatClass(mapperClass.newInstance().getInputFormat(conf)); - job.setOutputFormatClass(NullOutputFormat.class); - job.setNumReduceTasks(0); - job.setMapOutputKeyClass(NullWritable.class); - job.setMapOutputValueClass(NullWritable.class); - job.setOutputKeyClass(NullWritable.class); - job.setOutputValueClass(NullWritable.class); + mapperClass.newInstance().configureJob(job); return job; } @@ -153,7 +144,7 @@ private Class getMapperClass(String className) throws private String getMapperUsageInfo(String mapperClassName) throws ClassNotFoundException, InstantiationException, IllegalAccessException { - WorkloadMapper mapper = getMapperClass(mapperClassName).newInstance(); + WorkloadMapper mapper = getMapperClass(mapperClassName).newInstance(); StringBuilder builder = new StringBuilder("Usage for "); builder.append(mapper.getClass().getSimpleName()); builder.append(":\n"); @@ -167,5 +158,4 @@ private String getMapperUsageInfo(String mapperClassName) throws ClassNotFoundEx return builder.toString(); } - } diff --git a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/WorkloadMapper.java b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/WorkloadMapper.java index 0e4a651f0f..effd73761d 100644 --- a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/WorkloadMapper.java +++ b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/WorkloadMapper.java @@ -8,23 +8,17 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; /** * Represents the base class for a generic workload-generating mapper. By default, it will expect to use - * {@link TimedInputFormat} as its {@link InputFormat}. Subclasses expecting a different {@link InputFormat} - * should override the {@link #getInputFormat(Configuration)} method. + * {@link TimedInputFormat} as its {@link InputFormat}. Subclasses requiring a reducer or expecting + * a different {@link InputFormat} should override the {@link #configureJob(Job)} method. */ -public abstract class WorkloadMapper extends Mapper { - - /** - * Return the input class to be used by this mapper. - */ - public Class getInputFormat(Configuration conf) { - return TimedInputFormat.class; - } - +public abstract class WorkloadMapper extends Mapper { /** * Get the description of the behavior of this mapper. */ @@ -41,4 +35,15 @@ public Class getInputFormat(Configuration conf) { */ public abstract boolean verifyConfigurations(Configuration conf); + /** + * Setup input and output formats and optional reducer. + */ + public void configureJob(Job job) { + job.setInputFormatClass(TimedInputFormat.class); + + job.setNumReduceTasks(0); + job.setOutputKeyClass(NullWritable.class); + job.setOutputValueClass(NullWritable.class); + job.setOutputFormatClass(NullOutputFormat.class); + } } diff --git a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayMapper.java b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayMapper.java index 57bf6336d8..b2487a685a 100644 --- a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayMapper.java +++ b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayMapper.java @@ -18,16 +18,19 @@ import java.util.concurrent.DelayQueue; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import static com.linkedin.dynamometer.workloadgenerator.audit.AuditReplayMapper.CommandType.READ; import static com.linkedin.dynamometer.workloadgenerator.audit.AuditReplayMapper.CommandType.WRITE; @@ -55,9 +58,10 @@ * replayed. For example, a rate factor of 2 would make the replay occur twice as fast, and a rate * factor of 0.5 would make it occur half as fast. */ -public class AuditReplayMapper extends WorkloadMapper { +public class AuditReplayMapper extends WorkloadMapper { public static final String INPUT_PATH_KEY = "auditreplay.input-path"; + public static final String OUTPUT_PATH_KEY = "auditreplay.output-path"; public static final String NUM_THREADS_KEY = "auditreplay.num-threads"; public static final int NUM_THREADS_DEFAULT = 1; public static final String CREATE_BLOCKS_KEY = "auditreplay.create-blocks"; @@ -141,11 +145,6 @@ public enum CommandType { private AuditCommandParser commandParser; private ScheduledThreadPoolExecutor progressExecutor; - @Override - public Class getInputFormat(Configuration conf) { - return NoSplitTextInputFormat.class; - } - @Override public String getDescription() { return "This mapper replays audit log files."; @@ -155,6 +154,7 @@ public String getDescription() { public List getConfigDescriptions() { return Lists.newArrayList( INPUT_PATH_KEY + " (required): Path to directory containing input files.", + OUTPUT_PATH_KEY + " (required): Path to destination for output files.", NUM_THREADS_KEY + " (default " + NUM_THREADS_DEFAULT + "): Number of threads to use per mapper for replay.", CREATE_BLOCKS_KEY + " (default " + CREATE_BLOCKS_DEFAULT + "): Whether or not to create 1-byte blocks when " + "performing `create` commands.", @@ -166,7 +166,7 @@ public List getConfigDescriptions() { @Override public boolean verifyConfigurations(Configuration conf) { - return conf.get(INPUT_PATH_KEY) != null; + return conf.get(INPUT_PATH_KEY) != null && conf.get(OUTPUT_PATH_KEY) != null; } @Override @@ -226,7 +226,7 @@ public void map(LongWritable lineNum, Text inputLine, Mapper.Context context) } @Override - public void cleanup(Mapper.Context context) throws InterruptedException { + public void cleanup(Mapper.Context context) throws InterruptedException, IOException { for (AuditReplayThread t : threads) { // Add in an indicator for each thread to shut down after the last real command t.addToQueue(AuditReplayCommand.getPoisonPill(highestTimestamp + 1)); @@ -235,6 +235,7 @@ public void cleanup(Mapper.Context context) throws InterruptedException { for (AuditReplayThread t : threads) { t.join(); t.drainCounters(context); + t.drainCommandLatencies(context); if (t.getException() != null) { threadException = Optional.of(t.getException()); } @@ -252,4 +253,19 @@ public void cleanup(Mapper.Context context) throws InterruptedException { LOG.info("Percentage of invalid ops: " + percentageOfInvalidOps); } } + + @Override + public void configureJob(Job job) { + job.setMapOutputKeyClass(UserCommandKey.class); + job.setMapOutputValueClass(LongWritable.class); + job.setInputFormatClass(NoSplitTextInputFormat.class); + + job.setNumReduceTasks(1); + job.setReducerClass(AuditReplayReducer.class); + job.setOutputKeyClass(UserCommandKey.class); + job.setOutputValueClass(LongWritable.class); + job.setOutputFormatClass(TextOutputFormat.class); + + TextOutputFormat.setOutputPath(job, new Path(job.getConfiguration().get(OUTPUT_PATH_KEY))); + } } diff --git a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayReducer.java b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayReducer.java new file mode 100644 index 0000000000..27e2775f44 --- /dev/null +++ b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayReducer.java @@ -0,0 +1,29 @@ +/* + * Copyright 2019 LinkedIn Corporation. All rights reserved. Licensed under the BSD-2 Clause license. + * See LICENSE in the project root for license information. + */ +package com.linkedin.dynamometer.workloadgenerator.audit; + +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.mapreduce.Reducer; + +import java.io.IOException; + + +/** + * AuditReplayReducer aggregates the returned latency values from {@link AuditReplayMapper} and sums + * them up by {@link UserCommandKey}, which combines the user's id that ran the command and the type + * of the command (READ/WRITE). + */ +public class AuditReplayReducer extends + Reducer { + + @Override + protected void reduce(UserCommandKey key, Iterable values, Context context) throws IOException, InterruptedException { + long sum = 0; + for (LongWritable v : values) { + sum += v.get(); + } + context.write(key, new LongWritable(sum)); + } +} diff --git a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayThread.java b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayThread.java index db480b9e7b..80b2b62768 100644 --- a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayThread.java +++ b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayThread.java @@ -25,6 +25,7 @@ import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; +import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.mapreduce.Counter; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.counters.GenericCounter; @@ -63,6 +64,7 @@ public class AuditReplayThread extends Thread { // and merge them all together at the end. private Map replayCountersMap = new HashMap<>(); private Map individualCommandsMap = new HashMap<>(); + private Map commandLatencyMap = new HashMap<>(); AuditReplayThread(Mapper.Context mapperContext, DelayQueue queue, ConcurrentMap fsCache) throws IOException { @@ -99,6 +101,12 @@ void drainCounters(Mapper.Context context) { } } + void drainCommandLatencies(Mapper.Context context) throws InterruptedException, IOException { + for (Map.Entry ent : commandLatencyMap.entrySet()) { + context.write(ent.getKey(), ent.getValue()); + } + } + /** * Add a command to this thread's processing queue. * @param cmd Command to add. @@ -254,7 +262,14 @@ public FileSystem run() { fs.concat(new Path(src), dsts.toArray(new Path[] {})); break; } + long latency = System.currentTimeMillis() - startTime; + + UserCommandKey userCommandKey = new UserCommandKey(command.getSimpleUgi(), replayCommand.getType().toString()); + commandLatencyMap.putIfAbsent(userCommandKey, new LongWritable(0)); + LongWritable latencyWritable = commandLatencyMap.get(userCommandKey); + latencyWritable.set(latencyWritable.get() + latency); + switch (replayCommand.getType()) { case WRITE: replayCountersMap.get(REPLAYCOUNTERS.TOTALWRITECOMMANDLATENCY).increment(latency); diff --git a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/UserCommandKey.java b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/UserCommandKey.java new file mode 100644 index 0000000000..7126bd83f4 --- /dev/null +++ b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/UserCommandKey.java @@ -0,0 +1,85 @@ +/** + * Copyright 2019 LinkedIn Corporation. All rights reserved. Licensed under the BSD-2 Clause license. + * See LICENSE in the project root for license information. + */ +package com.linkedin.dynamometer.workloadgenerator.audit; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.Objects; +import javax.annotation.Nonnull; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.WritableComparable; + +/** + * UserCommandKey is a {@link Writable} used as a composite key combining the user id and + * type of a replayed command. It is used as the output key for AuditReplayMapper and the + * keys for AuditReplayReducer. + */ +public class UserCommandKey implements WritableComparable { + private Text user; + private Text command; + + public UserCommandKey() { + user = new Text(); + command = new Text(); + } + + public UserCommandKey(Text user, Text command) { + this.user = user; + this.command = command; + } + + public UserCommandKey(String user, String command) { + this.user = new Text(user); + this.command = new Text(command); + } + + public String getUser() { + return user.toString(); + } + + public String getCommand() { + return command.toString(); + } + + @Override + public void write(DataOutput out) throws IOException { + user.write(out); + command.write(out); + } + + @Override + public void readFields(DataInput in) throws IOException { + user.readFields(in); + command.readFields(in); + } + + @Override + public int compareTo(@Nonnull Object o) { + return toString().compareTo(o.toString()); + } + + @Override + public String toString() { + return getUser() + "," + getCommand(); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + UserCommandKey that = (UserCommandKey) o; + return getUser().equals(that.getUser()) && getCommand().equals(that.getCommand()); + } + + @Override + public int hashCode() { + return Objects.hash(getUser(), getCommand()); + } +} diff --git a/dynamometer-workload/src/test/java/com/linkedin/dynamometer/workloadgenerator/TestWorkloadGenerator.java b/dynamometer-workload/src/test/java/com/linkedin/dynamometer/workloadgenerator/TestWorkloadGenerator.java index cbe6db05ce..a2c891aff4 100644 --- a/dynamometer-workload/src/test/java/com/linkedin/dynamometer/workloadgenerator/TestWorkloadGenerator.java +++ b/dynamometer-workload/src/test/java/com/linkedin/dynamometer/workloadgenerator/TestWorkloadGenerator.java @@ -9,9 +9,11 @@ import com.linkedin.dynamometer.workloadgenerator.audit.AuditLogHiveTableParser; import com.linkedin.dynamometer.workloadgenerator.audit.AuditReplayMapper; import java.io.IOException; +import org.apache.commons.io.IOUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsAction; @@ -26,9 +28,7 @@ import org.junit.Before; import org.junit.Test; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.*; public class TestWorkloadGenerator { @@ -58,20 +58,25 @@ public void tearDown() throws Exception { } @Test - public void testAuditWorkloadDirectParser() throws Exception { + public void testAuditWorkloadDirectParserWithOutput() throws Exception { String workloadInputPath = TestWorkloadGenerator.class.getClassLoader().getResource("audit_trace_direct").toString(); + String auditOutputPath = "/tmp/trace_output_direct"; conf.set(AuditReplayMapper.INPUT_PATH_KEY, workloadInputPath); + conf.set(AuditReplayMapper.OUTPUT_PATH_KEY, auditOutputPath); conf.setLong(AuditLogDirectParser.AUDIT_START_TIMESTAMP_KEY, 60*1000); - testAuditWorkload(); + testAuditWorkloadWithOutput(auditOutputPath); } @Test - public void testAuditWorkloadHiveParser() throws Exception { - String workloadInputPath = TestWorkloadGenerator.class.getClassLoader().getResource("audit_trace_hive").toString(); + public void testAuditWorkloadHiveParserWithOutput() throws Exception { + String workloadInputPath = + TestWorkloadGenerator.class.getClassLoader().getResource("audit_trace_hive").toString(); + String auditOutputPath = "/tmp/trace_output_hive"; conf.set(AuditReplayMapper.INPUT_PATH_KEY, workloadInputPath); + conf.set(AuditReplayMapper.OUTPUT_PATH_KEY, auditOutputPath); conf.setClass(AuditReplayMapper.COMMAND_PARSER_KEY, AuditLogHiveTableParser.class, AuditCommandParser.class); - testAuditWorkload(); + testAuditWorkloadWithOutput(auditOutputPath); } /** @@ -93,7 +98,7 @@ public void authorize(UserGroupInformation user, String remoteAddress) throws Au } } - private void testAuditWorkload() throws Exception { + private void testAuditWorkloadWithOutput(String auditOutputPath) throws Exception { long workloadStartTime = System.currentTimeMillis() + 10000; Job workloadJob = WorkloadDriver.getJobForSubmission(conf, dfs.getUri().toString(), workloadStartTime, AuditReplayMapper.class); @@ -105,5 +110,11 @@ private void testAuditWorkload() throws Exception { assertTrue(dfs.getFileStatus(new Path("/tmp/test1")).isFile()); assertTrue(dfs.getFileStatus(new Path("/tmp/testDirRenamed")).isDirectory()); assertFalse(dfs.exists(new Path("/denied"))); + + assertTrue(dfs.exists(new Path(auditOutputPath))); + try (FSDataInputStream auditOutputFile = dfs.open(new Path(auditOutputPath, "part-r-00000"))) { + String auditOutput = IOUtils.toString(auditOutputFile); + assertTrue(auditOutput.matches(".*hdfs,WRITE\\t[0-9]+\\n.*")); + } } }