diff --git a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayMapper.java b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayMapper.java index 0aee62e8b3..ab3362b137 100644 --- a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayMapper.java +++ b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayMapper.java @@ -13,6 +13,7 @@ import java.lang.reflect.InvocationTargetException; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.DelayQueue; import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -121,6 +122,7 @@ public enum CommandType { private int numThreads; private long highestTimestamp; private List threads; + private DelayQueue commandQueue; private Function relativeToAbsoluteTimestamp; private AuditCommandParser commandParser; @@ -172,8 +174,9 @@ public Long apply(Long input) { LOG.info("Starting " + numThreads + " threads"); threads = new ArrayList<>(); + commandQueue = new DelayQueue<>(); for (int i = 0; i < numThreads; i++) { - AuditReplayThread thread = new AuditReplayThread(context); + AuditReplayThread thread = new AuditReplayThread(context, commandQueue); threads.add(thread); thread.start(); } @@ -188,11 +191,7 @@ public void map(LongWritable lineNum, Text inputLine, Mapper.Context context) if (delay > MAX_READAHEAD_MS) { Thread.sleep(delay - (MAX_READAHEAD_MS / 2)); } - int idx = cmd.getSrc().hashCode() % numThreads; - if (idx < 0) { - idx += numThreads; - } - threads.get(idx).addToQueue(cmd); + commandQueue.put(cmd); highestTimestamp = cmd.getAbsoluteTimestamp(); } diff --git a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayThread.java b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayThread.java index c007ff160c..59be1bac53 100644 --- a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayThread.java +++ b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayThread.java @@ -61,8 +61,9 @@ public class AuditReplayThread extends Thread { private Map replayCountersMap = new HashMap<>(); private Map individualCommandsMap = new HashMap<>(); - AuditReplayThread(Mapper.Context mapperContext) throws IOException { - commandQueue = new DelayQueue<>(); + AuditReplayThread(Mapper.Context mapperContext, DelayQueue queue) + throws IOException { + commandQueue = queue; Configuration mapperConf = mapperContext.getConfiguration(); String namenodeURI = mapperConf.get(WorkloadDriver.NN_URI); startTimestampMs = mapperConf.getLong(WorkloadDriver.START_TIMESTAMP_MS, -1);