Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,7 @@ public class DAGAppMaster extends AbstractService {
private TaskHeartbeatHandler taskHeartbeatHandler;
private TaskCommunicatorManagerInterface taskCommunicatorManager;
private JobTokenSecretManager jobTokenSecretManager;
private JobTokenSecretManager shuffleJobTokenSecretManager;
private Token<JobTokenIdentifier> sessionToken;
private DagEventDispatcher dagEventDispatcher;
private VertexEventDispatcher vertexEventDispatcher;
Expand Down Expand Up @@ -528,6 +529,12 @@ protected void serviceInit(final Configuration conf) throws Exception {
jobTokenSecretManager.addTokenForJob(
appAttemptID.getApplicationId().toString(), sessionToken);

// Create a JobTokenSecretManager initialized with the session token's password
// for shuffle delete requests. The shuffle handler verifies requests using the
// token password, so the MAC must be initialized with it (as tasks do).
shuffleJobTokenSecretManager = new JobTokenSecretManager(

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure we need a separate secret manager, since the sessionToken is already added to jobTokenSecretManager:

jobTokenSecretManager.addTokenForJob(

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

JobTokenSecretManager has two different key paths:

  1. computeHash(byte[] msg) (instance method) — uses mac.doFinal(msg) with the MAC master key (random at construction)
  2. retrieveTokenSecret(String jobId) — returns the per-job key from the currentJobTokens map (populated by addTokenForJob)

The client-side AsyncHttpConnection.computeEncHash() calls:
encHash = SecureShuffleUtils.hashFromString(msgToEncode, jobTokenSecretMgr);
// → mgr.computeHash(msg) ← uses MAC master key, NOT the per-job token from the map

The server-side verifyRequest calls:
SecretKey tokenSecret = getSecretManager().retrieveTokenSecret(appid); // ← uses the map
SecureShuffleUtils.verifyReply(urlHashStr, enc_str, tokenSecret);

So addTokenForJob populates the map (server path), but the client hash computation uses the MAC field (instance method). They're completely separate code paths. The MAC must be initialized with the token password for the client-side hash to match what the server verifies.

It was leading to test failure here: https://ci-hadoop.apache.org/job/tez-multibranch/job/PR-512/3/testReport/org.apache.tez.auxservices/TestShuffleHandlerJobs/testOrderedWordCount/

Did I decode it wrong?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's right, thanks for clarifying; I tend to forget the SecretManager internals.
So the code comment does explain the situation well: the client side uses the instance method, so we must initialize the instance MAC with the sessionToken's password, which the server/ShuffleHandler side looks up from the map.

JobTokenSecretManager.createSecretKey(sessionToken.getPassword()), amConf);

//service to handle requests to TaskUmbilicalProtocol
taskCommunicatorManager = createTaskCommunicatorManager(context,
taskHeartbeatHandler, containerHeartbeatHandler, taskCommunicatorDescriptors);
Expand Down Expand Up @@ -908,7 +915,7 @@ protected synchronized void handle(DAGAppMasterEvent event) {
DAGAppMasterEventDagCleanup cleanupEvent = (DAGAppMasterEventDagCleanup) event;
LOG.info("Cleaning up DAG: name=" + cleanupEvent.getDag().getName() + ", with id=" +
cleanupEvent.getDag().getID());
containerLauncherManager.dagComplete(cleanupEvent.getDag().getID(), jobTokenSecretManager);
containerLauncherManager.dagComplete(cleanupEvent.getDag().getID(), shuffleJobTokenSecretManager);
taskCommunicatorManager.dagComplete(cleanupEvent.getDag());
nodes.dagComplete(cleanupEvent.getDag());
containers.dagComplete(cleanupEvent.getDag());
Expand Down Expand Up @@ -2744,10 +2751,10 @@ public String getWebUIAddress() {
}

public void vertexComplete(TezVertexID completedVertexID, Set<NodeId> nodesList) {
getContainerLauncherManager().vertexComplete(completedVertexID, jobTokenSecretManager, nodesList);
getContainerLauncherManager().vertexComplete(completedVertexID, shuffleJobTokenSecretManager, nodesList);
}

public void taskAttemptFailed(TezTaskAttemptID attemptID, NodeId nodeId) {
getContainerLauncherManager().taskAttemptFailed(attemptID, jobTokenSecretManager, nodeId);
getContainerLauncherManager().taskAttemptFailed(attemptID, shuffleJobTokenSecretManager, nodeId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1022,6 +1022,10 @@ private void handleRequest(ChannelHandlerContext ctx, HttpRequest request)
sendError(ctx, METHOD_NOT_ALLOWED);
return;
}
if (request.uri() == null) {
sendError(ctx, BAD_REQUEST);
return;
}
// Check whether the shuffle version is compatible
if (!ShuffleHeader.DEFAULT_HTTP_HEADER_NAME.equals(
request.headers().get(ShuffleHeader.HTTP_HEADER_NAME))
Expand Down Expand Up @@ -1053,22 +1057,35 @@ private void handleRequest(ChannelHandlerContext ctx, HttpRequest request)
"\n dagId: " + dagIdQ +
"\n keepAlive: " + keepAliveParam);
}
// If the request is for Dag Deletion, process the request and send OK.
if (deleteDagDirectories(ctx.channel(), dagCompletedQ, jobQ, dagIdQ)) {
boolean isDeleteRequest =
notEmptyAndContains(dagCompletedQ, "delete") || notEmptyAndContains(vertexCompletedQ, "delete")
|| notEmptyAndContains(taskAttemptFailedQ, "delete");
if (!isDeleteRequest && (mapIds == null || reduceRange == null) || jobQ == null || dagIdQ == null) {
sendError(ctx, "Required param job, dag, map and reduce", BAD_REQUEST);
return;
}
if (deleteVertexDirectories(ctx.channel(), vertexCompletedQ, jobQ, dagIdQ, vertexIdQ)) {
if (jobQ.size() != 1) {
sendError(ctx, "Too many job/reduce parameters", BAD_REQUEST);
return;
}
if (deleteTaskAttemptDirectories(ctx.channel(), taskAttemptFailedQ, jobQ, dagIdQ, mapIds)) {
if (isDeleteRequest) {
try {
verifyRequest(jobQ.get(0), ctx, request, new DefaultHttpResponse(HTTP_1_1, OK),
URI.create("http://:" + this.port + request.uri()).toURL());
} catch (IOException e) {
LOG.warn("Shuffle delete request authentication failure ", e);
sendError(ctx, e.getMessage(), UNAUTHORIZED);
return;
}
}
// If the request is for Dag Deletion, process the request and send OK.
if (deleteDagDirectories(ctx.channel(), dagCompletedQ, jobQ, dagIdQ)) {
return;
}
if (mapIds == null || reduceRange == null || jobQ == null || dagIdQ == null) {
sendError(ctx, "Required param job, dag, map and reduce", BAD_REQUEST);
if (deleteVertexDirectories(ctx.channel(), vertexCompletedQ, jobQ, dagIdQ, vertexIdQ)) {
return;
}
if (jobQ.size() != 1) {
sendError(ctx, "Too many job/reduce parameters", BAD_REQUEST);
if (deleteTaskAttemptDirectories(ctx.channel(), taskAttemptFailedQ, jobQ, dagIdQ, mapIds)) {
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,6 @@
*/
package org.apache.tez.auxservices;

//import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
//import static org.apache.hadoop.test.MetricsAsserts.assertGauge;
//import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
import static io.netty.buffer.Unpooled.wrappedBuffer;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
Expand Down Expand Up @@ -1353,6 +1350,12 @@ public void testDagDelete() throws Exception {
protected Shuffle getShuffle(Configuration conf) {
// replace the shuffle handler with one stubbed for testing
return new Shuffle(conf) {
// Test stub: request verification is deliberately a no-op, so this stubbed
// Shuffle never fails a request from inside verifyRequest.
// NOTE(review): presumably required because the delete requests issued by this
// test carry no URL hash and would otherwise be rejected — confirm.
@Override
protected void verifyRequest(String appid, ChannelHandlerContext ctx,
HttpRequest request, HttpResponse response, URL requestUri)
throws IOException {
// Intentionally empty: accept every request without checking it.
}
@Override
protected void sendError(ChannelHandlerContext ctx, String message,
HttpResponseStatus status) {
Expand Down Expand Up @@ -1587,6 +1590,104 @@ public AuxiliaryLocalPathHandler getAuxiliaryLocalPathHandler() {
}
}

/**
 * Verifies that dag, vertex and task-attempt delete requests that carry no
 * authentication hash are answered with HTTP 401 and leave the on-disk
 * directories untouched.
 *
 * <p>A real {@link ShuffleHandler} is used (no {@code verifyRequest} override),
 * so request verification is actually exercised.
 */
@Test(timeout = 10000)
public void testUnauthenticatedDeleteIsRejected() throws Exception {
  Configuration conf = getInitialConf();
  conf.setInt(ShuffleHandler.MAX_SHUFFLE_CONNECTIONS, 3);
  conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
      "simple");
  UserGroupInformation.setConfiguration(conf);
  conf.set(YarnConfiguration.NM_LOCAL_DIRS, TEST_DIR.getAbsolutePath());
  ApplicationId appId = ApplicationId.newInstance(12345, 1);
  String appAttemptId = "attempt_12345_1_m_1_0";
  String user = "randomUser";
  List<File> fileMap = new ArrayList<>();
  createShuffleHandlerFiles(TEST_DIR, user, appId.toString(), appAttemptId,
      conf, fileMap);
  // Use a real ShuffleHandler (no verifyRequest override) so authentication is enforced
  ShuffleHandler shuffleHandler = new ShuffleHandler() {
    private AuxiliaryLocalPathHandler pathHandler = new TestAuxiliaryLocalPathHandler();
    @Override
    public AuxiliaryLocalPathHandler getAuxiliaryLocalPathHandler() {
      return pathHandler;
    }
  };
  shuffleHandler.init(conf);
  try {
    shuffleHandler.start();
    // Register the application with a job token; the requests below never present it.
    DataOutputBuffer outputBuffer = new DataOutputBuffer();
    Token<JobTokenIdentifier> jt =
        new Token<JobTokenIdentifier>("identifier".getBytes(),
            "password".getBytes(), new Text(user), new Text("shuffleService"));
    jt.write(outputBuffer);
    shuffleHandler
        .initializeApplication(new ApplicationInitializationContext(user,
            appId, ByteBuffer.wrap(outputBuffer.getData(), 0,
                outputBuffer.getLength())));
    String baseUrl = "http://127.0.0.1:"
        + shuffleHandler.getConfig().get(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY);
    String dagDirStr =
        StringUtils.join(Path.SEPARATOR,
            new String[] {TEST_DIR.getAbsolutePath(),
                ShuffleHandler.USERCACHE, user,
                ShuffleHandler.APPCACHE, appId.toString(), "dag_1/"});
    File dagDir = new File(dagDirStr);
    String taskAttemptDirStr =
        StringUtils.join(Path.SEPARATOR,
            new String[] {TEST_DIR.getAbsolutePath(),
                ShuffleHandler.USERCACHE, user,
                ShuffleHandler.APPCACHE, appId.toString(), "dag_1/output/",
                appAttemptId});
    File taskAttemptDir = new File(taskAttemptDirStr);

    // Dag delete without authentication.
    Assert.assertTrue("Dag Directory does not exist!", dagDir.exists());
    int dagDeleteStatus = sendUnauthenticatedShuffleRequest(baseUrl
        + "/mapOutput?dagAction=delete&job=job_12345_0001&dag=1");
    Assert.assertEquals("Unauthenticated dag delete should return 401",
        HttpURLConnection.HTTP_UNAUTHORIZED, dagDeleteStatus);
    Assert.assertTrue(
        "Dag Directory should NOT have been deleted after unauthenticated request",
        dagDir.exists());

    // Vertex delete without authentication.
    int vertexDeleteStatus = sendUnauthenticatedShuffleRequest(baseUrl
        + "/mapOutput?vertexAction=delete&job=job_12345_0001&dag=1&vertex=00");
    Assert.assertEquals("Unauthenticated vertex delete should return 401",
        HttpURLConnection.HTTP_UNAUTHORIZED, vertexDeleteStatus);
    Assert.assertTrue(
        "Dag Directory should NOT have been deleted after unauthenticated vertex delete request",
        dagDir.exists());

    // Task attempt delete without authentication.
    Assert.assertTrue("Task Attempt Directory does not exist!", taskAttemptDir.exists());
    int taskAttemptDeleteStatus = sendUnauthenticatedShuffleRequest(baseUrl
        + "/mapOutput?taskAttemptAction=delete&job=job_12345_0001&dag=1&map="
        + appAttemptId);
    Assert.assertEquals("Unauthenticated task attempt delete should return 401",
        HttpURLConnection.HTTP_UNAUTHORIZED, taskAttemptDeleteStatus);
    Assert.assertTrue(
        "Task Attempt Directory should NOT have been deleted after unauthenticated request",
        taskAttemptDir.exists());
  } finally {
    shuffleHandler.close();
    FileUtil.fullyDelete(TEST_DIR);
  }
}

/**
 * Sends a GET to {@code url} with only the shuffle name and version headers set
 * (no URL hash), and returns the HTTP status code of the response.
 *
 * @param url absolute URL of the shuffle request
 * @return the HTTP response code
 * @throws IOException if the connection cannot be opened or the response read
 */
private int sendUnauthenticatedShuffleRequest(String url) throws IOException {
  HttpURLConnection conn = (HttpURLConnection) URI.create(url).toURL().openConnection();
  try {
    conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_NAME,
        ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
    conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_VERSION,
        ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
    conn.connect();
    return conn.getResponseCode();
  } finally {
    // Release the connection so one request cannot leak into the next.
    conn.disconnect();
  }
}

@Test(timeout = 4000)
public void testSendMapCount() throws Exception {
final List<ShuffleHandler.ReduceMapFileCount> listenerList =
Expand Down
Loading