diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java index f8f43b4492..1ef4ccbc92 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java @@ -260,6 +260,7 @@ public class DAGAppMaster extends AbstractService { private TaskHeartbeatHandler taskHeartbeatHandler; private TaskCommunicatorManagerInterface taskCommunicatorManager; private JobTokenSecretManager jobTokenSecretManager; + private JobTokenSecretManager shuffleJobTokenSecretManager; private Token sessionToken; private DagEventDispatcher dagEventDispatcher; private VertexEventDispatcher vertexEventDispatcher; @@ -528,6 +529,12 @@ protected void serviceInit(final Configuration conf) throws Exception { jobTokenSecretManager.addTokenForJob( appAttemptID.getApplicationId().toString(), sessionToken); + // Create a JobTokenSecretManager initialized with the session token's password + // for shuffle delete requests. The shuffle handler verifies requests using the + // token password, so the MAC must be initialized with it (as tasks do). + shuffleJobTokenSecretManager = new JobTokenSecretManager( + JobTokenSecretManager.createSecretKey(sessionToken.getPassword()), amConf); + //service to handle requests to TaskUmbilicalProtocol taskCommunicatorManager = createTaskCommunicatorManager(context, taskHeartbeatHandler, containerHeartbeatHandler, taskCommunicatorDescriptors); @@ -908,7 +915,7 @@ protected synchronized void handle(DAGAppMasterEvent event) { DAGAppMasterEventDagCleanup cleanupEvent = (DAGAppMasterEventDagCleanup) event; LOG.info("Cleaning up DAG: name=" + cleanupEvent.getDag().getName() + ", with id=" + cleanupEvent.getDag().getID()); - containerLauncherManager.dagComplete(cleanupEvent.getDag().getID(), jobTokenSecretManager); + containerLauncherManager.dagComplete(cleanupEvent.getDag().getID(), shuffleJobTokenSecretManager); taskCommunicatorManager.dagComplete(cleanupEvent.getDag()); nodes.dagComplete(cleanupEvent.getDag()); containers.dagComplete(cleanupEvent.getDag()); @@ -2744,10 +2751,10 @@ public String getWebUIAddress() { } public void vertexComplete(TezVertexID completedVertexID, Set nodesList) { - getContainerLauncherManager().vertexComplete(completedVertexID, jobTokenSecretManager, nodesList); + getContainerLauncherManager().vertexComplete(completedVertexID, shuffleJobTokenSecretManager, nodesList); } public void taskAttemptFailed(TezTaskAttemptID attemptID, NodeId nodeId) { - getContainerLauncherManager().taskAttemptFailed(attemptID, jobTokenSecretManager, nodeId); + getContainerLauncherManager().taskAttemptFailed(attemptID, shuffleJobTokenSecretManager, nodeId); } } diff --git a/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java b/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java index 60aa088be2..30cb71f0ad 100644 --- a/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java +++ b/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java @@ -1022,6 +1022,10 @@ private void handleRequest(ChannelHandlerContext ctx, HttpRequest request) sendError(ctx, METHOD_NOT_ALLOWED); return; } + if (request.uri() == null) { + sendError(ctx, BAD_REQUEST); + return; + } // Check whether the shuffle version is compatible if (!ShuffleHeader.DEFAULT_HTTP_HEADER_NAME.equals( request.headers().get(ShuffleHeader.HTTP_HEADER_NAME)) @@ -1053,22 +1057,35 @@ private void handleRequest(ChannelHandlerContext ctx, HttpRequest request) "\n dagId: " + dagIdQ + "\n keepAlive: " + keepAliveParam); } - // If the request is for Dag Deletion, process the request and send OK. - if (deleteDagDirectories(ctx.channel(), dagCompletedQ, jobQ, dagIdQ)) { + boolean isDeleteRequest = + notEmptyAndContains(dagCompletedQ, "delete") || notEmptyAndContains(vertexCompletedQ, "delete") + || notEmptyAndContains(taskAttemptFailedQ, "delete"); + if (!isDeleteRequest && (mapIds == null || reduceRange == null) || jobQ == null || dagIdQ == null) { + sendError(ctx, "Required param job, dag, map and reduce", BAD_REQUEST); return; } - if (deleteVertexDirectories(ctx.channel(), vertexCompletedQ, jobQ, dagIdQ, vertexIdQ)) { + if (jobQ.size() != 1) { + sendError(ctx, "Too many job/reduce parameters", BAD_REQUEST); return; } - if (deleteTaskAttemptDirectories(ctx.channel(), taskAttemptFailedQ, jobQ, dagIdQ, mapIds)) { + if (isDeleteRequest) { + try { + verifyRequest(jobQ.get(0), ctx, request, new DefaultHttpResponse(HTTP_1_1, OK), + URI.create("http://:" + this.port + request.uri()).toURL()); + } catch (IOException e) { + LOG.warn("Shuffle delete request authentication failure ", e); + sendError(ctx, e.getMessage(), UNAUTHORIZED); + return; + } + } + // If the request is for Dag Deletion, process the request and send OK. + if (deleteDagDirectories(ctx.channel(), dagCompletedQ, jobQ, dagIdQ)) { return; } - if (mapIds == null || reduceRange == null || jobQ == null || dagIdQ == null) { - sendError(ctx, "Required param job, dag, map and reduce", BAD_REQUEST); + if (deleteVertexDirectories(ctx.channel(), vertexCompletedQ, jobQ, dagIdQ, vertexIdQ)) { return; } - if (jobQ.size() != 1) { - sendError(ctx, "Too many job/reduce parameters", BAD_REQUEST); + if (deleteTaskAttemptDirectories(ctx.channel(), taskAttemptFailedQ, jobQ, dagIdQ, mapIds)) { return; } diff --git a/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java b/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java index a139446b57..081d44f491 100644 --- a/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java +++ b/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java @@ -18,9 +18,6 @@ */ package org.apache.tez.auxservices; -//import static org.apache.hadoop.test.MetricsAsserts.assertCounter; -//import static org.apache.hadoop.test.MetricsAsserts.assertGauge; -//import static org.apache.hadoop.test.MetricsAsserts.getMetrics; import static io.netty.buffer.Unpooled.wrappedBuffer; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -1353,6 +1350,12 @@ public void testDagDelete() throws Exception { protected Shuffle getShuffle(Configuration conf) { // replace the shuffle handler with one stubbed for testing return new Shuffle(conf) { + @Override + protected void verifyRequest(String appid, ChannelHandlerContext ctx, + HttpRequest request, HttpResponse response, URL requestUri) + throws IOException { + // Do nothing. + } @Override protected void sendError(ChannelHandlerContext ctx, String message, HttpResponseStatus status) { @@ -1587,6 +1590,104 @@ public AuxiliaryLocalPathHandler getAuxiliaryLocalPathHandler() { } } + @Test(timeout = 10000) + public void testUnauthenticatedDeleteIsRejected() throws Exception { + Configuration conf = getInitialConf(); + conf.setInt(ShuffleHandler.MAX_SHUFFLE_CONNECTIONS, 3); + conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, + "simple"); + UserGroupInformation.setConfiguration(conf); + conf.set(YarnConfiguration.NM_LOCAL_DIRS, TEST_DIR.getAbsolutePath()); + ApplicationId appId = ApplicationId.newInstance(12345, 1); + String appAttemptId = "attempt_12345_1_m_1_0"; + String user = "randomUser"; + List fileMap = new ArrayList(); + createShuffleHandlerFiles(TEST_DIR, user, appId.toString(), appAttemptId, + conf, fileMap); + // Use a real ShuffleHandler (no verifyRequest override) so authentication is enforced + ShuffleHandler shuffleHandler = new ShuffleHandler() { + private AuxiliaryLocalPathHandler pathHandler = new TestAuxiliaryLocalPathHandler(); + @Override + public AuxiliaryLocalPathHandler getAuxiliaryLocalPathHandler() { + return pathHandler; + } + }; + shuffleHandler.init(conf); + try { + shuffleHandler.start(); + DataOutputBuffer outputBuffer = new DataOutputBuffer(); + outputBuffer.reset(); + Token jt = + new Token("identifier".getBytes(), + "password".getBytes(), new Text(user), new Text("shuffleService")); + jt.write(outputBuffer); + shuffleHandler + .initializeApplication(new ApplicationInitializationContext(user, + appId, ByteBuffer.wrap(outputBuffer.getData(), 0, + outputBuffer.getLength()))); + String baseUrl = "http://127.0.0.1:" + + shuffleHandler.getConfig().get(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY); + String dagDirStr = + StringUtils.join(Path.SEPARATOR, + new String[] {TEST_DIR.getAbsolutePath(), + ShuffleHandler.USERCACHE, user, + ShuffleHandler.APPCACHE, appId.toString(), "dag_1/"}); + File dagDir = new File(dagDirStr); + String taskAttemptDirStr = + StringUtils.join(Path.SEPARATOR, + new String[] {TEST_DIR.getAbsolutePath(), + ShuffleHandler.USERCACHE, user, + ShuffleHandler.APPCACHE, appId.toString(), "dag_1/output/", + appAttemptId}); + File taskAttemptDir = new File(taskAttemptDirStr); + + Assert.assertTrue("Dag Directory does not exist!", dagDir.exists()); + HttpURLConnection conn = (HttpURLConnection) URI.create(baseUrl + + "/mapOutput?dagAction=delete&job=job_12345_0001&dag=1").toURL().openConnection(); + conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_NAME, + ShuffleHeader.DEFAULT_HTTP_HEADER_NAME); + conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_VERSION, + ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION); + conn.connect(); + Assert.assertEquals("Unauthenticated dag delete should return 401", + HttpURLConnection.HTTP_UNAUTHORIZED, conn.getResponseCode()); + Assert.assertTrue( + "Dag Directory should NOT have been deleted after unauthenticated request", + dagDir.exists()); + + conn = (HttpURLConnection) URI.create(baseUrl + + "/mapOutput?vertexAction=delete&job=job_12345_0001&dag=1&vertex=00").toURL().openConnection(); + conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_NAME, + ShuffleHeader.DEFAULT_HTTP_HEADER_NAME); + conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_VERSION, + ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION); + conn.connect(); + Assert.assertEquals("Unauthenticated vertex delete should return 401", + HttpURLConnection.HTTP_UNAUTHORIZED, conn.getResponseCode()); + Assert.assertTrue( + "Dag Directory should NOT have been deleted after unauthenticated vertex delete request", + dagDir.exists()); + + Assert.assertTrue("Task Attempt Directory does not exist!", taskAttemptDir.exists()); + conn = (HttpURLConnection) URI.create(baseUrl + + "/mapOutput?taskAttemptAction=delete&job=job_12345_0001&dag=1&map=" + + appAttemptId).toURL().openConnection(); + conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_NAME, + ShuffleHeader.DEFAULT_HTTP_HEADER_NAME); + conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_VERSION, + ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION); + conn.connect(); + Assert.assertEquals("Unauthenticated task attempt delete should return 401", + HttpURLConnection.HTTP_UNAUTHORIZED, conn.getResponseCode()); + Assert.assertTrue( + "Task Attempt Directory should NOT have been deleted after unauthenticated request", + taskAttemptDir.exists()); + } finally { + shuffleHandler.close(); + FileUtil.fullyDelete(TEST_DIR); + } + } + @Test(timeout = 4000) public void testSendMapCount() throws Exception { final List listenerList =