diff --git a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/CreateFileMapper.java b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/CreateFileMapper.java
index 17ce3cb339..b6db85ed4e 100644
--- a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/CreateFileMapper.java
+++ b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/CreateFileMapper.java
@@ -4,27 +4,27 @@
*/
package com.linkedin.dynamometer.workloadgenerator;
-import com.google.common.collect.Lists;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;
import java.util.List;
-import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Mapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
*
CreateFileMapper continuously creates 1 byte files for the specified duration to increase the
- * number of file objects on the NN.
+ * number of file objects on the NN. This uses {@link TimedInputFormat}; see its Javadoc for configuration
+ * information.
*
*
Configuration options available:
*
- * - {@value NUM_MAPPERS_KEY} (required): Number of mappers to launch.
- * - {@value DURATION_MIN_KEY} (required): Number of minutes to induce workload for.
* - {@value SHOULD_DELETE_KEY} (default: {@value SHOULD_DELETE_DEFAULT}): If true, delete the files
* after creating them. This can be useful for generating constant load without increasing the
* number of file objects.
@@ -32,10 +32,8 @@
* in which to create files.
*
*/
-public class CreateFileMapper extends WorkloadMapper {
+public class CreateFileMapper extends WorkloadMapper {
- public static final String NUM_MAPPERS_KEY = "createfile.num-mappers";
- public static final String DURATION_MIN_KEY = "createfile.duration-min";
public static final String FILE_PARENT_PATH_KEY = "createfile.file-parent-path";
public static final String FILE_PARENT_PATH_DEFAULT = "/tmp/createFileMapper";
public static final String SHOULD_DELETE_KEY = "createfile.should-delete";
@@ -45,13 +43,12 @@ public enum CREATEFILECOUNTERS {
NUMFILESCREATED
}
- private long startTimestampMs;
+ private static final Logger LOG = LoggerFactory.getLogger(CreateFileMapper.class);
+ private static final byte[] FILE_CONTENT = { 0x0 };
+
private FileSystem fs;
- private Configuration conf;
- private int taskID;
- private String fileParentPath;
+ private Path filePrefix;
private boolean shouldDelete;
- private long endTimeStampMs;
@Override
public String getDescription() {
@@ -60,64 +57,41 @@ public String getDescription() {
@Override
public List getConfigDescriptions() {
- return Lists.newArrayList(
- NUM_MAPPERS_KEY + " (required): Number of mappers to launch.",
- DURATION_MIN_KEY + " (required): Number of minutes to induce workload for.",
- SHOULD_DELETE_KEY + " (default: " + SHOULD_DELETE_DEFAULT + "): If true, delete the files after creating " +
- "them. This can be useful for generating constant load without increasing the number of file objects.",
- FILE_PARENT_PATH_KEY + " (default: " + FILE_PARENT_PATH_DEFAULT +
- "): The root directory in which to create files."
- );
+ List baseList = TimedInputFormat.getConfigDescriptions();
+ baseList.add(SHOULD_DELETE_KEY + " (default: " + SHOULD_DELETE_DEFAULT +
+ "): If true, delete the files after creating them. This can be useful for generating constant load " +
+ "without increasing the number of file objects.");
+ baseList.add(FILE_PARENT_PATH_KEY + " (default: " + FILE_PARENT_PATH_DEFAULT +
+ "): The root directory in which to create files.");
+ return baseList;
}
@Override
public boolean verifyConfigurations(Configuration conf) {
- return conf.get(NUM_MAPPERS_KEY) != null && conf.get(DURATION_MIN_KEY) != null;
+ return TimedInputFormat.verifyConfigurations(conf);
}
@Override
- public void map(NullWritable key, NullWritable value, Mapper.Context mapperContext)
- throws IOException, InterruptedException {
- taskID = mapperContext.getTaskAttemptID().getTaskID().getId();
- conf = mapperContext.getConfiguration();
+ public void setup(Mapper.Context context) throws IOException {
+ Configuration conf = context.getConfiguration();
String namenodeURI = conf.get(WorkloadDriver.NN_URI);
- startTimestampMs = conf.getLong(WorkloadDriver.START_TIMESTAMP_MS, -1);
- fileParentPath = conf.get(FILE_PARENT_PATH_KEY, FILE_PARENT_PATH_DEFAULT);
- shouldDelete = conf.getBoolean(SHOULD_DELETE_KEY, SHOULD_DELETE_DEFAULT);
- int durationMin = conf.getInt(DURATION_MIN_KEY, -1);
- if (durationMin < 0) {
- throw new IOException("Duration must be positive; got: " + durationMin);
- }
- endTimeStampMs = startTimestampMs + TimeUnit.MILLISECONDS.convert(durationMin, TimeUnit.MINUTES);
fs = FileSystem.get(URI.create(namenodeURI), conf);
- System.out.println("Start timestamp: " + startTimestampMs);
-
- long currentEpoch = System.currentTimeMillis();
- long delay = startTimestampMs - currentEpoch;
- if (delay > 0) {
- System.out.println("Sleeping for " + delay + " ms");
- Thread.sleep(delay);
- }
+ shouldDelete = conf.getBoolean(SHOULD_DELETE_KEY, SHOULD_DELETE_DEFAULT);
+ String fileParentPath = conf.get(FILE_PARENT_PATH_KEY, FILE_PARENT_PATH_DEFAULT);
+ int taskID = context.getTaskAttemptID().getTaskID().getId();
+ filePrefix = new Path(String.format("%s/mapper%s/file", fileParentPath, taskID));
+ LOG.info("Mapper path prefix: " + filePrefix);
+ }
- String mapperSpecifcPathPrefix = fileParentPath + "/mapper" + taskID;
- System.out.println("Mapper path prefix: " + mapperSpecifcPathPrefix);
- long numFilesCreated = 0;
- Path path;
- final byte[] content = {0x0};
- while (System.currentTimeMillis() < endTimeStampMs ) {
- path = new Path(mapperSpecifcPathPrefix + "/file" + numFilesCreated);
- OutputStream out = fs.create(path);
- out.write(content);
- out.close();
- numFilesCreated++;
- mapperContext.getCounter(CREATEFILECOUNTERS.NUMFILESCREATED).increment(1L);
- if (numFilesCreated % 1000 == 0) {
- mapperContext.progress();
- System.out.println("Number of files created: " + numFilesCreated);
- }
- if (shouldDelete) {
- fs.delete(path);
- }
+ @Override
+ public void map(LongWritable key, NullWritable value, Mapper.Context mapperContext) throws IOException {
+ Path path = filePrefix.suffix(key.get() + "");
+ OutputStream out = fs.create(path);
+ out.write(FILE_CONTENT);
+ out.close();
+ mapperContext.getCounter(CREATEFILECOUNTERS.NUMFILESCREATED).increment(1L);
+ if (shouldDelete) {
+ fs.delete(path, true);
}
}
}
diff --git a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/TimedInputFormat.java b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/TimedInputFormat.java
new file mode 100644
index 0000000000..147c57e5f7
--- /dev/null
+++ b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/TimedInputFormat.java
@@ -0,0 +1,64 @@
+/**
+ * Copyright 2019 LinkedIn Corporation. All rights reserved. Licensed under the BSD-2 Clause license.
+ * See LICENSE in the project root for license information.
+ */
+package com.linkedin.dynamometer.workloadgenerator;
+
+import com.google.common.collect.Lists;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+
+/**
+ * An {@link InputFormat} which is time-based. Starts at some timestamp (specified by the
+ * {@value WorkloadDriver#START_TIMESTAMP_MS configuration) and runs for a time specified by the
+ * {@value DURATION_KEY} configuration. Spawns {@value NUM_MAPPERS_KEY} mappers. Both {@value DURATION_KEY}
+ * and {@value NUM_MAPPERS_KEY} are required.
+ *
+ * The values returned as the key by this InputFormat are just a sequential counter.
+ */
+public class TimedInputFormat extends InputFormat {
+
+ public static final String DURATION_KEY = "timedinput.duration";
+ public static final String NUM_MAPPERS_KEY = "timedinput.num-mappers";
+
+ // Number of splits = Number of mappers. Creates fakeSplits to launch
+ // the required number of mappers
+ @Override
+ public List getSplits(JobContext job) {
+ Configuration conf = job.getConfiguration();
+ int numMappers = conf.getInt(NUM_MAPPERS_KEY, -1);
+ List splits = new ArrayList<>();
+ for (int i = 0; i < numMappers; i++) {
+ splits.add(new VirtualInputSplit());
+ }
+ return splits;
+ }
+
+ @Override
+ public RecordReader createRecordReader(InputSplit split, TaskAttemptContext context) {
+ return new TimedRecordReader();
+ }
+
+ public static List getConfigDescriptions() {
+ return Lists.newArrayList(
+ NUM_MAPPERS_KEY + " (required): Number of mappers to launch.",
+ DURATION_KEY + " (required): Number of minutes to induce workload for."
+ );
+ }
+
+ public static boolean verifyConfigurations(Configuration conf) {
+ return conf.getInt(NUM_MAPPERS_KEY, -1) != -1
+ && conf.getTimeDuration(DURATION_KEY, -1, TimeUnit.MILLISECONDS) != -1;
+ }
+
+}
diff --git a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/TimedRecordReader.java b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/TimedRecordReader.java
new file mode 100644
index 0000000000..673ffd6361
--- /dev/null
+++ b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/TimedRecordReader.java
@@ -0,0 +1,74 @@
+/**
+ * Copyright 2019 LinkedIn Corporation. All rights reserved. Licensed under the BSD-2 Clause license.
+ * See LICENSE in the project root for license information.
+ */
+package com.linkedin.dynamometer.workloadgenerator;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static com.linkedin.dynamometer.workloadgenerator.TimedInputFormat.DURATION_KEY;
+
+
+/**
+ * A {@link RecordReader} to support {@link TimedInputFormat}. Waits to emit the first
+ * value until the start time has been reached, then continually emits values
+ * (a sequential counter) until the end of the duration has been reached.
+ */
+public class TimedRecordReader extends RecordReader {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TimedRecordReader.class);
+
+ private LongWritable nextValue;
+ private long durationMs;
+ private long startTimestampMs;
+
+ @Override
+ public void initialize(InputSplit split, TaskAttemptContext context) {
+ Configuration conf = context.getConfiguration();
+ durationMs = conf.getTimeDuration(DURATION_KEY, -1, TimeUnit.MILLISECONDS);
+ startTimestampMs = conf.getLong(WorkloadDriver.START_TIMESTAMP_MS, 0);
+ nextValue = new LongWritable(0);
+ }
+
+ @Override
+ public boolean nextKeyValue() throws InterruptedException {
+ long elapsed = System.currentTimeMillis() - startTimestampMs;
+ LOG.info("Starting at " + startTimestampMs + " ms");
+ if (elapsed < 0) {
+ LOG.info("Sleeping for " + (-1 * elapsed) + " ms");
+ Thread.sleep(-1 * elapsed);
+ } else if (elapsed > durationMs) {
+ return false;
+ }
+ nextValue.set(nextValue.get() + 1L);
+ return true;
+ }
+
+ @Override
+ public LongWritable getCurrentKey() {
+ return nextValue;
+ }
+
+ @Override
+ public NullWritable getCurrentValue() {
+ return NullWritable.get();
+ }
+
+ @Override
+ public float getProgress() {
+ return (System.currentTimeMillis() - startTimestampMs) / ((float) durationMs);
+ }
+
+ @Override
+ public void close() {
+ // do Nothing
+ }
+}
diff --git a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/VirtualInputFormat.java b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/VirtualInputFormat.java
deleted file mode 100644
index b9dbc75370..0000000000
--- a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/VirtualInputFormat.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Copyright 2017 LinkedIn Corporation. All rights reserved. Licensed under the BSD-2 Clause license.
- * See LICENSE in the project root for license information.
- */
-package com.linkedin.dynamometer.workloadgenerator;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-
-public class VirtualInputFormat extends FileInputFormat
-{
- // Number of splits = Number of mappers. Creates fakeSplits to launch
- // the required number of mappers
- @Override
- public List getSplits(JobContext job) throws IOException
- {
- Configuration conf = job.getConfiguration();
- int numMappers = conf.getInt(CreateFileMapper.NUM_MAPPERS_KEY, -1);
- if (numMappers == -1)
- throw new IOException("Number of mappers should be provided as input");
- List splits = new ArrayList(numMappers);
- for (int i = 0; i < numMappers; i++)
- splits.add(new VirtualInputSplit());
- return splits;
- }
-
- @Override
- public RecordReader createRecordReader(InputSplit split,
- TaskAttemptContext context) throws IOException, InterruptedException
- {
- return new VirtualRecordReader();
- }
-}
diff --git a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/VirtualInputSplit.java b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/VirtualInputSplit.java
index 1648968a4d..b526053fc4 100644
--- a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/VirtualInputSplit.java
+++ b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/VirtualInputSplit.java
@@ -11,7 +11,7 @@
import org.apache.hadoop.mapreduce.InputSplit;
-public class VirtualInputSplit extends InputSplit implements Writable {
+public class VirtualInputSplit extends InputSplit implements Writable {
@Override
public void write(DataOutput out) throws IOException {
diff --git a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/VirtualRecordReader.java b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/VirtualRecordReader.java
deleted file mode 100644
index d91473a39d..0000000000
--- a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/VirtualRecordReader.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Copyright 2017 LinkedIn Corporation. All rights reserved. Licensed under the BSD-2 Clause license.
- * See LICENSE in the project root for license information.
- */
-package com.linkedin.dynamometer.workloadgenerator;
-
-import java.io.IOException;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-
-public class VirtualRecordReader extends RecordReader {
- int durationMs;
- long startTimestampInMs;
- long endTimestampInMs;
- static int numRows = 1;
-
- @Override
- public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
- Configuration conf = context.getConfiguration();
- durationMs = conf.getInt(CreateFileMapper.DURATION_MIN_KEY, 0) * 60 * 1000;
- startTimestampInMs = conf.getInt(WorkloadDriver.START_TIMESTAMP_MS, 0);
- endTimestampInMs = startTimestampInMs + durationMs;
- }
-
- // The map function per split should be invoked only once.
- @Override
- public boolean nextKeyValue() throws IOException, InterruptedException {
- if (numRows > 0) {
- numRows--;
- return true;
- } else {
- return false;
- }
- }
-
- @Override
- public K getCurrentKey() throws IOException, InterruptedException {
- return (K) NullWritable.get();
- }
-
- @Override
- public V getCurrentValue() throws IOException, InterruptedException {
- return (V) NullWritable.get();
- }
-
- @Override
- public float getProgress() throws IOException, InterruptedException {
- long remainingMs = endTimestampInMs - System.currentTimeMillis();
- float progress = (remainingMs * 100) / durationMs;
- return progress;
- }
-
- @Override
- public void close() throws IOException {
- // do Nothing
- }
-};
diff --git a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/WorkloadMapper.java b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/WorkloadMapper.java
index e196e7c82b..0e4a651f0f 100644
--- a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/WorkloadMapper.java
+++ b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/WorkloadMapper.java
@@ -13,7 +13,7 @@
/**
* Represents the base class for a generic workload-generating mapper. By default, it will expect to use
- * {@link VirtualInputFormat} as its {@link InputFormat}. Subclasses expecting a different {@link InputFormat}
+ * {@link TimedInputFormat} as its {@link InputFormat}. Subclasses expecting a different {@link InputFormat}
* should override the {@link #getInputFormat(Configuration)} method.
*/
public abstract class WorkloadMapper extends Mapper {
@@ -22,7 +22,7 @@ public abstract class WorkloadMapper extends Mapper getInputFormat(Configuration conf) {
- return VirtualInputFormat.class;
+ return TimedInputFormat.class;
}
/**