diff --git a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayMapper.java b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayMapper.java index b2487a685a..2defe7a334 100644 --- a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayMapper.java +++ b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayMapper.java @@ -58,7 +58,7 @@ * replayed. For example, a rate factor of 2 would make the replay occur twice as fast, and a rate * factor of 0.5 would make it occur half as fast. */ -public class AuditReplayMapper extends WorkloadMapper { +public class AuditReplayMapper extends WorkloadMapper { public static final String INPUT_PATH_KEY = "auditreplay.input-path"; public static final String OUTPUT_PATH_KEY = "auditreplay.output-path"; @@ -257,15 +257,16 @@ public void cleanup(Mapper.Context context) throws InterruptedException, IOExcep @Override public void configureJob(Job job) { job.setMapOutputKeyClass(UserCommandKey.class); - job.setMapOutputValueClass(LongWritable.class); + job.setMapOutputValueClass(CountTimeWritable.class); job.setInputFormatClass(NoSplitTextInputFormat.class); job.setNumReduceTasks(1); job.setReducerClass(AuditReplayReducer.class); job.setOutputKeyClass(UserCommandKey.class); - job.setOutputValueClass(LongWritable.class); + job.setOutputValueClass(CountTimeWritable.class); job.setOutputFormatClass(TextOutputFormat.class); TextOutputFormat.setOutputPath(job, new Path(job.getConfiguration().get(OUTPUT_PATH_KEY))); + job.getConfiguration().set(TextOutputFormat.SEPERATOR, ","); } } diff --git a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayReducer.java b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayReducer.java index 27e2775f44..1822d06237 100644 --- a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayReducer.java +++ b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayReducer.java @@ -4,7 +4,6 @@ */ package com.linkedin.dynamometer.workloadgenerator.audit; -import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; @@ -16,14 +15,17 @@ * of the command (READ/WRITE). */ public class AuditReplayReducer extends - Reducer { + Reducer { @Override - protected void reduce(UserCommandKey key, Iterable values, Context context) throws IOException, InterruptedException { - long sum = 0; - for (LongWritable v : values) { - sum += v.get(); + protected void reduce(UserCommandKey key, Iterable values, Context context) + throws IOException, InterruptedException { + long countSum = 0; + long timeSum = 0; + for (CountTimeWritable v : values) { + countSum += v.getCount(); + timeSum += v.getTime(); } - context.write(key, new LongWritable(sum)); + context.write(key, new CountTimeWritable(countSum, timeSum)); } } diff --git a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayThread.java b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayThread.java index 80b2b62768..39f8660198 100644 --- a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayThread.java +++ b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayThread.java @@ -25,7 +25,6 @@ import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; -import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.mapreduce.Counter; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.counters.GenericCounter; @@ -64,7 +63,7 @@ public class AuditReplayThread extends Thread { // and merge them all together at the end. private Map replayCountersMap = new HashMap<>(); private Map individualCommandsMap = new HashMap<>(); - private Map commandLatencyMap = new HashMap<>(); + private Map commandLatencyMap = new HashMap<>(); AuditReplayThread(Mapper.Context mapperContext, DelayQueue queue, ConcurrentMap fsCache) throws IOException { @@ -102,7 +101,7 @@ void drainCounters(Mapper.Context context) { } void drainCommandLatencies(Mapper.Context context) throws InterruptedException, IOException { - for (Map.Entry ent : commandLatencyMap.entrySet()) { + for (Map.Entry ent : commandLatencyMap.entrySet()) { context.write(ent.getKey(), ent.getValue()); } } @@ -265,10 +264,12 @@ public FileSystem run() { long latency = System.currentTimeMillis() - startTime; - UserCommandKey userCommandKey = new UserCommandKey(command.getSimpleUgi(), replayCommand.getType().toString()); - commandLatencyMap.putIfAbsent(userCommandKey, new LongWritable(0)); - LongWritable latencyWritable = commandLatencyMap.get(userCommandKey); - latencyWritable.set(latencyWritable.get() + latency); + UserCommandKey userCommandKey = new UserCommandKey(command.getSimpleUgi(), + replayCommand.toString(), replayCommand.getType().toString()); + commandLatencyMap.putIfAbsent(userCommandKey, new CountTimeWritable()); + CountTimeWritable latencyWritable = commandLatencyMap.get(userCommandKey); + latencyWritable.setCount(latencyWritable.getCount() + 1); + latencyWritable.setTime(latencyWritable.getTime() + latency); switch (replayCommand.getType()) { case WRITE: diff --git a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/CountTimeWritable.java b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/CountTimeWritable.java new file mode 100644 index 0000000000..28d37d6bfc --- /dev/null +++ b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/CountTimeWritable.java @@ -0,0 +1,71 @@ +/* + * Copyright 2019 LinkedIn Corporation. All rights reserved. Licensed under the BSD-2 Clause license. + * See LICENSE in the project root for license information. + */ +package com.linkedin.dynamometer.workloadgenerator.audit; + +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Writable; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + + +/** + * UserCommandKey is a {@link Writable} used as a composite value that accumulates the count + * and cumulative latency of replayed commands. It is used as the output value for + * AuditReplayMapper and AuditReplayReducer. + */ +public class CountTimeWritable implements Writable { + private LongWritable count; + private LongWritable time; + + public CountTimeWritable() { + count = new LongWritable(); + time = new LongWritable(); + } + + public CountTimeWritable(LongWritable count, LongWritable time) { + this.count = count; + this.time = time; + } + + public CountTimeWritable(long count, long time) { + this.count = new LongWritable(count); + this.time = new LongWritable(time); + } + + public long getCount() { + return count.get(); + } + + public long getTime() { + return time.get(); + } + + public void setCount(long count) { + this.count.set(getCount() + count); + } + + public void setTime(long time) { + this.time.set(getTime() + time); + } + + @Override + public void write(DataOutput out) throws IOException { + count.write(out); + time.write(out); + } + + @Override + public void readFields(DataInput in) throws IOException { + count.readFields(in); + time.readFields(in); + } + + @Override + public String toString() { + return getCount() + "," + getTime(); + } +} diff --git a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/UserCommandKey.java b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/UserCommandKey.java index 2c4c922263..afa8af3fd2 100644 --- a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/UserCommandKey.java +++ b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/UserCommandKey.java @@ -13,27 +13,31 @@ import org.apache.hadoop.io.WritableComparable; /** - * UserCommandKey is a {@link WritableComparable} used as a composite key combining the user id and - * type of a replayed command. It is used as the output key for AuditReplayMapper and the + * UserCommandKey is a {@link WritableComparable} used as a composite key combining the user id, name, + * and type of a replayed command. It is used as the output key for AuditReplayMapper and the * keys for AuditReplayReducer. */ public class UserCommandKey implements WritableComparable { private Text user; private Text command; + private Text type; public UserCommandKey() { user = new Text(); command = new Text(); + type = new Text(); } - public UserCommandKey(Text user, Text command) { + public UserCommandKey(Text user, Text command, Text type) { this.user = user; this.command = command; + this.type = type; } - public UserCommandKey(String user, String command) { + public UserCommandKey(String user, String command, String type) { this.user = new Text(user); this.command = new Text(command); + this.type = new Text(type); } public String getUser() { @@ -43,17 +47,23 @@ public String getUser() { public String getCommand() { return command.toString(); } + + public String getType() { + return type.toString(); + } @Override public void write(DataOutput out) throws IOException { user.write(out); command.write(out); + type.write(out); } @Override public void readFields(DataInput in) throws IOException { user.readFields(in); command.readFields(in); + type.readFields(in); } @Override @@ -63,7 +73,7 @@ public int compareTo(@Nonnull Object o) { @Override public String toString() { - return getUser() + "," + getCommand(); + return getUser() + "," + getType() + "," + getCommand(); } @Override @@ -75,11 +85,13 @@ public boolean equals(Object o) { return false; } UserCommandKey that = (UserCommandKey) o; - return getUser().equals(that.getUser()) && getCommand().equals(that.getCommand()); + return getUser().equals(that.getUser()) && + getCommand().equals(that.getCommand()) && + getType().equals(that.getType()); } @Override public int hashCode() { - return Objects.hash(getUser(), getCommand()); + return Objects.hash(getUser(), getCommand(), getType()); } } diff --git a/dynamometer-workload/src/test/java/com/linkedin/dynamometer/workloadgenerator/TestWorkloadGenerator.java b/dynamometer-workload/src/test/java/com/linkedin/dynamometer/workloadgenerator/TestWorkloadGenerator.java index a2c891aff4..ae43ee39c6 100644 --- a/dynamometer-workload/src/test/java/com/linkedin/dynamometer/workloadgenerator/TestWorkloadGenerator.java +++ b/dynamometer-workload/src/test/java/com/linkedin/dynamometer/workloadgenerator/TestWorkloadGenerator.java @@ -24,6 +24,8 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authorize.AuthorizationException; import org.apache.hadoop.security.authorize.ImpersonationProvider; +import org.apache.htrace.commons.logging.Log; +import org.apache.htrace.commons.logging.LogFactory; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -32,7 +34,8 @@ public class TestWorkloadGenerator { - + private static final Log LOG = LogFactory.getLog(TestWorkloadGenerator.class); + private Configuration conf; private MiniDFSCluster miniCluster; private FileSystem dfs; @@ -114,7 +117,11 @@ private void testAuditWorkloadWithOutput(String auditOutputPath) throws Exceptio assertTrue(dfs.exists(new Path(auditOutputPath))); try (FSDataInputStream auditOutputFile = dfs.open(new Path(auditOutputPath, "part-r-00000"))) { String auditOutput = IOUtils.toString(auditOutputFile); - assertTrue(auditOutput.matches(".*hdfs,WRITE\\t[0-9]+\\n.*")); + LOG.info(auditOutput); + assertTrue(auditOutput.matches(".*(hdfs,WRITE,[A-Z]+,[17]+,[0-9]+\\n){3}.*")); + // Matches three lines of the format "hdfs,WRITE,name,count,time" + // Using [17] for the count group because each operation is run either + // 1 or 7 times but the output order isn't guaranteed } } }