Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,8 @@ public class Client extends Configured implements Tool {
public static final String WORKLOAD_INPUT_PATH_ARG = "workload_input_path";
public static final String WORKLOAD_THREADS_PER_MAPPER_ARG = "workload_threads_per_mapper";
public static final String WORKLOAD_START_DELAY_ARG = "workload_start_delay";
public static final String WORKLOAD_RATE_FACTOR_ARG = "workload_rate_factor";
public static final String WORKLOAD_RATE_FACTOR_DEFAULT = "1.0";

private static final String START_SCRIPT_LOCATION =
Client.class.getClassLoader().getResource(DynoConstants.START_SCRIPT.getResourcePath()).toString();
Expand Down Expand Up @@ -185,6 +187,7 @@ public class Client extends Configured implements Tool {
private int workloadThreadsPerMapper;
// The startup delay for the workload job.
private long workloadStartDelayMs;
private double workloadRateFactor = 0.0;

// Start time for client
private final long clientStartTime = System.currentTimeMillis();
Expand Down Expand Up @@ -276,6 +279,9 @@ public Client(String appMasterJar) {
"any of them start replaying. Workloads with more mappers may need a longer delay to get all of " +
"the containers allocated. Human-readable units accepted (e.g. 30s, 10m). " +
"(default " + WorkloadDriver.START_TIME_OFFSET_DEFAULT + ")");
opts.addOption(WORKLOAD_RATE_FACTOR_ARG, true,
"Rate factor (multiplicative speed factor) to apply to workload replay (Default " +
WORKLOAD_RATE_FACTOR_DEFAULT + ")");
}

/**
Expand Down Expand Up @@ -375,6 +381,8 @@ public boolean accept(Path path) {
workloadInputPath = cliParser.getOptionValue(WORKLOAD_INPUT_PATH_ARG);
workloadThreadsPerMapper = Integer.parseInt(cliParser.getOptionValue(WORKLOAD_THREADS_PER_MAPPER_ARG,
String.valueOf(AuditReplayMapper.NUM_THREADS_DEFAULT)));
workloadRateFactor = Double.parseDouble(cliParser.getOptionValue(WORKLOAD_RATE_FACTOR_ARG,
WORKLOAD_RATE_FACTOR_DEFAULT));
String delayString = cliParser.getOptionValue(WORKLOAD_START_DELAY_ARG, WorkloadDriver.START_TIME_OFFSET_DEFAULT);
// Store a temporary config to leverage Configuration's time duration parsing.
getConf().set("___temp___", delayString);
Expand Down Expand Up @@ -817,6 +825,7 @@ private void launchAndMonitorWorkloadDriver(Properties nameNodeProperties) {
Configuration workloadConf = new Configuration(getConf());
workloadConf.set(AuditReplayMapper.INPUT_PATH_KEY, workloadInputPath);
workloadConf.setInt(AuditReplayMapper.NUM_THREADS_KEY, workloadThreadsPerMapper);
workloadConf.setDouble(AuditReplayMapper.RATE_FACTOR_KEY, workloadRateFactor);
workloadJob = WorkloadDriver.getJobForSubmission(workloadConf, nameNodeURI.toString(),
workloadStartTime, AuditReplayMapper.class);
workloadJob.submit();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,12 @@
* "invalid" (threw an exception), how many were "late" (replayed later than they should have been),
* and the latency (from client perspective) of each command. If there are a large number of "late"
* commands, you likely need to increase the number of threads used and/or the number of mappers.
*
* By default, commands will be replayed at the same rate as they were originally performed. However
* a rate factor can be specified via the {@value RATE_FACTOR_KEY} configuration; all of the (relative)
* timestamps will be divided by this rate factor, effectively changing the rate at which they are
* replayed. For example, a rate factor of 2 would make the replay occur twice as fast, and a rate
* factor of 0.5 would make it occur half as fast.
*/
public class AuditReplayMapper extends WorkloadMapper<LongWritable, Text> {

Expand All @@ -50,6 +56,8 @@ public class AuditReplayMapper extends WorkloadMapper<LongWritable, Text> {
public static final int NUM_THREADS_DEFAULT = 1;
public static final String CREATE_BLOCKS_KEY = "auditreplay.create-blocks";
public static final boolean CREATE_BLOCKS_DEFAULT = true;
public static final String RATE_FACTOR_KEY = "auditreplay.rate-factor";
public static final double RATE_FACTOR_DEFAULT = 1.0;
public static final String COMMAND_PARSER_KEY = "auditreplay.command-parser.class";
public static final Class<AuditLogDirectParser> COMMAND_PARSER_DEFAULT = AuditLogDirectParser.class;

Expand Down Expand Up @@ -119,6 +127,7 @@ public enum CommandType {

private long startTimestampMs;
private int numThreads;
private double rateFactor;
private long highestTimestamp;
private List<AuditReplayThread> threads;
private Function<Long, Long> relativeToAbsoluteTimestamp;
Expand All @@ -140,7 +149,10 @@ public List<String> getConfigDescriptions() {
INPUT_PATH_KEY + " (required): Path to directory containing input files.",
NUM_THREADS_KEY + " (default " + NUM_THREADS_DEFAULT + "): Number of threads to use per mapper for replay.",
CREATE_BLOCKS_KEY + " (default " + CREATE_BLOCKS_DEFAULT + "): Whether or not to create 1-byte blocks when " +
"performing `create` commands."
"performing `create` commands.",
RATE_FACTOR_KEY + " (default " + RATE_FACTOR_DEFAULT + "): Multiplicative speed at which to replay the audit " +
" log; e.g. a value of 2.0 would make the replay occur at twice the original speed. This can be useful " +
"to induce heavier loads."
);
}

Expand All @@ -155,6 +167,7 @@ public void setup(Mapper.Context context) throws IOException {
// WorkloadDriver ensures that the starttimestamp is set
startTimestampMs = conf.getLong(WorkloadDriver.START_TIMESTAMP_MS, -1);
numThreads = conf.getInt(NUM_THREADS_KEY, NUM_THREADS_DEFAULT);
rateFactor = conf.getDouble(RATE_FACTOR_KEY, RATE_FACTOR_DEFAULT);
try {
commandParser = conf.getClass(COMMAND_PARSER_KEY, COMMAND_PARSER_DEFAULT, AuditCommandParser.class)
.getConstructor().newInstance();
Expand All @@ -165,7 +178,7 @@ public void setup(Mapper.Context context) throws IOException {
relativeToAbsoluteTimestamp = new Function<Long, Long>() {
@Override
public Long apply(Long input) {
return startTimestampMs + input;
return startTimestampMs + Math.round(input / rateFactor);
}
};

Expand Down