Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,38 +4,36 @@
*/
package com.linkedin.dynamometer.workloadgenerator;

import com.google.common.collect.Lists;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Mapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
* <p>CreateFileMapper continuously creates 1 byte files for the specified duration to increase the
* number of file objects on the NN.
* number of file objects on the NN. This uses {@link TimedInputFormat}; see its Javadoc for configuration
* information.
*
* <p>Configuration options available:
* <ul>
* <li>{@value NUM_MAPPERS_KEY} (required): Number of mappers to launch.</li>
* <li>{@value DURATION_MIN_KEY} (required): Number of minutes to induce workload for.</li>
* <li>{@value SHOULD_DELETE_KEY} (default: {@value SHOULD_DELETE_DEFAULT}): If true, delete the files
* after creating them. This can be useful for generating constant load without increasing the
* number of file objects.</li>
* <li>{@value FILE_PARENT_PATH_KEY} (default: {@value FILE_PARENT_PATH_DEFAULT}): The root directory
* in which to create files.</li>
* </ul>
*/
public class CreateFileMapper extends WorkloadMapper<NullWritable, NullWritable> {
public class CreateFileMapper extends WorkloadMapper<LongWritable, NullWritable> {

public static final String NUM_MAPPERS_KEY = "createfile.num-mappers";
public static final String DURATION_MIN_KEY = "createfile.duration-min";
public static final String FILE_PARENT_PATH_KEY = "createfile.file-parent-path";
public static final String FILE_PARENT_PATH_DEFAULT = "/tmp/createFileMapper";
Comment thread
xkrogen marked this conversation as resolved.
public static final String SHOULD_DELETE_KEY = "createfile.should-delete";
Expand All @@ -45,13 +43,12 @@ public enum CREATEFILECOUNTERS {
NUMFILESCREATED
}

private long startTimestampMs;
private static final Logger LOG = LoggerFactory.getLogger(CreateFileMapper.class);
private static final byte[] FILE_CONTENT = { 0x0 };

private FileSystem fs;
private Configuration conf;
private int taskID;
private String fileParentPath;
private Path filePrefix;
private boolean shouldDelete;
private long endTimeStampMs;

@Override
public String getDescription() {
Expand All @@ -60,64 +57,41 @@ public String getDescription() {

@Override
public List<String> getConfigDescriptions() {
return Lists.newArrayList(
NUM_MAPPERS_KEY + " (required): Number of mappers to launch.",
DURATION_MIN_KEY + " (required): Number of minutes to induce workload for.",
SHOULD_DELETE_KEY + " (default: " + SHOULD_DELETE_DEFAULT + "): If true, delete the files after creating " +
"them. This can be useful for generating constant load without increasing the number of file objects.",
FILE_PARENT_PATH_KEY + " (default: " + FILE_PARENT_PATH_DEFAULT +
"): The root directory in which to create files."
);
List<String> baseList = TimedInputFormat.getConfigDescriptions();
baseList.add(SHOULD_DELETE_KEY + " (default: " + SHOULD_DELETE_DEFAULT +
"): If true, delete the files after creating them. This can be useful for generating constant load " +
"without increasing the number of file objects.");
baseList.add(FILE_PARENT_PATH_KEY + " (default: " + FILE_PARENT_PATH_DEFAULT +
"): The root directory in which to create files.");
return baseList;
}

@Override
public boolean verifyConfigurations(Configuration conf) {
return conf.get(NUM_MAPPERS_KEY) != null && conf.get(DURATION_MIN_KEY) != null;
return TimedInputFormat.verifyConfigurations(conf);
}

@Override
public void map(NullWritable key, NullWritable value, Mapper.Context mapperContext)
throws IOException, InterruptedException {
taskID = mapperContext.getTaskAttemptID().getTaskID().getId();
conf = mapperContext.getConfiguration();
public void setup(Mapper.Context context) throws IOException {
Configuration conf = context.getConfiguration();
String namenodeURI = conf.get(WorkloadDriver.NN_URI);
startTimestampMs = conf.getLong(WorkloadDriver.START_TIMESTAMP_MS, -1);
fileParentPath = conf.get(FILE_PARENT_PATH_KEY, FILE_PARENT_PATH_DEFAULT);
shouldDelete = conf.getBoolean(SHOULD_DELETE_KEY, SHOULD_DELETE_DEFAULT);
int durationMin = conf.getInt(DURATION_MIN_KEY, -1);
if (durationMin < 0) {
throw new IOException("Duration must be positive; got: " + durationMin);
}
endTimeStampMs = startTimestampMs + TimeUnit.MILLISECONDS.convert(durationMin, TimeUnit.MINUTES);
fs = FileSystem.get(URI.create(namenodeURI), conf);
System.out.println("Start timestamp: " + startTimestampMs);

long currentEpoch = System.currentTimeMillis();
long delay = startTimestampMs - currentEpoch;
if (delay > 0) {
System.out.println("Sleeping for " + delay + " ms");
Thread.sleep(delay);
}
shouldDelete = conf.getBoolean(SHOULD_DELETE_KEY, SHOULD_DELETE_DEFAULT);
String fileParentPath = conf.get(FILE_PARENT_PATH_KEY, FILE_PARENT_PATH_DEFAULT);
int taskID = context.getTaskAttemptID().getTaskID().getId();
filePrefix = new Path(String.format("%s/mapper%s/file", fileParentPath, taskID));
LOG.info("Mapper path prefix: " + filePrefix);
}

String mapperSpecifcPathPrefix = fileParentPath + "/mapper" + taskID;
System.out.println("Mapper path prefix: " + mapperSpecifcPathPrefix);
long numFilesCreated = 0;
Path path;
final byte[] content = {0x0};
while (System.currentTimeMillis() < endTimeStampMs ) {
path = new Path(mapperSpecifcPathPrefix + "/file" + numFilesCreated);
OutputStream out = fs.create(path);
out.write(content);
out.close();
numFilesCreated++;
mapperContext.getCounter(CREATEFILECOUNTERS.NUMFILESCREATED).increment(1L);
if (numFilesCreated % 1000 == 0) {
mapperContext.progress();
System.out.println("Number of files created: " + numFilesCreated);
}
if (shouldDelete) {
fs.delete(path);
}
@Override
public void map(LongWritable key, NullWritable value, Mapper.Context mapperContext) throws IOException {
Path path = filePrefix.suffix(key.get() + "");
OutputStream out = fs.create(path);
out.write(FILE_CONTENT);
out.close();
mapperContext.getCounter(CREATEFILECOUNTERS.NUMFILESCREATED).increment(1L);
if (shouldDelete) {
fs.delete(path, true);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/**
* Copyright 2019 LinkedIn Corporation. All rights reserved. Licensed under the BSD-2 Clause license.
* See LICENSE in the project root for license information.
*/
package com.linkedin.dynamometer.workloadgenerator;

import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;


/**
 * An {@link InputFormat} which is time-based. Starts at some timestamp (specified by the
 * {@value WorkloadDriver#START_TIMESTAMP_MS} configuration) and runs for a time specified by the
 * {@value DURATION_KEY} configuration. Spawns {@value NUM_MAPPERS_KEY} mappers. Both {@value DURATION_KEY}
 * and {@value NUM_MAPPERS_KEY} are required.
 *
 * <p>The values returned as the key by this InputFormat are just a sequential counter.
 */
public class TimedInputFormat extends InputFormat<LongWritable, NullWritable> {

  /** Configuration key for how long the workload runs; read as a time duration defaulting to milliseconds. */
  public static final String DURATION_KEY = "timedinput.duration";
  /** Configuration key for the number of mappers (and therefore splits) to launch. */
  public static final String NUM_MAPPERS_KEY = "timedinput.num-mappers";

  /**
   * Creates one placeholder split per requested mapper. The splits carry no data; they exist
   * only so that the framework launches the requested number of map tasks.
   *
   * @param job the job whose configuration supplies {@value NUM_MAPPERS_KEY}
   * @return a list with one {@link VirtualInputSplit} per mapper; empty if the key is unset or
   *         not positive
   */
  @Override
  public List<InputSplit> getSplits(JobContext job) {
    Configuration conf = job.getConfiguration();
    int numMappers = conf.getInt(NUM_MAPPERS_KEY, -1);
    // Guard the capacity hint: a negative initial capacity would make ArrayList throw.
    List<InputSplit> splits = new ArrayList<>(Math.max(numMappers, 0));
    for (int i = 0; i < numMappers; i++) {
      splits.add(new VirtualInputSplit());
    }
    return splits;
  }

  /**
   * Creates the reader that emits a sequential counter for the configured duration.
   *
   * @param split the split being read (unused)
   * @param context the task context (unused here)
   * @return a new {@link TimedRecordReader}
   */
  @Override
  public RecordReader<LongWritable, NullWritable> createRecordReader(InputSplit split, TaskAttemptContext context) {
    return new TimedRecordReader();
  }

  /**
   * Describes the configuration keys understood by this input format.
   *
   * @return a new, mutable list of human-readable descriptions, so callers may append their own
   */
  public static List<String> getConfigDescriptions() {
    return Lists.newArrayList(
        NUM_MAPPERS_KEY + " (required): Number of mappers to launch.",
        // The value is read via Configuration#getTimeDuration with a millisecond default unit,
        // so a bare number is NOT minutes; spell that out for the user.
        DURATION_KEY + " (required): How long to induce workload for, as a time duration with an optional "
            + "unit suffix (e.g. 30s, 10m, 1h). A value with no suffix is interpreted as milliseconds."
    );
  }

  /**
   * Checks that the required configuration is present and usable.
   *
   * @param conf the configuration to validate
   * @return true only if {@value NUM_MAPPERS_KEY} and {@value DURATION_KEY} are both set to
   *         strictly positive values
   */
  public static boolean verifyConfigurations(Configuration conf) {
    // Compare against > 0 rather than the -1 sentinel so that zero or other negative
    // values (which would launch no mappers or end immediately) are rejected up front.
    return conf.getInt(NUM_MAPPERS_KEY, -1) > 0
        && conf.getTimeDuration(DURATION_KEY, -1, TimeUnit.MILLISECONDS) > 0;
  }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/**
* Copyright 2019 LinkedIn Corporation. All rights reserved. Licensed under the BSD-2 Clause license.
* See LICENSE in the project root for license information.
*/
package com.linkedin.dynamometer.workloadgenerator;

import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static com.linkedin.dynamometer.workloadgenerator.TimedInputFormat.DURATION_KEY;


/**
 * A {@link RecordReader} to support {@link TimedInputFormat}. Waits to emit the first
 * value until the start time has been reached, then continually emits values
 * (a sequential counter) until the end of the duration has been reached.
 */
public class TimedRecordReader extends RecordReader<LongWritable, NullWritable> {

  private static final Logger LOG = LoggerFactory.getLogger(TimedRecordReader.class);

  /** The key object handed out by {@link #getCurrentKey()}; reused and incremented for every record. */
  private LongWritable nextValue;
  /** Length of the emission window in milliseconds; -1 if the duration key is unset. */
  private long durationMs;
  /** Epoch milliseconds at which emission may begin; 0 if the start key is unset. */
  private long startTimestampMs;

  /**
   * Reads the duration and start timestamp from the task configuration and resets the counter.
   *
   * @param split the split being read (unused)
   * @param context the task context supplying the configuration
   */
  @Override
  public void initialize(InputSplit split, TaskAttemptContext context) {
    Configuration conf = context.getConfiguration();
    durationMs = conf.getTimeDuration(DURATION_KEY, -1, TimeUnit.MILLISECONDS);
    startTimestampMs = conf.getLong(WorkloadDriver.START_TIMESTAMP_MS, 0);
    nextValue = new LongWritable(0);
    // Log the start time once here rather than on every record, which would flood the task log.
    LOG.info("Starting at {} ms", startTimestampMs);
  }

  /**
   * Advances to the next counter value. If the start time has not been reached yet, blocks
   * until it has; once more than the configured duration has elapsed, reports end of input.
   *
   * @return true if a new key is available, false once the duration has been exceeded
   * @throws InterruptedException if interrupted while waiting for the start time
   */
  @Override
  public boolean nextKeyValue() throws InterruptedException {
    long elapsed = System.currentTimeMillis() - startTimestampMs;
    if (elapsed < 0) {
      long sleepMs = -elapsed;
      LOG.info("Sleeping for {} ms", sleepMs);
      Thread.sleep(sleepMs);
    } else if (elapsed > durationMs) {
      return false;
    }
    // The counter is incremented before it is exposed, so the first key emitted is 1.
    nextValue.set(nextValue.get() + 1L);
    return true;
  }

  /**
   * @return the current counter value; the same object instance is reused across records
   */
  @Override
  public LongWritable getCurrentKey() {
    return nextValue;
  }

  /**
   * @return the {@link NullWritable} singleton; this reader carries no value payload
   */
  @Override
  public NullWritable getCurrentValue() {
    return NullWritable.get();
  }

  /**
   * Reports the fraction of the configured duration that has elapsed.
   *
   * @return a value clamped to [0.0, 1.0]: 0 before the start time, 1 once the duration has
   *         passed or when no positive duration is configured
   */
  @Override
  public float getProgress() {
    long elapsed = System.currentTimeMillis() - startTimestampMs;
    if (elapsed <= 0) {
      return 0.0f;
    }
    // Also guards the division against a zero or negative (unset) duration.
    if (durationMs <= 0 || elapsed >= durationMs) {
      return 1.0f;
    }
    return elapsed / (float) durationMs;
  }

  /** No resources are held by this reader, so there is nothing to release. */
  @Override
  public void close() {
    // do Nothing
  }
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import org.apache.hadoop.mapreduce.InputSplit;


public class VirtualInputSplit<K, V> extends InputSplit implements Writable {
public class VirtualInputSplit extends InputSplit implements Writable {

@Override
public void write(DataOutput out) throws IOException {
Expand Down
Loading