Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
* replayed. For example, a rate factor of 2 would make the replay occur twice as fast, and a rate
* factor of 0.5 would make it occur half as fast.
*/
public class AuditReplayMapper extends WorkloadMapper<LongWritable, Text, UserCommandKey, LongWritable> {
public class AuditReplayMapper extends WorkloadMapper<LongWritable, Text, UserCommandKey, CountTimeWritable> {

public static final String INPUT_PATH_KEY = "auditreplay.input-path";
public static final String OUTPUT_PATH_KEY = "auditreplay.output-path";
Expand Down Expand Up @@ -257,15 +257,16 @@ public void cleanup(Mapper.Context context) throws InterruptedException, IOExcep
@Override
public void configureJob(Job job) {
job.setMapOutputKeyClass(UserCommandKey.class);
job.setMapOutputValueClass(LongWritable.class);
job.setMapOutputValueClass(CountTimeWritable.class);
job.setInputFormatClass(NoSplitTextInputFormat.class);

job.setNumReduceTasks(1);
job.setReducerClass(AuditReplayReducer.class);
job.setOutputKeyClass(UserCommandKey.class);
job.setOutputValueClass(LongWritable.class);
job.setOutputValueClass(CountTimeWritable.class);
job.setOutputFormatClass(TextOutputFormat.class);

TextOutputFormat.setOutputPath(job, new Path(job.getConfiguration().get(OUTPUT_PATH_KEY)));
job.getConfiguration().set(TextOutputFormat.SEPERATOR, ",");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
*/
package com.linkedin.dynamometer.workloadgenerator.audit;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
Expand All @@ -16,14 +15,17 @@
* of the command (READ/WRITE).
*/
public class AuditReplayReducer extends
Reducer<UserCommandKey, LongWritable, UserCommandKey, LongWritable> {
Reducer<UserCommandKey, CountTimeWritable, UserCommandKey, CountTimeWritable> {

@Override
protected void reduce(UserCommandKey key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
long sum = 0;
for (LongWritable v : values) {
sum += v.get();
protected void reduce(UserCommandKey key, Iterable<CountTimeWritable> values, Context context)
throws IOException, InterruptedException {
long countSum = 0;
long timeSum = 0;
for (CountTimeWritable v : values) {
countSum += v.getCount();
timeSum += v.getTime();
}
context.write(key, new LongWritable(sum));
context.write(key, new CountTimeWritable(countSum, timeSum));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.counters.GenericCounter;
Expand Down Expand Up @@ -64,7 +63,7 @@ public class AuditReplayThread extends Thread {
// and merge them all together at the end.
private Map<REPLAYCOUNTERS, Counter> replayCountersMap = new HashMap<>();
private Map<String, Counter> individualCommandsMap = new HashMap<>();
private Map<UserCommandKey, LongWritable> commandLatencyMap = new HashMap<>();
private Map<UserCommandKey, CountTimeWritable> commandLatencyMap = new HashMap<>();

AuditReplayThread(Mapper.Context mapperContext, DelayQueue<AuditReplayCommand> queue,
ConcurrentMap<String, FileSystem> fsCache) throws IOException {
Expand Down Expand Up @@ -102,7 +101,7 @@ void drainCounters(Mapper.Context context) {
}

void drainCommandLatencies(Mapper.Context context) throws InterruptedException, IOException {
for (Map.Entry<UserCommandKey, LongWritable> ent : commandLatencyMap.entrySet()) {
for (Map.Entry<UserCommandKey, CountTimeWritable> ent : commandLatencyMap.entrySet()) {
context.write(ent.getKey(), ent.getValue());
}
}
Expand Down Expand Up @@ -265,10 +264,12 @@ public FileSystem run() {

long latency = System.currentTimeMillis() - startTime;

UserCommandKey userCommandKey = new UserCommandKey(command.getSimpleUgi(), replayCommand.getType().toString());
commandLatencyMap.putIfAbsent(userCommandKey, new LongWritable(0));
LongWritable latencyWritable = commandLatencyMap.get(userCommandKey);
latencyWritable.set(latencyWritable.get() + latency);
UserCommandKey userCommandKey = new UserCommandKey(command.getSimpleUgi(),
replayCommand.toString(), replayCommand.getType().toString());
commandLatencyMap.putIfAbsent(userCommandKey, new CountTimeWritable());
CountTimeWritable latencyWritable = commandLatencyMap.get(userCommandKey);
latencyWritable.setCount(latencyWritable.getCount() + 1);
latencyWritable.setTime(latencyWritable.getTime() + latency);

switch (replayCommand.getType()) {
case WRITE:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
Comment thread
csgregorian marked this conversation as resolved.
* Copyright 2019 LinkedIn Corporation. All rights reserved. Licensed under the BSD-2 Clause license.
* See LICENSE in the project root for license information.
*/
package com.linkedin.dynamometer.workloadgenerator.audit;

Comment thread
csgregorian marked this conversation as resolved.
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

Comment thread
csgregorian marked this conversation as resolved.

/**
* UserCommandKey is a {@link Writable} used as a composite value that accumulates the count
* and cumulative latency of replayed commands. It is used as the output value for
* AuditReplayMapper and AuditReplayReducer.
*/
public class CountTimeWritable implements Writable {
private LongWritable count;
private LongWritable time;

public CountTimeWritable() {
count = new LongWritable();
time = new LongWritable();
}

public CountTimeWritable(LongWritable count, LongWritable time) {
this.count = count;
this.time = time;
}

public CountTimeWritable(long count, long time) {
this.count = new LongWritable(count);
this.time = new LongWritable(time);
}

public long getCount() {
return count.get();
}

public long getTime() {
return time.get();
}

public void setCount(long count) {
this.count.set(getCount() + count);
}

public void setTime(long time) {
this.time.set(getTime() + time);
}

@Override
public void write(DataOutput out) throws IOException {
count.write(out);
time.write(out);
}

@Override
public void readFields(DataInput in) throws IOException {
count.readFields(in);
time.readFields(in);
}

@Override
public String toString() {
return getCount() + "," + getTime();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,27 +13,31 @@
import org.apache.hadoop.io.WritableComparable;

/**
* UserCommandKey is a {@link WritableComparable} used as a composite key combining the user id and
* type of a replayed command. It is used as the output key for AuditReplayMapper and the
* UserCommandKey is a {@link WritableComparable} used as a composite key combining the user id, name,
* and type of a replayed command. It is used as the output key for AuditReplayMapper and the
* keys for AuditReplayReducer.
*/
public class UserCommandKey implements WritableComparable {
private Text user;
private Text command;
private Text type;

public UserCommandKey() {
user = new Text();
command = new Text();
type = new Text();
}

public UserCommandKey(Text user, Text command) {
public UserCommandKey(Text user, Text command, Text type) {
this.user = user;
this.command = command;
this.type = type;
}

public UserCommandKey(String user, String command) {
public UserCommandKey(String user, String command, String type) {
this.user = new Text(user);
this.command = new Text(command);
this.type = new Text(type);
}

public String getUser() {
Expand All @@ -43,17 +47,23 @@ public String getUser() {
public String getCommand() {
return command.toString();
}

public String getType() {
return type.toString();
}

@Override
public void write(DataOutput out) throws IOException {
user.write(out);
command.write(out);
type.write(out);
}

@Override
public void readFields(DataInput in) throws IOException {
user.readFields(in);
command.readFields(in);
type.readFields(in);
}

@Override
Expand All @@ -63,7 +73,7 @@ public int compareTo(@Nonnull Object o) {

@Override
public String toString() {
return getUser() + "," + getCommand();
return getUser() + "," + getType() + "," + getCommand();
}

@Override
Expand All @@ -75,11 +85,13 @@ public boolean equals(Object o) {
return false;
}
UserCommandKey that = (UserCommandKey) o;
return getUser().equals(that.getUser()) && getCommand().equals(that.getCommand());
return getUser().equals(that.getUser()) &&
getCommand().equals(that.getCommand()) &&
getType().equals(that.getType());
}

@Override
public int hashCode() {
return Objects.hash(getUser(), getCommand());
return Objects.hash(getUser(), getCommand(), getType());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AuthorizationException;
import org.apache.hadoop.security.authorize.ImpersonationProvider;
import org.apache.htrace.commons.logging.Log;
import org.apache.htrace.commons.logging.LogFactory;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
Expand All @@ -32,7 +34,8 @@


public class TestWorkloadGenerator {

private static final Log LOG = LogFactory.getLog(TestWorkloadGenerator.class);

private Configuration conf;
private MiniDFSCluster miniCluster;
private FileSystem dfs;
Expand Down Expand Up @@ -114,7 +117,11 @@ private void testAuditWorkloadWithOutput(String auditOutputPath) throws Exceptio
assertTrue(dfs.exists(new Path(auditOutputPath)));
try (FSDataInputStream auditOutputFile = dfs.open(new Path(auditOutputPath, "part-r-00000"))) {
String auditOutput = IOUtils.toString(auditOutputFile);
assertTrue(auditOutput.matches(".*hdfs,WRITE\\t[0-9]+\\n.*"));
LOG.info(auditOutput);
assertTrue(auditOutput.matches(".*(hdfs,WRITE,[A-Z]+,[17]+,[0-9]+\\n){3}.*"));
Comment thread
csgregorian marked this conversation as resolved.
// Matches three lines of the format "hdfs,WRITE,name,count,time"
// Using [17] for the count group because each operation is run either
// 1 or 7 times but the output order isn't guaranteed
}
}
}