diff --git a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayMapper.java b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayMapper.java index 76d1fcf887..57bf6336d8 100644 --- a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayMapper.java +++ b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayMapper.java @@ -16,6 +16,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.DelayQueue; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -24,6 +25,7 @@ import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; @@ -137,6 +139,7 @@ public enum CommandType { private DelayQueue commandQueue; private Function relativeToAbsoluteTimestamp; private AuditCommandParser commandParser; + private ScheduledThreadPoolExecutor progressExecutor; @Override public Class getInputFormat(Configuration conf) { @@ -167,7 +170,7 @@ public boolean verifyConfigurations(Configuration conf) { } @Override - public void setup(Mapper.Context context) throws IOException { + public void setup(final Mapper.Context context) throws IOException { Configuration conf = context.getConfiguration(); // WorkloadDriver ensures that the starttimestamp is set startTimestampMs = conf.getLong(WorkloadDriver.START_TIMESTAMP_MS, -1); @@ -189,6 +192,16 @@ public Long apply(Long input) { LOG.info("Starting " + numThreads + " threads"); + progressExecutor = new ScheduledThreadPoolExecutor(1); + // half of the timeout or once per minute if none specified + long progressFrequencyMs = conf.getLong(MRJobConfig.TASK_TIMEOUT, 2 * 60 * 1000) / 2; + progressExecutor.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + context.progress(); + } + }, progressFrequencyMs, progressFrequencyMs, TimeUnit.MILLISECONDS); + threads = new ArrayList<>(); ConcurrentMap fsCache = new ConcurrentHashMap<>(); commandQueue = new DelayQueue<>(); @@ -226,6 +239,7 @@ public void cleanup(Mapper.Context context) throws InterruptedException { threadException = Optional.of(t.getException()); } } + progressExecutor.shutdown(); if (threadException.isPresent()) { throw new RuntimeException("Exception in AuditReplayThread", threadException.get());