diff --git a/dynamometer-infra/src/main/java/com/linkedin/dynamometer/Client.java b/dynamometer-infra/src/main/java/com/linkedin/dynamometer/Client.java index 3b6c54cbd2..5fc8117855 100644 --- a/dynamometer-infra/src/main/java/com/linkedin/dynamometer/Client.java +++ b/dynamometer-infra/src/main/java/com/linkedin/dynamometer/Client.java @@ -131,6 +131,8 @@ public class Client extends Configured implements Tool { public static final String WORKLOAD_INPUT_PATH_ARG = "workload_input_path"; public static final String WORKLOAD_THREADS_PER_MAPPER_ARG = "workload_threads_per_mapper"; public static final String WORKLOAD_START_DELAY_ARG = "workload_start_delay"; + public static final String WORKLOAD_RATE_FACTOR_ARG = "workload_rate_factor"; + public static final String WORKLOAD_RATE_FACTOR_DEFAULT = "1.0"; private static final String START_SCRIPT_LOCATION = Client.class.getClassLoader().getResource(DynoConstants.START_SCRIPT.getResourcePath()).toString(); @@ -185,6 +187,7 @@ public class Client extends Configured implements Tool { private int workloadThreadsPerMapper; // The startup delay for the workload job. private long workloadStartDelayMs; + private double workloadRateFactor = 0.0; // Start time for client private final long clientStartTime = System.currentTimeMillis(); @@ -276,6 +279,9 @@ public Client(String appMasterJar) { "any of them start replaying. Workloads with more mappers may need a longer delay to get all of " + "the containers allocated. Human-readable units accepted (e.g. 30s, 10m). " + "(default " + WorkloadDriver.START_TIME_OFFSET_DEFAULT + ")"); + opts.addOption(WORKLOAD_RATE_FACTOR_ARG, true, + "Rate factor (multiplicative speed factor) to apply to workload replay (Default " + + WORKLOAD_RATE_FACTOR_DEFAULT + ")"); } /** @@ -375,6 +381,8 @@ public boolean accept(Path path) { workloadInputPath = cliParser.getOptionValue(WORKLOAD_INPUT_PATH_ARG); workloadThreadsPerMapper = Integer.parseInt(cliParser.getOptionValue(WORKLOAD_THREADS_PER_MAPPER_ARG, String.valueOf(AuditReplayMapper.NUM_THREADS_DEFAULT))); + workloadRateFactor = Double.parseDouble(cliParser.getOptionValue(WORKLOAD_RATE_FACTOR_ARG, + WORKLOAD_RATE_FACTOR_DEFAULT)); String delayString = cliParser.getOptionValue(WORKLOAD_START_DELAY_ARG, WorkloadDriver.START_TIME_OFFSET_DEFAULT); // Store a temporary config to leverage Configuration's time duration parsing. getConf().set("___temp___", delayString); @@ -817,6 +825,7 @@ private void launchAndMonitorWorkloadDriver(Properties nameNodeProperties) { Configuration workloadConf = new Configuration(getConf()); workloadConf.set(AuditReplayMapper.INPUT_PATH_KEY, workloadInputPath); workloadConf.setInt(AuditReplayMapper.NUM_THREADS_KEY, workloadThreadsPerMapper); + workloadConf.setDouble(AuditReplayMapper.RATE_FACTOR_KEY, workloadRateFactor); workloadJob = WorkloadDriver.getJobForSubmission(workloadConf, nameNodeURI.toString(), workloadStartTime, AuditReplayMapper.class); workloadJob.submit(); diff --git a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayMapper.java b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayMapper.java index 0aee62e8b3..1f8f17b99a 100644 --- a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayMapper.java +++ b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayMapper.java @@ -42,6 +42,12 @@ * "invalid" (threw an exception), how many were "late" (replayed later than they should have been), * and the latency (from client perspective) of each command. If there are a large number of "late" * commands, you likely need to increase the number of threads used and/or the number of mappers. + * + * By default, commands will be replayed at the same rate as they were originally performed. However + * a rate factor can be specified via the {@value RATE_FACTOR_KEY} configuration; all of the (relative) + * timestamps will be divided by this rate factor, effectively changing the rate at which they are + * replayed. For example, a rate factor of 2 would make the replay occur twice as fast, and a rate + * factor of 0.5 would make it occur half as fast. */ public class AuditReplayMapper extends WorkloadMapper { @@ -50,6 +56,8 @@ public class AuditReplayMapper extends WorkloadMapper { public static final int NUM_THREADS_DEFAULT = 1; public static final String CREATE_BLOCKS_KEY = "auditreplay.create-blocks"; public static final boolean CREATE_BLOCKS_DEFAULT = true; + public static final String RATE_FACTOR_KEY = "auditreplay.rate-factor"; + public static final double RATE_FACTOR_DEFAULT = 1.0; public static final String COMMAND_PARSER_KEY = "auditreplay.command-parser.class"; public static final Class COMMAND_PARSER_DEFAULT = AuditLogDirectParser.class; @@ -119,6 +127,7 @@ public enum CommandType { private long startTimestampMs; private int numThreads; + private double rateFactor; private long highestTimestamp; private List threads; private Function relativeToAbsoluteTimestamp; @@ -140,7 +149,10 @@ public List getConfigDescriptions() { INPUT_PATH_KEY + " (required): Path to directory containing input files.", NUM_THREADS_KEY + " (default " + NUM_THREADS_DEFAULT + "): Number of threads to use per mapper for replay.", CREATE_BLOCKS_KEY + " (default " + CREATE_BLOCKS_DEFAULT + "): Whether or not to create 1-byte blocks when " + - "performing `create` commands." + "performing `create` commands.", + RATE_FACTOR_KEY + " (default " + RATE_FACTOR_DEFAULT + "): Multiplicative speed at which to replay the audit " + + " log; e.g. a value of 2.0 would make the replay occur at twice the original speed. This can be useful " + + "to induce heavier loads." ); } @@ -155,6 +167,7 @@ public void setup(Mapper.Context context) throws IOException { // WorkloadDriver ensures that the starttimestamp is set startTimestampMs = conf.getLong(WorkloadDriver.START_TIMESTAMP_MS, -1); numThreads = conf.getInt(NUM_THREADS_KEY, NUM_THREADS_DEFAULT); + rateFactor = conf.getDouble(RATE_FACTOR_KEY, RATE_FACTOR_DEFAULT); try { commandParser = conf.getClass(COMMAND_PARSER_KEY, COMMAND_PARSER_DEFAULT, AuditCommandParser.class) .getConstructor().newInstance(); @@ -165,7 +178,7 @@ public void setup(Mapper.Context context) throws IOException { relativeToAbsoluteTimestamp = new Function() { @Override public Long apply(Long input) { - return startTimestampMs + input; + return startTimestampMs + Math.round(input / rateFactor); } };