From b7f879f1e84d1f19c24db477f7a353e1ce2a9aa0 Mon Sep 17 00:00:00 2001 From: Christopher Gregorian Date: Thu, 14 Mar 2019 11:07:21 -0700 Subject: [PATCH 1/5] Modify AuditReplay workflow to output the number of ops and their cumulative time --- .../audit/AuditReplayMapper.java | 6 +- .../audit/AuditReplayReducer.java | 15 ++-- .../audit/AuditReplayThread.java | 14 ++-- .../audit/CountTimeWritable.java | 71 +++++++++++++++++++ .../audit/UserCommandKey.java | 26 +++++-- .../TestWorkloadGenerator.java | 9 ++- 6 files changed, 115 insertions(+), 26 deletions(-) create mode 100644 dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/CountTimeWritable.java diff --git a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayMapper.java b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayMapper.java index b2487a685a..414cb5be77 100644 --- a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayMapper.java +++ b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayMapper.java @@ -58,7 +58,7 @@ * replayed. For example, a rate factor of 2 would make the replay occur twice as fast, and a rate * factor of 0.5 would make it occur half as fast. */ -public class AuditReplayMapper extends WorkloadMapper { +public class AuditReplayMapper extends WorkloadMapper { public static final String INPUT_PATH_KEY = "auditreplay.input-path"; public static final String OUTPUT_PATH_KEY = "auditreplay.output-path"; @@ -257,13 +257,13 @@ public void cleanup(Mapper.Context context) throws InterruptedException, IOExcep @Override public void configureJob(Job job) { job.setMapOutputKeyClass(UserCommandKey.class); - job.setMapOutputValueClass(LongWritable.class); + job.setMapOutputValueClass(CountTimeWritable.class); job.setInputFormatClass(NoSplitTextInputFormat.class); job.setNumReduceTasks(1); job.setReducerClass(AuditReplayReducer.class); job.setOutputKeyClass(UserCommandKey.class); - job.setOutputValueClass(LongWritable.class); + job.setOutputValueClass(CountTimeWritable.class); job.setOutputFormatClass(TextOutputFormat.class); TextOutputFormat.setOutputPath(job, new Path(job.getConfiguration().get(OUTPUT_PATH_KEY))); diff --git a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayReducer.java b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayReducer.java index 27e2775f44..cfe4d8fa41 100644 --- a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayReducer.java +++ b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayReducer.java @@ -4,7 +4,6 @@ */ package com.linkedin.dynamometer.workloadgenerator.audit; -import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; @@ -16,14 +15,16 @@ * of the command (READ/WRITE). */ public class AuditReplayReducer extends - Reducer { + Reducer { @Override - protected void reduce(UserCommandKey key, Iterable values, Context context) throws IOException, InterruptedException { - long sum = 0; - for (LongWritable v : values) { - sum += v.get(); + protected void reduce(UserCommandKey key, Iterable values, Context context) throws IOException, InterruptedException { + long countSum = 0; + long timeSum = 0; + for (CountTimeWritable v : values) { + countSum += v.getCount(); + timeSum += v.getTime(); } - context.write(key, new LongWritable(sum)); + context.write(key, new CountTimeWritable(countSum, timeSum)); } } diff --git a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayThread.java b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayThread.java index 80b2b62768..beb90fa76e 100644 --- a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayThread.java +++ b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayThread.java @@ -25,7 +25,6 @@ import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; -import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.mapreduce.Counter; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.counters.GenericCounter; @@ -64,7 +63,7 @@ public class AuditReplayThread extends Thread { // and merge them all together at the end. private Map replayCountersMap = new HashMap<>(); private Map individualCommandsMap = new HashMap<>(); - private Map commandLatencyMap = new HashMap<>(); + private Map commandLatencyMap = new HashMap<>(); AuditReplayThread(Mapper.Context mapperContext, DelayQueue queue, ConcurrentMap fsCache) throws IOException { @@ -102,7 +101,7 @@ void drainCounters(Mapper.Context context) { } void drainCommandLatencies(Mapper.Context context) throws InterruptedException, IOException { - for (Map.Entry ent : commandLatencyMap.entrySet()) { + for (Map.Entry ent : commandLatencyMap.entrySet()) { context.write(ent.getKey(), ent.getValue()); } } @@ -265,10 +264,11 @@ public FileSystem run() { long latency = System.currentTimeMillis() - startTime; - UserCommandKey userCommandKey = new UserCommandKey(command.getSimpleUgi(), replayCommand.getType().toString()); - commandLatencyMap.putIfAbsent(userCommandKey, new LongWritable(0)); - LongWritable latencyWritable = commandLatencyMap.get(userCommandKey); - latencyWritable.set(latencyWritable.get() + latency); + UserCommandKey userCommandKey = new UserCommandKey(command.getSimpleUgi(), replayCommand.toString(), replayCommand.getType().toString()); + commandLatencyMap.putIfAbsent(userCommandKey, new CountTimeWritable()); + CountTimeWritable latencyWritable = commandLatencyMap.get(userCommandKey); + latencyWritable.setCount(latencyWritable.getCount() + 1); + latencyWritable.setTime(latencyWritable.getTime() + latency); switch (replayCommand.getType()) { case WRITE: diff --git a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/CountTimeWritable.java b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/CountTimeWritable.java new file mode 100644 index 0000000000..5a3b5316a5 --- /dev/null +++ b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/CountTimeWritable.java @@ -0,0 +1,71 @@ +/* + * Copyright 2019 LinkedIn Corporation. All rights reserved. Licensed under the BSD-2 Clause license. + * See LICENSE in the project root for license information. + */ +package com.linkedin.dynamometer.workloadgenerator.audit; + + +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Writable; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +/** + * UserCommandKey is a {@link Writable} used as a composite value that accumulates the count + * and cumulative latency of replayed commands. It is used as the output value for + * AuditReplayMapper and AuditReplayReducer. + */ +public class CountTimeWritable implements Writable { + private LongWritable count; + private LongWritable time; + + public CountTimeWritable() { + count = new LongWritable(); + time = new LongWritable(); + } + + public CountTimeWritable(LongWritable count, LongWritable time) { + this.count = count; + this.time = time; + } + + public CountTimeWritable(long count, long time) { + this.count = new LongWritable(count); + this.time = new LongWritable(time); + } + + public long getCount() { + return count.get(); + } + + public long getTime() { + return time.get(); + } + + public void setCount(long count) { + this.count.set(getCount() + count); + } + + public void setTime(long time) { + this.time.set(getTime() + time); + } + + @Override + public void write(DataOutput out) throws IOException { + count.write(out); + time.write(out); + } + + @Override + public void readFields(DataInput in) throws IOException { + count.readFields(in); + time.readFields(in); + } + + @Override + public String toString() { + return getCount() + "," + getTime(); + } +} diff --git a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/UserCommandKey.java b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/UserCommandKey.java index 2c4c922263..afa8af3fd2 100644 --- a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/UserCommandKey.java +++ b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/UserCommandKey.java @@ -13,27 +13,31 @@ import org.apache.hadoop.io.WritableComparable; /** - * UserCommandKey is a {@link WritableComparable} used as a composite key combining the user id and - * type of a replayed command. It is used as the output key for AuditReplayMapper and the + * UserCommandKey is a {@link WritableComparable} used as a composite key combining the user id, name, + * and type of a replayed command. It is used as the output key for AuditReplayMapper and the * keys for AuditReplayReducer. */ public class UserCommandKey implements WritableComparable { private Text user; private Text command; + private Text type; public UserCommandKey() { user = new Text(); command = new Text(); + type = new Text(); } - public UserCommandKey(Text user, Text command) { + public UserCommandKey(Text user, Text command, Text type) { this.user = user; this.command = command; + this.type = type; } - public UserCommandKey(String user, String command) { + public UserCommandKey(String user, String command, String type) { this.user = new Text(user); this.command = new Text(command); + this.type = new Text(type); } public String getUser() { @@ -43,17 +47,23 @@ public String getUser() { public String getCommand() { return command.toString(); } + + public String getType() { + return type.toString(); + } @Override public void write(DataOutput out) throws IOException { user.write(out); command.write(out); + type.write(out); } @Override public void readFields(DataInput in) throws IOException { user.readFields(in); command.readFields(in); + type.readFields(in); } @Override @@ -63,7 +73,7 @@ public int compareTo(@Nonnull Object o) { @Override public String toString() { - return getUser() + "," + getCommand(); + return getUser() + "," + getType() + "," + getCommand(); } @Override @@ -75,11 +85,13 @@ public boolean equals(Object o) { return false; } UserCommandKey that = (UserCommandKey) o; - return getUser().equals(that.getUser()) && getCommand().equals(that.getCommand()); + return getUser().equals(that.getUser()) && + getCommand().equals(that.getCommand()) && + getType().equals(that.getType()); } @Override public int hashCode() { - return Objects.hash(getUser(), getCommand()); + return Objects.hash(getUser(), getCommand(), getType()); } } diff --git a/dynamometer-workload/src/test/java/com/linkedin/dynamometer/workloadgenerator/TestWorkloadGenerator.java b/dynamometer-workload/src/test/java/com/linkedin/dynamometer/workloadgenerator/TestWorkloadGenerator.java index a2c891aff4..a51756bee8 100644 --- a/dynamometer-workload/src/test/java/com/linkedin/dynamometer/workloadgenerator/TestWorkloadGenerator.java +++ b/dynamometer-workload/src/test/java/com/linkedin/dynamometer/workloadgenerator/TestWorkloadGenerator.java @@ -24,6 +24,8 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authorize.AuthorizationException; import org.apache.hadoop.security.authorize.ImpersonationProvider; +import org.apache.htrace.commons.logging.Log; +import org.apache.htrace.commons.logging.LogFactory; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -32,7 +34,8 @@ public class TestWorkloadGenerator { - + private static final Log LOG = LogFactory.getLog(TestWorkloadGenerator.class); + private Configuration conf; private MiniDFSCluster miniCluster; private FileSystem dfs; @@ -114,7 +117,9 @@ private void testAuditWorkloadWithOutput(String auditOutputPath) throws Exceptio assertTrue(dfs.exists(new Path(auditOutputPath))); try (FSDataInputStream auditOutputFile = dfs.open(new Path(auditOutputPath, "part-r-00000"))) { String auditOutput = IOUtils.toString(auditOutputFile); - assertTrue(auditOutput.matches(".*hdfs,WRITE\\t[0-9]+\\n.*")); + LOG.info(auditOutput); + assertTrue(auditOutput.matches(".*(hdfs,WRITE,[A-Z]+\\t[17]+,[0-9]+\\n){3}.*")); + // Matches three lines of the format "hdfs,WRITE count,time" } } } From a0ec676f33711b069124e5b5edf1d2948da273de Mon Sep 17 00:00:00 2001 From: Christopher Gregorian Date: Thu, 14 Mar 2019 12:18:49 -0700 Subject: [PATCH 2/5] Fix spacing --- .../audit/AuditReplayMapper.java | 1 + .../audit/CountTimeWritable.java | 98 +++++++++---------- 2 files changed, 50 insertions(+), 49 deletions(-) diff --git a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayMapper.java b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayMapper.java index 414cb5be77..73a1b97dce 100644 --- a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayMapper.java +++ b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayMapper.java @@ -267,5 +267,6 @@ public void configureJob(Job job) { job.setOutputFormatClass(TextOutputFormat.class); TextOutputFormat.setOutputPath(job, new Path(job.getConfiguration().get(OUTPUT_PATH_KEY))); + TextOutputFormat.SEPERATOR = ","; } } diff --git a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/CountTimeWritable.java b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/CountTimeWritable.java index 5a3b5316a5..28d37d6bfc 100644 --- a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/CountTimeWritable.java +++ b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/CountTimeWritable.java @@ -4,7 +4,6 @@ */ package com.linkedin.dynamometer.workloadgenerator.audit; - import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Writable; @@ -12,60 +11,61 @@ import java.io.DataOutput; import java.io.IOException; + /** * UserCommandKey is a {@link Writable} used as a composite value that accumulates the count * and cumulative latency of replayed commands. It is used as the output value for * AuditReplayMapper and AuditReplayReducer. */ public class CountTimeWritable implements Writable { - private LongWritable count; - private LongWritable time; - - public CountTimeWritable() { - count = new LongWritable(); - time = new LongWritable(); - } - - public CountTimeWritable(LongWritable count, LongWritable time) { - this.count = count; - this.time = time; - } - - public CountTimeWritable(long count, long time) { - this.count = new LongWritable(count); - this.time = new LongWritable(time); - } - - public long getCount() { - return count.get(); - } - - public long getTime() { - return time.get(); - } - - public void setCount(long count) { - this.count.set(getCount() + count); - } + private LongWritable count; + private LongWritable time; + + public CountTimeWritable() { + count = new LongWritable(); + time = new LongWritable(); + } + + public CountTimeWritable(LongWritable count, LongWritable time) { + this.count = count; + this.time = time; + } + + public CountTimeWritable(long count, long time) { + this.count = new LongWritable(count); + this.time = new LongWritable(time); + } + + public long getCount() { + return count.get(); + } + + public long getTime() { + return time.get(); + } + + public void setCount(long count) { + this.count.set(getCount() + count); + } + + public void setTime(long time) { + this.time.set(getTime() + time); + } + + @Override + public void write(DataOutput out) throws IOException { + count.write(out); + time.write(out); + } - public void setTime(long time) { - this.time.set(getTime() + time); - } - - @Override - public void write(DataOutput out) throws IOException { - count.write(out); - time.write(out); - } + @Override + public void readFields(DataInput in) throws IOException { + count.readFields(in); + time.readFields(in); + } - @Override - public void readFields(DataInput in) throws IOException { - count.readFields(in); - time.readFields(in); - } - - @Override - public String toString() { - return getCount() + "," + getTime(); - } + @Override + public String toString() { + return getCount() + "," + getTime(); + } } From 4aeec4f99408e0babff7492f6b45d21313395556 Mon Sep 17 00:00:00 2001 From: Christopher Gregorian Date: Thu, 14 Mar 2019 12:24:03 -0700 Subject: [PATCH 3/5] Change separator to comma --- .../dynamometer/workloadgenerator/audit/AuditReplayMapper.java | 2 +- .../dynamometer/workloadgenerator/TestWorkloadGenerator.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayMapper.java b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayMapper.java index 73a1b97dce..2defe7a334 100644 --- a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayMapper.java +++ b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayMapper.java @@ -267,6 +267,6 @@ public void configureJob(Job job) { job.setOutputFormatClass(TextOutputFormat.class); TextOutputFormat.setOutputPath(job, new Path(job.getConfiguration().get(OUTPUT_PATH_KEY))); - TextOutputFormat.SEPERATOR = ","; + job.getConfiguration().set(TextOutputFormat.SEPERATOR, ","); } } diff --git a/dynamometer-workload/src/test/java/com/linkedin/dynamometer/workloadgenerator/TestWorkloadGenerator.java b/dynamometer-workload/src/test/java/com/linkedin/dynamometer/workloadgenerator/TestWorkloadGenerator.java index a51756bee8..238409aff8 100644 --- a/dynamometer-workload/src/test/java/com/linkedin/dynamometer/workloadgenerator/TestWorkloadGenerator.java +++ b/dynamometer-workload/src/test/java/com/linkedin/dynamometer/workloadgenerator/TestWorkloadGenerator.java @@ -118,7 +118,7 @@ private void testAuditWorkloadWithOutput(String auditOutputPath) throws Exceptio try (FSDataInputStream auditOutputFile = dfs.open(new Path(auditOutputPath, "part-r-00000"))) { String auditOutput = IOUtils.toString(auditOutputFile); LOG.info(auditOutput); - assertTrue(auditOutput.matches(".*(hdfs,WRITE,[A-Z]+\\t[17]+,[0-9]+\\n){3}.*")); + assertTrue(auditOutput.matches(".*(hdfs,WRITE,[A-Z]+,[17]+,[0-9]+\\n){3}.*")); // Matches three lines of the format "hdfs,WRITE count,time" } } From 25e9d3ed66446554baa4a908158df82283220d35 Mon Sep 17 00:00:00 2001 From: Christopher Gregorian Date: Thu, 14 Mar 2019 12:37:17 -0700 Subject: [PATCH 4/5] Style and comments --- .../workloadgenerator/audit/AuditReplayThread.java | 3 ++- .../dynamometer/workloadgenerator/TestWorkloadGenerator.java | 4 +++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayThread.java b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayThread.java index beb90fa76e..39f8660198 100644 --- a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayThread.java +++ b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayThread.java @@ -264,7 +264,8 @@ public FileSystem run() { long latency = System.currentTimeMillis() - startTime; - UserCommandKey userCommandKey = new UserCommandKey(command.getSimpleUgi(), replayCommand.toString(), replayCommand.getType().toString()); + UserCommandKey userCommandKey = new UserCommandKey(command.getSimpleUgi(), + replayCommand.toString(), replayCommand.getType().toString()); commandLatencyMap.putIfAbsent(userCommandKey, new CountTimeWritable()); CountTimeWritable latencyWritable = commandLatencyMap.get(userCommandKey); latencyWritable.setCount(latencyWritable.getCount() + 1); diff --git a/dynamometer-workload/src/test/java/com/linkedin/dynamometer/workloadgenerator/TestWorkloadGenerator.java b/dynamometer-workload/src/test/java/com/linkedin/dynamometer/workloadgenerator/TestWorkloadGenerator.java index 238409aff8..ae43ee39c6 100644 --- a/dynamometer-workload/src/test/java/com/linkedin/dynamometer/workloadgenerator/TestWorkloadGenerator.java +++ b/dynamometer-workload/src/test/java/com/linkedin/dynamometer/workloadgenerator/TestWorkloadGenerator.java @@ -119,7 +119,9 @@ private void testAuditWorkloadWithOutput(String auditOutputPath) throws Exceptio String auditOutput = IOUtils.toString(auditOutputFile); LOG.info(auditOutput); assertTrue(auditOutput.matches(".*(hdfs,WRITE,[A-Z]+,[17]+,[0-9]+\\n){3}.*")); - // Matches three lines of the format "hdfs,WRITE count,time" + // Matches three lines of the format "hdfs,WRITE,name,count,time" + // Using [17] for the count group because each operation is run either + // 1 or 7 times but the output order isn't guaranteed } } } From eab2d6e262f02b634ab4ad530b52b78365eea9cf Mon Sep 17 00:00:00 2001 From: Christopher Gregorian Date: Thu, 14 Mar 2019 13:39:06 -0700 Subject: [PATCH 5/5] Fix long line --- .../workloadgenerator/audit/AuditReplayReducer.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayReducer.java b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayReducer.java index cfe4d8fa41..1822d06237 100644 --- a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayReducer.java +++ b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayReducer.java @@ -18,7 +18,8 @@ public class AuditReplayReducer extends Reducer { @Override - protected void reduce(UserCommandKey key, Iterable values, Context context) throws IOException, InterruptedException { + protected void reduce(UserCommandKey key, Iterable values, Context context) + throws IOException, InterruptedException { long countSum = 0; long timeSum = 0; for (CountTimeWritable v : values) {