diff --git a/README.md b/README.md index fc43db67f9..bc01585328 100644 --- a/README.md +++ b/README.md @@ -126,6 +126,7 @@ The other supporter format is `com.linkedin.dynamometer.workloadgenerator.audit. files in the format produced by a Hive query with output fields, in order: * `relativeTimestamp`: event time offset, in milliseconds, from the start of the trace +* `ugi`: user information of the submitting user * `command`: name of the command, e.g. 'open' * `source`: source path * `dest`: destination path @@ -134,7 +135,7 @@ files in the format produced by a Hive query with output fields, in order: Assuming your audit logs are available in Hive, this can be produced via a Hive query looking like: ```sql INSERT OVERWRITE DIRECTORY '${outputPath}' -SELECT (timestamp - ${startTimestamp} AS relativeTimestamp, command, source, dest, sourceIP +SELECT (timestamp - ${startTimestamp} AS relativeTimestamp, ugi, command, source, dest, sourceIP FROM '${auditLogTableLocation}' WHERE timestamp >= ${startTimestamp} AND timestamp < ${endTimestamp} DISTRIBUTE BY src diff --git a/dynamometer-infra/src/main/java/com/linkedin/dynamometer/AllowAllImpersonationProvider.java b/dynamometer-infra/src/main/java/com/linkedin/dynamometer/AllowAllImpersonationProvider.java new file mode 100644 index 0000000000..b45a72d9ed --- /dev/null +++ b/dynamometer-infra/src/main/java/com/linkedin/dynamometer/AllowAllImpersonationProvider.java @@ -0,0 +1,25 @@ +/** + * Copyright 2017 LinkedIn Corporation. All rights reserved. Licensed under the BSD-2 Clause license. + * See LICENSE in the project root for license information. + */ +package com.linkedin.dynamometer; + +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.authorize.ImpersonationProvider; + + +/** + * An {@link ImpersonationProvider} that indiscriminately allows all users + * to proxy as any other user. + */ +public class AllowAllImpersonationProvider extends Configured implements ImpersonationProvider { + + public void init(String configurationPrefix) { + // Do nothing + } + public void authorize(UserGroupInformation user, String remoteAddress) { + // Do nothing + } + +} diff --git a/dynamometer-infra/src/main/resources/start-component.sh b/dynamometer-infra/src/main/resources/start-component.sh index ebdb0aece5..41ec8b68e4 100755 --- a/dynamometer-infra/src/main/resources/start-component.sh +++ b/dynamometer-infra/src/main/resources/start-component.sh @@ -247,7 +247,7 @@ EOF ln -snf "`pwd`/VERSION" "$nameDir/current/VERSION" chmod 700 "$nameDir/current/"* - # To be able to use the custom block placement policy + # To be able to use the custom block placement policy and the AllowAllImpersonationProvider export HADOOP_CLASSPATH="`pwd`/dynamometer.jar:$HADOOP_CLASSPATH" read -r -d '' namenodeConfigs <() { diff --git a/dynamometer-infra/src/test/resources/conf/etc/hadoop/core-site.xml b/dynamometer-infra/src/test/resources/conf/etc/hadoop/core-site.xml new file mode 100644 index 0000000000..2565abc9cb --- /dev/null +++ b/dynamometer-infra/src/test/resources/conf/etc/hadoop/core-site.xml @@ -0,0 +1,13 @@ + + + + + hadoop.security.impersonation.provider.class + com.linkedin.dynamometer.AllowAllImpersonationProvider + + diff --git a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditLogDirectParser.java b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditLogDirectParser.java index 5652a5f44a..72b575192d 100644 --- a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditLogDirectParser.java +++ b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditLogDirectParser.java @@ -32,6 +32,7 @@ public class AuditLogDirectParser implements AuditCommandParser { private static final Pattern MESSAGE_ONLY_PATTERN = Pattern.compile("^([0-9-]+ [0-9:,]+) [^:]+: (.+)$"); private static final Splitter.MapSplitter AUDIT_SPLITTER = Splitter.on("\t").trimResults().omitEmptyStrings().withKeyValueSeparator("="); + private static final Splitter SPACE_SPLITTER = Splitter.on(" ").trimResults().omitEmptyStrings(); private static final DateFormat AUDIT_DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss,SSS"); static { AUDIT_DATE_FORMAT.setTimeZone(TimeZone.getTimeZone("UTC")); @@ -63,6 +64,8 @@ public AuditReplayCommand parse(Text inputLine, Function relativeToA String auditMessageSanitized = m.group(2).replace("(options=", "(options:"); Map parameterMap = AUDIT_SPLITTER.split(auditMessageSanitized); return new AuditReplayCommand(relativeToAbsolute.apply(relativeTimestamp), + // Split the UGI on space to remove the auth and proxy portions of it + SPACE_SPLITTER.split(parameterMap.get("ugi")).iterator().next(), parameterMap.get("cmd").replace("(options:", "(options="), parameterMap.get("src"), parameterMap.get("dst"), parameterMap.get("ip")); } diff --git a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditLogHiveTableParser.java b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditLogHiveTableParser.java index 67319757f9..7651fd71a5 100644 --- a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditLogHiveTableParser.java +++ b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditLogHiveTableParser.java @@ -16,14 +16,14 @@ * which fields should be separated by the start-of-heading (U+0001) character. * The fields available should be, in order: *
- *   relativeTimestampMs,command,src,dest,sourceIP
+ *   relativeTimestampMs,ugi,command,src,dest,sourceIP
  * 
* Where relativeTimestampMs represents the time elapsed between the start of * the audit log and the occurrence of the audit event. Assuming your audit * logs are available in Hive, this can be generated with a query looking like: *
  *   INSERT OVERWRITE DIRECTORY '${outputPath}'
- *   SELECT (timestamp - ${startTimestamp} AS relativeTimestamp, cmd, src, dst, ip
+ *   SELECT (timestamp - ${startTimestamp} AS relativeTimestamp, ugi, cmd, src, dst, ip
  *   FROM '${auditLogTableLocation}'
  *   WHERE timestamp >= ${startTimestamp} AND timestamp < ${endTimestamp}
  *   DISTRIBUTE BY src
@@ -45,7 +45,7 @@ public void initialize(Configuration conf) throws IOException {
   public AuditReplayCommand parse(Text inputLine, Function relativeToAbsolute) throws IOException {
     String[] fields = inputLine.toString().split(FIELD_SEPARATOR);
     long absoluteTimestamp = relativeToAbsolute.apply(Long.parseLong(fields[0]));
-    return new AuditReplayCommand(absoluteTimestamp, fields[1], fields[2], fields[3], fields[4]);
+    return new AuditReplayCommand(absoluteTimestamp, fields[1], fields[2], fields[3], fields[4], fields[5]);
   }
 
 }
diff --git a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayCommand.java b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayCommand.java
index 87c34119dd..ecec939561 100644
--- a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayCommand.java
+++ b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayCommand.java
@@ -4,8 +4,14 @@
  */
 package com.linkedin.dynamometer.workloadgenerator.audit;
 
+import java.io.IOException;
 import java.util.concurrent.Delayed;
 import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
@@ -19,14 +25,19 @@
  */
 class AuditReplayCommand implements Delayed {
 
+  private static final Logger LOG = LoggerFactory.getLogger(AuditReplayCommand.class);
+  private static final Pattern SIMPLE_UGI_PATTERN = Pattern.compile("([^/@ ]*).*?");
+
   private long absoluteTimestamp;
+  private String ugi;
   private String command;
   private String src;
   private String dest;
   private String sourceIP;
 
-  AuditReplayCommand(long absoluteTimestamp, String command, String src, String dest, String sourceIP) {
+  AuditReplayCommand(long absoluteTimestamp, String ugi, String command, String src, String dest, String sourceIP) {
     this.absoluteTimestamp = absoluteTimestamp;
+    this.ugi = ugi;
     this.command = command;
     this.src = src;
     this.dest = dest;
@@ -37,6 +48,20 @@ long getAbsoluteTimestamp() {
     return absoluteTimestamp;
   }
 
+  String getSimpleUgi() {
+    Matcher m = SIMPLE_UGI_PATTERN.matcher(ugi);
+    if (m.matches()) {
+      return m.group(1);
+    } else {
+      LOG.error("Error parsing simple UGI <{}>; falling back to current user", ugi);
+      try {
+        return UserGroupInformation.getCurrentUser().getShortUserName();
+      } catch (IOException ioe) {
+        return "";
+      }
+    }
+  }
+
   String getCommand() {
     return command;
   }
@@ -79,7 +104,7 @@ boolean isPoison() {
   private static class PoisonPillCommand extends AuditReplayCommand {
 
     private PoisonPillCommand(long absoluteTimestamp) {
-      super(absoluteTimestamp, null, null, null, null);
+      super(absoluteTimestamp, null, null, null, null, null);
     }
 
     @Override
@@ -99,13 +124,13 @@ public boolean equals(Object other) {
       return false;
     }
     AuditReplayCommand o = (AuditReplayCommand) other;
-    return absoluteTimestamp == o.absoluteTimestamp && command.equals(o.command) && src.equals(o.src) &&
-        dest.equals(o.dest) && sourceIP.equals(o.sourceIP);
+    return absoluteTimestamp == o.absoluteTimestamp && ugi.equals(o.ugi) && command.equals(o.command) &&
+        src.equals(o.src) && dest.equals(o.dest) && sourceIP.equals(o.sourceIP);
   }
 
   @Override
   public String toString() {
-    return String.format("AuditReplayCommand(absoluteTimestamp=%d, command=%s, src=%s, dest=%s, sourceIP=%s",
-        absoluteTimestamp, command, src, dest, sourceIP);
+    return String.format("AuditReplayCommand(absoluteTimestamp=%d, ugi=%s, command=%s, src=%s, dest=%s, sourceIP=%s",
+        absoluteTimestamp, ugi, command, src, dest, sourceIP);
   }
 }
diff --git a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayMapper.java b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayMapper.java
index 987764b272..76d1fcf887 100644
--- a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayMapper.java
+++ b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayMapper.java
@@ -13,11 +13,14 @@
 import java.lang.reflect.InvocationTargetException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.DelayQueue;
 import java.util.concurrent.TimeUnit;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.InputFormat;
@@ -187,9 +190,10 @@ public Long apply(Long input) {
     LOG.info("Starting " + numThreads + " threads");
 
     threads = new ArrayList<>();
+    ConcurrentMap fsCache = new ConcurrentHashMap<>();
     commandQueue = new DelayQueue<>();
     for (int i = 0; i < numThreads; i++) {
-      AuditReplayThread thread = new AuditReplayThread(context, commandQueue);
+      AuditReplayThread thread = new AuditReplayThread(context, commandQueue, fsCache);
       threads.add(thread);
       thread.start();
     }
diff --git a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayThread.java b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayThread.java
index 59be1bac53..db480b9e7b 100644
--- a/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayThread.java
+++ b/dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayThread.java
@@ -8,9 +8,11 @@
 import com.linkedin.dynamometer.workloadgenerator.WorkloadDriver;
 import java.io.IOException;
 import java.net.URI;
+import java.security.PrivilegedAction;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.DelayQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.Map;
@@ -21,7 +23,6 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.mapreduce.Counter;
@@ -49,11 +50,13 @@ public class AuditReplayThread extends Thread {
   private static final Log LOG = LogFactory.getLog(AuditReplayThread.class);
 
   private DelayQueue commandQueue;
+  private ConcurrentMap fsCache;
+  private URI namenodeUri;
+  private UserGroupInformation loginUser;
+  private Configuration mapperConf;
   // If any exception is encountered it will be stored here
   private Exception exception;
   private long startTimestampMs;
-  private FileSystem fs;
-  private DFSClient dfsClient;
   private boolean createBlocks;
 
   // Counters are not thread-safe so we store a local mapping in our thread
@@ -61,16 +64,16 @@ public class AuditReplayThread extends Thread {
   private Map replayCountersMap = new HashMap<>();
   private Map individualCommandsMap = new HashMap<>();
 
-  AuditReplayThread(Mapper.Context mapperContext, DelayQueue queue)
-      throws IOException {
+  AuditReplayThread(Mapper.Context mapperContext, DelayQueue queue,
+      ConcurrentMap fsCache) throws IOException {
     commandQueue = queue;
-    Configuration mapperConf = mapperContext.getConfiguration();
-    String namenodeURI = mapperConf.get(WorkloadDriver.NN_URI);
+    this.fsCache = fsCache;
+    loginUser = UserGroupInformation.getLoginUser();
+    mapperConf = mapperContext.getConfiguration();
+    namenodeUri = URI.create(mapperConf.get(WorkloadDriver.NN_URI));
     startTimestampMs = mapperConf.getLong(WorkloadDriver.START_TIMESTAMP_MS, -1);
     createBlocks = mapperConf.getBoolean(AuditReplayMapper.CREATE_BLOCKS_KEY,
         AuditReplayMapper.CREATE_BLOCKS_DEFAULT);
-    fs = FileSystem.get(URI.create(namenodeURI), mapperConf);
-    dfsClient = ((DistributedFileSystem) fs).getClient();
     LOG.info("Start timestamp: " + startTimestampMs);
     for (REPLAYCOUNTERS rc : REPLAYCOUNTERS.values()) {
       replayCountersMap.put(rc, new GenericCounter());
@@ -134,7 +137,7 @@ public void run() {
           replayCountersMap.get(REPLAYCOUNTERS.LATECOMMANDS).increment(1);
           replayCountersMap.get(REPLAYCOUNTERS.LATECOMMANDSTOTALTIME).increment(-1 * delay);
         }
-        if (!replayLog(cmd.getCommand(), cmd.getSrc(), cmd.getDest())) {
+        if (!replayLog(cmd)) {
           replayCountersMap.get(REPLAYCOUNTERS.TOTALINVALIDCOMMANDS).increment(1);
         }
         cmd = commandQueue.take();
@@ -149,15 +152,33 @@ public void run() {
 
   /**
    * Attempt to replay the provided command. Updates counters accordingly.
-   * @param command The name of the command to replay.
-   * @param src The source path of the command.
-   * @param dst The destination path of the command (null except for rename and concat).
+   * @param command The command to replay
    * @return True iff the command was successfully replayed (i.e., no exceptions were thrown).
    */
-  private boolean replayLog(String command, String src, String dst) {
+  private boolean replayLog(final AuditReplayCommand command) {
+    final String src = command.getSrc();
+    final String dst = command.getDest();
+    FileSystem proxyFs = fsCache.get(command.getSimpleUgi());
+    if (proxyFs == null) {
+      UserGroupInformation ugi = UserGroupInformation.createProxyUser(command.getSimpleUgi(), loginUser);
+      proxyFs = ugi.doAs(new PrivilegedAction() {
+        @Override
+        public FileSystem run() {
+          try {
+            FileSystem fs = new DistributedFileSystem();
+            fs.initialize(namenodeUri, mapperConf);
+            return fs;
+          } catch (IOException ioe) {
+            throw new RuntimeException(ioe);
+          }
+        }
+      });
+      fsCache.put(command.getSimpleUgi(), proxyFs);
+    }
+    final FileSystem fs = proxyFs;
     ReplayCommand replayCommand;
     try {
-      replayCommand = ReplayCommand.valueOf(command.split(" ")[0].toUpperCase());
+      replayCommand = ReplayCommand.valueOf(command.getCommand().split(" ")[0].toUpperCase());
     } catch (IllegalArgumentException iae) {
       LOG.warn("Unsupported/invalid command: " + command);
       replayCountersMap.get(REPLAYCOUNTERS.TOTALUNSUPPORTEDCOMMANDS).increment(1);
@@ -191,7 +212,7 @@ private boolean replayLog(String command, String src, String dst) {
           break;
 
         case LISTSTATUS:
-          dfsClient.listPaths(src, HdfsFileStatus.EMPTY_NAME);
+          ((DistributedFileSystem) fs).getClient().listPaths(src, HdfsFileStatus.EMPTY_NAME);
           break;
 
         case APPEND:
diff --git a/dynamometer-workload/src/test/java/com/linkedin/dynamometer/workloadgenerator/TestWorkloadGenerator.java b/dynamometer-workload/src/test/java/com/linkedin/dynamometer/workloadgenerator/TestWorkloadGenerator.java
index 8a2eb03b5e..ebee438949 100644
--- a/dynamometer-workload/src/test/java/com/linkedin/dynamometer/workloadgenerator/TestWorkloadGenerator.java
+++ b/dynamometer-workload/src/test/java/com/linkedin/dynamometer/workloadgenerator/TestWorkloadGenerator.java
@@ -8,7 +8,10 @@
 import com.linkedin.dynamometer.workloadgenerator.audit.AuditLogDirectParser;
 import com.linkedin.dynamometer.workloadgenerator.audit.AuditLogHiveTableParser;
 import com.linkedin.dynamometer.workloadgenerator.audit.AuditReplayMapper;
+import java.io.IOException;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsAction;
@@ -16,11 +19,15 @@
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.AuthorizationException;
+import org.apache.hadoop.security.authorize.ImpersonationProvider;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 
@@ -33,6 +40,8 @@ public class TestWorkloadGenerator {
   @Before
   public void setup() throws Exception {
     conf = new Configuration();
+    conf.setClass(CommonConfigurationKeysPublic.HADOOP_SECURITY_IMPERSONATION_PROVIDER_CLASS,
+        AllowUserImpersonationProvider.class, ImpersonationProvider.class);
     miniCluster = new MiniDFSCluster.Builder(conf).build();
     miniCluster.waitClusterUp();
     dfs = miniCluster.getFileSystem();
@@ -65,6 +74,25 @@ public void testAuditWorkloadHiveParser() throws Exception {
     testAuditWorkload();
   }
 
+  /**
+   * {@link ImpersonationProvider} that confirms the user doing the impersonating is the same as the user
+   * running the MiniCluster.
+   */
+  private static class AllowUserImpersonationProvider extends Configured implements ImpersonationProvider {
+    public void init(String configurationPrefix) {
+      // Do nothing
+    }
+    public void authorize(UserGroupInformation user, String remoteAddress) throws AuthorizationException {
+      try {
+        if (!user.getRealUser().getShortUserName().equals(UserGroupInformation.getCurrentUser().getShortUserName())) {
+          throw new AuthorizationException();
+        }
+      } catch (IOException ioe) {
+        throw new AuthorizationException(ioe);
+      }
+    }
+  }
+
   private void testAuditWorkload() throws Exception {
     long workloadStartTime = System.currentTimeMillis() + 10000;
     Job workloadJob = WorkloadDriver.getJobForSubmission(conf, dfs.getUri().toString(),
@@ -72,9 +100,10 @@ private void testAuditWorkload() throws Exception {
     boolean success = workloadJob.waitForCompletion(true);
     assertTrue("workload job should succeed", success);
     Counters counters = workloadJob.getCounters();
-    assertEquals(3, counters.findCounter(AuditReplayMapper.REPLAYCOUNTERS.TOTALCOMMANDS).getValue());
-    assertEquals(0, counters.findCounter(AuditReplayMapper.REPLAYCOUNTERS.TOTALINVALIDCOMMANDS).getValue());
+    assertEquals(4, counters.findCounter(AuditReplayMapper.REPLAYCOUNTERS.TOTALCOMMANDS).getValue());
+    assertEquals(1, counters.findCounter(AuditReplayMapper.REPLAYCOUNTERS.TOTALINVALIDCOMMANDS).getValue());
     assertTrue(dfs.getFileStatus(new Path("/tmp/test1")).isFile());
     assertTrue(dfs.getFileStatus(new Path("/tmp/testDirRenamed")).isDirectory());
+    assertFalse(dfs.exists(new Path("/denied")));
   }
 }
diff --git a/dynamometer-workload/src/test/java/com/linkedin/dynamometer/workloadgenerator/audit/TestAuditLogDirectParser.java b/dynamometer-workload/src/test/java/com/linkedin/dynamometer/workloadgenerator/audit/TestAuditLogDirectParser.java
index 3932ccb23b..f79c05a2c3 100644
--- a/dynamometer-workload/src/test/java/com/linkedin/dynamometer/workloadgenerator/audit/TestAuditLogDirectParser.java
+++ b/dynamometer-workload/src/test/java/com/linkedin/dynamometer/workloadgenerator/audit/TestAuditLogDirectParser.java
@@ -41,7 +41,7 @@ private Text getAuditString(String timestamp, String ugi, String cmd, String src
   public void testSimpleInput() throws Exception {
     Text in = getAuditString("1970-01-01 00:00:11,000", "fakeUser", "listStatus", "sourcePath", "null");
     AuditReplayCommand expected =
-        new AuditReplayCommand(1000, "listStatus", "sourcePath", "null", "0.0.0.0");
+        new AuditReplayCommand(1000, "fakeUser", "listStatus", "sourcePath", "null", "0.0.0.0");
     assertEquals(expected, parser.parse(in, IDENTITY_FN));
   }
 
@@ -50,7 +50,7 @@ public void testInputWithRenameOptions() throws Exception {
     Text in = getAuditString("1970-01-01 00:00:11,000", "fakeUser", "rename (options=[TO_TRASH])",
         "sourcePath", "destPath");
     AuditReplayCommand expected =
-        new AuditReplayCommand(1000, "rename (options=[TO_TRASH])", "sourcePath", "destPath", "0.0.0.0");
+        new AuditReplayCommand(1000, "fakeUser", "rename (options=[TO_TRASH])", "sourcePath", "destPath", "0.0.0.0");
     assertEquals(expected, parser.parse(in, IDENTITY_FN));
   }
 
@@ -59,7 +59,7 @@ public void testInputWithTokenAuth() throws Exception {
     Text in = getAuditString("1970-01-01 00:00:11,000", "fakeUser (auth:TOKEN)", "create",
         "sourcePath", "null");
     AuditReplayCommand expected =
-        new AuditReplayCommand(1000, "create", "sourcePath", "null", "0.0.0.0");
+        new AuditReplayCommand(1000, "fakeUser", "create", "sourcePath", "null", "0.0.0.0");
     assertEquals(expected, parser.parse(in, IDENTITY_FN));
   }
 
@@ -68,7 +68,7 @@ public void testInputWithProxyUser() throws Exception {
     Text in = getAuditString("1970-01-01 00:00:11,000", "proxyUser (auth:TOKEN) via fakeUser", "create",
         "sourcePath", "null");
     AuditReplayCommand expected =
-        new AuditReplayCommand(1000, "create", "sourcePath", "null", "0.0.0.0");
+        new AuditReplayCommand(1000, "proxyUser", "create", "sourcePath", "null", "0.0.0.0");
     assertEquals(expected, parser.parse(in, IDENTITY_FN));
   }
 
diff --git a/dynamometer-workload/src/test/resources/audit_trace_direct/audit0 b/dynamometer-workload/src/test/resources/audit_trace_direct/audit0
index 11766b9594..9e7bc2ca34 100644
--- a/dynamometer-workload/src/test/resources/audit_trace_direct/audit0
+++ b/dynamometer-workload/src/test/resources/audit_trace_direct/audit0
@@ -1,3 +1,6 @@
 1970-01-01 00:00:01,010 INFO FSNamesystem.audit: allowed=true	ugi=hdfs	ip=/0.0.0.0	cmd=create	src=/tmp/test1	dst=null	perm=null	proto=rpc
 1970-01-01 00:00:01,020 INFO FSNamesystem.audit: allowed=true	ugi=hdfs	ip=/0.0.0.0	cmd=mkdirs	src=/tmp/testDir	dst=null	perm=null	proto=rpc
 1970-01-01 00:00:01,030 INFO FSNamesystem.audit: allowed=true	ugi=hdfs	ip=/0.0.0.0	cmd=rename	src=/tmp/testDir	dst=/tmp/testDirRenamed	perm=null	proto=rpc
+1970-01-01 00:00:01,040 INFO FSNamesystem.audit: allowed=true	ugi=hdfs@REALM.COM	ip=/0.0.0.0	cmd=mkdirs	src=/tmp/testDir2	dst=null	perm=null	proto=rpc
+1970-01-01 00:00:01,050 INFO FSNamesystem.audit: allowed=true	ugi=hdfs/127.0.0.1@REALM.COM	ip=/0.0.0.0	cmd=mkdirs	src=/tmp/testDir3	dst=null	perm=null	proto=rpc
+1970-01-01 00:00:01,060 INFO FSNamesystem.audit: allowed=true	ugi=otherUser	ip=/0.0.0.0	cmd=mkdirs	src=/denied	dst=null	perm=null	proto=rpc
diff --git a/dynamometer-workload/src/test/resources/audit_trace_hive/audit0 b/dynamometer-workload/src/test/resources/audit_trace_hive/audit0
index fe79dc5a6c..3f1234a2a9 100644
--- a/dynamometer-workload/src/test/resources/audit_trace_hive/audit0
+++ b/dynamometer-workload/src/test/resources/audit_trace_hive/audit0
@@ -1,3 +1,6 @@
-10create/tmp/test1 0.0.0.0
-20mkdirs/tmp/testDir 0.0.0.0
-30rename/tmp/testDir/tmp/testDirRenamed0.0.0.0
+10hdfscreate/tmp/test1 0.0.0.0
+20hdfsmkdirs/tmp/testDir 0.0.0.0
+30hdfsrename/tmp/testDir/tmp/testDirRenamed0.0.0.0
+40hdfs@REALM.COMmkdirs/tmp/testDir2 0.0.0.0
+50hdfs/127.0.0.1@REALM.COMmkdirs/tmp/testDir3 0.0.0.0
+60otherUsermkdirs/denied 0.0.0.0