Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ The other supporter format is `com.linkedin.dynamometer.workloadgenerator.audit.
files in the format produced by a Hive query with output fields, in order:

* `relativeTimestamp`: event time offset, in milliseconds, from the start of the trace
* `ugi`: user information of the submitting user
* `command`: name of the command, e.g. 'open'
* `source`: source path
* `dest`: destination path
Expand All @@ -134,7 +135,7 @@ files in the format produced by a Hive query with output fields, in order:
Assuming your audit logs are available in Hive, this can be produced via a Hive query looking like:
```sql
INSERT OVERWRITE DIRECTORY '${outputPath}'
SELECT (timestamp - ${startTimestamp} AS relativeTimestamp, command, source, dest, sourceIP
SELECT (timestamp - ${startTimestamp} AS relativeTimestamp, ugi, command, source, dest, sourceIP
FROM '${auditLogTableLocation}'
WHERE timestamp >= ${startTimestamp} AND timestamp < ${endTimestamp}
DISTRIBUTE BY src
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/**
* Copyright 2017 LinkedIn Corporation. All rights reserved. Licensed under the BSD-2 Clause license.
* See LICENSE in the project root for license information.
*/
package com.linkedin.dynamometer;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.ImpersonationProvider;


/**
* An {@link ImpersonationProvider} that indiscriminately allows all users
* to proxy as any other user.
*/
public class AllowAllImpersonationProvider extends Configured implements ImpersonationProvider {

public void init(String configurationPrefix) {
// Do nothing
}
public void authorize(UserGroupInformation user, String remoteAddress) {
// Do nothing
}

}
5 changes: 3 additions & 2 deletions dynamometer-infra/src/main/resources/start-component.sh
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ EOF
ln -snf "`pwd`/VERSION" "$nameDir/current/VERSION"
chmod 700 "$nameDir/current/"*

# To be able to use the custom block placement policy
# To be able to use the custom block placement policy and the AllowAllImpersonationProvider
export HADOOP_CLASSPATH="`pwd`/dynamometer.jar:$HADOOP_CLASSPATH"

read -r -d '' namenodeConfigs <<EOF
Expand All @@ -266,9 +266,10 @@ EOF
-D dfs.namenode.kerberos.principal=
-D dfs.namenode.keytab.file=
-D dfs.namenode.safemode.threshold-pct=0.0f
-D dfs.permissions.enabled=false
-D dfs.permissions.enabled=true
-D dfs.cluster.administrators="*"
-D dfs.block.replicator.classname=com.linkedin.dynamometer.BlockPlacementPolicyAlwaysSatisfied
-D hadoop.security.impersonation.provider.class=com.linkedin.dynamometer.AllowAllImpersonationProvider
${configOverrides}
EOF

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -304,8 +304,8 @@ public Boolean get() {
fail("Workload job failed");
}
Counters counters = client.getWorkloadJob().getCounters();
assertEquals(3, counters.findCounter(AuditReplayMapper.REPLAYCOUNTERS.TOTALCOMMANDS).getValue());
assertEquals(0, counters.findCounter(AuditReplayMapper.REPLAYCOUNTERS.TOTALINVALIDCOMMANDS).getValue());
assertEquals(6, counters.findCounter(AuditReplayMapper.REPLAYCOUNTERS.TOTALCOMMANDS).getValue());
assertEquals(1, counters.findCounter(AuditReplayMapper.REPLAYCOUNTERS.TOTALINVALIDCOMMANDS).getValue());

LOG.info("Waiting for infra application to exit");
GenericTestUtils.waitFor(new Supplier<Boolean>() {
Expand Down
13 changes: 13 additions & 0 deletions dynamometer-infra/src/test/resources/conf/etc/hadoop/core-site.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
<?xml version='1.0' encoding='UTF-8'?>
<!--

Copyright 2017 LinkedIn Corporation. All rights reserved. Licensed under the BSD-2 Clause license.
See LICENSE in the project root for license information.

-->
<configuration>
<property>
<name>hadoop.security.impersonation.provider.class</name>
<value>com.linkedin.dynamometer.AllowAllImpersonationProvider</value>
</property>
</configuration>
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public class AuditLogDirectParser implements AuditCommandParser {
private static final Pattern MESSAGE_ONLY_PATTERN = Pattern.compile("^([0-9-]+ [0-9:,]+) [^:]+: (.+)$");
private static final Splitter.MapSplitter AUDIT_SPLITTER =
Splitter.on("\t").trimResults().omitEmptyStrings().withKeyValueSeparator("=");
private static final Splitter SPACE_SPLITTER = Splitter.on(" ").trimResults().omitEmptyStrings();
private static final DateFormat AUDIT_DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss,SSS");
static {
AUDIT_DATE_FORMAT.setTimeZone(TimeZone.getTimeZone("UTC"));
Expand Down Expand Up @@ -63,6 +64,8 @@ public AuditReplayCommand parse(Text inputLine, Function<Long, Long> relativeToA
String auditMessageSanitized = m.group(2).replace("(options=", "(options:");
Map<String, String> parameterMap = AUDIT_SPLITTER.split(auditMessageSanitized);
return new AuditReplayCommand(relativeToAbsolute.apply(relativeTimestamp),
// Split the UGI on space to remove the auth and proxy portions of it
SPACE_SPLITTER.split(parameterMap.get("ugi")).iterator().next(),
parameterMap.get("cmd").replace("(options:", "(options="),
parameterMap.get("src"), parameterMap.get("dst"), parameterMap.get("ip"));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@
* which fields should be separated by the start-of-heading (U+0001) character.
* The fields available should be, in order:
* <pre>
* relativeTimestampMs,command,src,dest,sourceIP
* relativeTimestampMs,ugi,command,src,dest,sourceIP
* </pre>
* Where relativeTimestampMs represents the time elapsed between the start of
* the audit log and the occurrence of the audit event. Assuming your audit
* logs are available in Hive, this can be generated with a query looking like:
* <pre>
* INSERT OVERWRITE DIRECTORY '${outputPath}'
* SELECT (timestamp - ${startTimestamp} AS relativeTimestamp, cmd, src, dst, ip
* SELECT (timestamp - ${startTimestamp} AS relativeTimestamp, ugi, cmd, src, dst, ip
* FROM '${auditLogTableLocation}'
* WHERE timestamp >= ${startTimestamp} AND timestamp < ${endTimestamp}
* DISTRIBUTE BY src
Expand All @@ -45,7 +45,7 @@ public void initialize(Configuration conf) throws IOException {
public AuditReplayCommand parse(Text inputLine, Function<Long, Long> relativeToAbsolute) throws IOException {
String[] fields = inputLine.toString().split(FIELD_SEPARATOR);
long absoluteTimestamp = relativeToAbsolute.apply(Long.parseLong(fields[0]));
return new AuditReplayCommand(absoluteTimestamp, fields[1], fields[2], fields[3], fields[4]);
return new AuditReplayCommand(absoluteTimestamp, fields[1], fields[2], fields[3], fields[4], fields[5]);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,14 @@
*/
package com.linkedin.dynamometer.workloadgenerator.audit;

import java.io.IOException;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
Expand All @@ -19,14 +25,19 @@
*/
class AuditReplayCommand implements Delayed {

private static final Logger LOG = LoggerFactory.getLogger(AuditReplayCommand.class);
private static final Pattern SIMPLE_UGI_PATTERN = Pattern.compile("([^/@ ]*).*?");

private long absoluteTimestamp;
private String ugi;
private String command;
private String src;
private String dest;
private String sourceIP;

AuditReplayCommand(long absoluteTimestamp, String command, String src, String dest, String sourceIP) {
AuditReplayCommand(long absoluteTimestamp, String ugi, String command, String src, String dest, String sourceIP) {
this.absoluteTimestamp = absoluteTimestamp;
this.ugi = ugi;
this.command = command;
this.src = src;
this.dest = dest;
Expand All @@ -37,6 +48,20 @@ long getAbsoluteTimestamp() {
return absoluteTimestamp;
}

String getSimpleUgi() {
Matcher m = SIMPLE_UGI_PATTERN.matcher(ugi);
if (m.matches()) {
return m.group(1);
} else {
LOG.error("Error parsing simple UGI <{}>; falling back to current user", ugi);
try {
return UserGroupInformation.getCurrentUser().getShortUserName();
} catch (IOException ioe) {
return "";
}
}
}

String getCommand() {
return command;
}
Expand Down Expand Up @@ -79,7 +104,7 @@ boolean isPoison() {
private static class PoisonPillCommand extends AuditReplayCommand {

private PoisonPillCommand(long absoluteTimestamp) {
super(absoluteTimestamp, null, null, null, null);
super(absoluteTimestamp, null, null, null, null, null);
}

@Override
Expand All @@ -99,13 +124,13 @@ public boolean equals(Object other) {
return false;
}
AuditReplayCommand o = (AuditReplayCommand) other;
return absoluteTimestamp == o.absoluteTimestamp && command.equals(o.command) && src.equals(o.src) &&
dest.equals(o.dest) && sourceIP.equals(o.sourceIP);
return absoluteTimestamp == o.absoluteTimestamp && ugi.equals(o.ugi) && command.equals(o.command) &&
src.equals(o.src) && dest.equals(o.dest) && sourceIP.equals(o.sourceIP);
}

@Override
public String toString() {
return String.format("AuditReplayCommand(absoluteTimestamp=%d, command=%s, src=%s, dest=%s, sourceIP=%s",
absoluteTimestamp, command, src, dest, sourceIP);
return String.format("AuditReplayCommand(absoluteTimestamp=%d, ugi=%s, command=%s, src=%s, dest=%s, sourceIP=%s",
absoluteTimestamp, ugi, command, src, dest, sourceIP);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,14 @@
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputFormat;
Expand Down Expand Up @@ -187,9 +190,10 @@ public Long apply(Long input) {
LOG.info("Starting " + numThreads + " threads");

threads = new ArrayList<>();
ConcurrentMap<String, FileSystem> fsCache = new ConcurrentHashMap<>();
commandQueue = new DelayQueue<>();
for (int i = 0; i < numThreads; i++) {
AuditReplayThread thread = new AuditReplayThread(context, commandQueue);
AuditReplayThread thread = new AuditReplayThread(context, commandQueue, fsCache);
threads.add(thread);
thread.start();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@
import com.linkedin.dynamometer.workloadgenerator.WorkloadDriver;
import java.io.IOException;
import java.net.URI;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.TimeUnit;
import java.util.Map;
Expand All @@ -21,7 +23,6 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.mapreduce.Counter;
Expand Down Expand Up @@ -49,28 +50,30 @@ public class AuditReplayThread extends Thread {
private static final Log LOG = LogFactory.getLog(AuditReplayThread.class);

private DelayQueue<AuditReplayCommand> commandQueue;
private ConcurrentMap<String, FileSystem> fsCache;
private URI namenodeUri;
private UserGroupInformation loginUser;
private Configuration mapperConf;
// If any exception is encountered it will be stored here
private Exception exception;
private long startTimestampMs;
private FileSystem fs;
private DFSClient dfsClient;
private boolean createBlocks;

// Counters are not thread-safe so we store a local mapping in our thread
// and merge them all together at the end.
private Map<REPLAYCOUNTERS, Counter> replayCountersMap = new HashMap<>();
private Map<String, Counter> individualCommandsMap = new HashMap<>();

AuditReplayThread(Mapper.Context mapperContext, DelayQueue<AuditReplayCommand> queue)
throws IOException {
AuditReplayThread(Mapper.Context mapperContext, DelayQueue<AuditReplayCommand> queue,
ConcurrentMap<String, FileSystem> fsCache) throws IOException {
commandQueue = queue;
Configuration mapperConf = mapperContext.getConfiguration();
String namenodeURI = mapperConf.get(WorkloadDriver.NN_URI);
this.fsCache = fsCache;
loginUser = UserGroupInformation.getLoginUser();
mapperConf = mapperContext.getConfiguration();
namenodeUri = URI.create(mapperConf.get(WorkloadDriver.NN_URI));
startTimestampMs = mapperConf.getLong(WorkloadDriver.START_TIMESTAMP_MS, -1);
createBlocks = mapperConf.getBoolean(AuditReplayMapper.CREATE_BLOCKS_KEY,
AuditReplayMapper.CREATE_BLOCKS_DEFAULT);
fs = FileSystem.get(URI.create(namenodeURI), mapperConf);
dfsClient = ((DistributedFileSystem) fs).getClient();
LOG.info("Start timestamp: " + startTimestampMs);
for (REPLAYCOUNTERS rc : REPLAYCOUNTERS.values()) {
replayCountersMap.put(rc, new GenericCounter());
Expand Down Expand Up @@ -134,7 +137,7 @@ public void run() {
replayCountersMap.get(REPLAYCOUNTERS.LATECOMMANDS).increment(1);
replayCountersMap.get(REPLAYCOUNTERS.LATECOMMANDSTOTALTIME).increment(-1 * delay);
}
if (!replayLog(cmd.getCommand(), cmd.getSrc(), cmd.getDest())) {
if (!replayLog(cmd)) {
replayCountersMap.get(REPLAYCOUNTERS.TOTALINVALIDCOMMANDS).increment(1);
}
cmd = commandQueue.take();
Expand All @@ -149,15 +152,33 @@ public void run() {

/**
* Attempt to replay the provided command. Updates counters accordingly.
* @param command The name of the command to replay.
* @param src The source path of the command.
* @param dst The destination path of the command (null except for rename and concat).
* @param command The command to replay
* @return True iff the command was successfully replayed (i.e., no exceptions were thrown).
*/
private boolean replayLog(String command, String src, String dst) {
private boolean replayLog(final AuditReplayCommand command) {
final String src = command.getSrc();
final String dst = command.getDest();
FileSystem proxyFs = fsCache.get(command.getSimpleUgi());
if (proxyFs == null) {
UserGroupInformation ugi = UserGroupInformation.createProxyUser(command.getSimpleUgi(), loginUser);
proxyFs = ugi.doAs(new PrivilegedAction<FileSystem>() {
@Override
public FileSystem run() {
try {
FileSystem fs = new DistributedFileSystem();
fs.initialize(namenodeUri, mapperConf);
return fs;
} catch (IOException ioe) {
throw new RuntimeException(ioe);
}
}
});
fsCache.put(command.getSimpleUgi(), proxyFs);
}
final FileSystem fs = proxyFs;
ReplayCommand replayCommand;
try {
replayCommand = ReplayCommand.valueOf(command.split(" ")[0].toUpperCase());
replayCommand = ReplayCommand.valueOf(command.getCommand().split(" ")[0].toUpperCase());
} catch (IllegalArgumentException iae) {
LOG.warn("Unsupported/invalid command: " + command);
replayCountersMap.get(REPLAYCOUNTERS.TOTALUNSUPPORTEDCOMMANDS).increment(1);
Expand Down Expand Up @@ -191,7 +212,7 @@ private boolean replayLog(String command, String src, String dst) {
break;

case LISTSTATUS:
dfsClient.listPaths(src, HdfsFileStatus.EMPTY_NAME);
((DistributedFileSystem) fs).getClient().listPaths(src, HdfsFileStatus.EMPTY_NAME);
break;

case APPEND:
Expand Down
Loading