From 21cb506117271a99e3e7e2865e5774ad485c217a Mon Sep 17 00:00:00 2001 From: Bryan Cutler Date: Wed, 27 Jan 2016 16:35:50 -0800 Subject: [PATCH 1/6] [SPARK-12299] Removed history server functionality from Master with related WebUI and JSON ApiRootResources --- .../spark/deploy/master/ApplicationInfo.scala | 9 -- .../apache/spark/deploy/master/Master.scala | 110 +----------------- .../spark/deploy/master/MasterMessages.scala | 2 - .../deploy/master/ui/ApplicationPage.scala | 6 +- .../master/ui/HistoryNotFoundPage.scala | 73 ------------ .../spark/deploy/master/ui/MasterPage.scala | 8 +- .../spark/deploy/master/ui/MasterWebUI.scala | 35 +----- .../spark/status/api/v1/ApiRootResource.scala | 2 +- .../api/v1/ApplicationListResource.scala | 23 ---- 9 files changed, 15 insertions(+), 253 deletions(-) delete mode 100644 core/src/main/scala/org/apache/spark/deploy/master/ui/HistoryNotFoundPage.scala diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala index 7e2cf956c7253..a28cb19edef6a 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala @@ -41,7 +41,6 @@ private[spark] class ApplicationInfo( @transient var coresGranted: Int = _ @transient var endTime: Long = _ @transient var appSource: ApplicationSource = _ - @transient @volatile var appUIUrlAtHistoryServer: Option[String] = None // A cap on the number of executors this application can have at any given time. // By default, this is infinite. Only after the first allocation request is issued by the @@ -66,7 +65,6 @@ private[spark] class ApplicationInfo( nextExecutorId = 0 removedExecutors = new ArrayBuffer[ExecutorDesc] executorLimit = Integer.MAX_VALUE - appUIUrlAtHistoryServer = None } private def newExecutorId(useID: Option[Int] = None): Int = { @@ -136,11 +134,4 @@ private[spark] class ApplicationInfo( System.currentTimeMillis() - startTime } } - - /** - * Returns the original application UI url unless there is its address at history server - * is defined - */ - def curAppUIUrl: String = appUIUrlAtHistoryServer.getOrElse(desc.appUiUrl) - } diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 202a1b787c21b..a90ac96d48747 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -17,34 +17,24 @@ package org.apache.spark.deploy.master -import java.io.FileNotFoundException -import java.net.URLEncoder import java.text.SimpleDateFormat import java.util.Date -import java.util.concurrent.{ConcurrentHashMap, ScheduledFuture, TimeUnit} +import java.util.concurrent.{ScheduledFuture, TimeUnit} import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} -import scala.concurrent.{Await, ExecutionContext, Future} -import scala.concurrent.duration.Duration -import scala.language.postfixOps import scala.util.Random -import org.apache.hadoop.fs.Path - import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException} import org.apache.spark.deploy.{ApplicationDescription, DriverDescription, ExecutorState, SparkHadoopUtil} import org.apache.spark.deploy.DeployMessages._ -import org.apache.spark.deploy.history.HistoryServer import org.apache.spark.deploy.master.DriverState.DriverState import org.apache.spark.deploy.master.MasterMessages._ import org.apache.spark.deploy.master.ui.MasterWebUI import org.apache.spark.deploy.rest.StandaloneRestServer import org.apache.spark.metrics.MetricsSystem import org.apache.spark.rpc._ -import org.apache.spark.scheduler.{EventLoggingListener, ReplayListenerBus} import org.apache.spark.serializer.{JavaSerializer, Serializer} -import org.apache.spark.ui.SparkUI import org.apache.spark.util.{ThreadUtils, Utils} private[deploy] class Master( @@ -58,10 +48,6 @@ private[deploy] class Master( private val forwardMessageThread = ThreadUtils.newDaemonSingleThreadScheduledExecutor("master-forward-message-thread") - private val rebuildUIThread = - ThreadUtils.newDaemonSingleThreadExecutor("master-rebuild-ui-thread") - private val rebuildUIContext = ExecutionContext.fromExecutor(rebuildUIThread) - private val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf) private def createDateFormat = new SimpleDateFormat("yyyyMMddHHmmss") // For application IDs @@ -84,8 +70,6 @@ private[deploy] class Master( private val addressToApp = new HashMap[RpcAddress, ApplicationInfo] private val completedApps = new ArrayBuffer[ApplicationInfo] private var nextAppNumber = 0 - // Using ConcurrentHashMap so that master-rebuild-ui-thread can add a UI after asyncRebuildUI - private val appIdToUI = new ConcurrentHashMap[String, SparkUI] private val drivers = new HashSet[DriverInfo] private val completedDrivers = new ArrayBuffer[DriverInfo] @@ -198,7 +182,6 @@ private[deploy] class Master( checkForWorkerTimeOutTask.cancel(true) } forwardMessageThread.shutdownNow() - rebuildUIThread.shutdownNow() webUi.stop() restServer.foreach(_.stop()) masterMetricsSystem.stop() @@ -375,10 +358,6 @@ private[deploy] class Master( case CheckForWorkerTimeOut => { timeOutDeadWorkers() } - - case AttachCompletedRebuildUI(appId) => - // An asyncRebuildSparkUI has completed, so need to attach to master webUi - Option(appIdToUI.get(appId)).foreach { ui => webUi.attachSparkUI(ui) } } override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { @@ -821,7 +800,6 @@ private[deploy] class Master( if (completedApps.size >= RETAINED_APPLICATIONS) { val toRemove = math.max(RETAINED_APPLICATIONS / 10, 1) completedApps.take(toRemove).foreach( a => { - Option(appIdToUI.remove(a.id)).foreach { ui => webUi.detachSparkUI(ui) } applicationMetricsSystem.removeSource(a.appSource) }) completedApps.trimStart(toRemove) @@ -829,9 +807,6 @@ private[deploy] class Master( completedApps += app // Remember it in our history waitingApps -= app - // If application events are logged, use them to rebuild the UI - asyncRebuildSparkUI(app) - for (exec <- app.executors.values) { killExecutor(exec) } @@ -930,89 +905,6 @@ private[deploy] class Master( exec.state = ExecutorState.KILLED } - /** - * Rebuild a new SparkUI from the given application's event logs. - * Return the UI if successful, else None - */ - private[master] def rebuildSparkUI(app: ApplicationInfo): Option[SparkUI] = { - val futureUI = asyncRebuildSparkUI(app) - Await.result(futureUI, Duration.Inf) - } - - /** Rebuild a new SparkUI asynchronously to not block RPC event loop */ - private[master] def asyncRebuildSparkUI(app: ApplicationInfo): Future[Option[SparkUI]] = { - val appName = app.desc.name - val notFoundBasePath = HistoryServer.UI_PATH_PREFIX + "/not-found" - val eventLogDir = app.desc.eventLogDir - .getOrElse { - // Event logging is disabled for this application - app.appUIUrlAtHistoryServer = Some(notFoundBasePath) - return Future.successful(None) - } - val futureUI = Future { - val eventLogFilePrefix = EventLoggingListener.getLogPath( - eventLogDir, app.id, appAttemptId = None, compressionCodecName = app.desc.eventLogCodec) - val fs = Utils.getHadoopFileSystem(eventLogDir, hadoopConf) - val inProgressExists = fs.exists(new Path(eventLogFilePrefix + - EventLoggingListener.IN_PROGRESS)) - - val eventLogFile = if (inProgressExists) { - // Event logging is enabled for this application, but the application is still in progress - logWarning(s"Application $appName is still in progress, it may be terminated abnormally.") - eventLogFilePrefix + EventLoggingListener.IN_PROGRESS - } else { - eventLogFilePrefix - } - - val logInput = EventLoggingListener.openEventLog(new Path(eventLogFile), fs) - val replayBus = new ReplayListenerBus() - val ui = SparkUI.createHistoryUI(new SparkConf, replayBus, new SecurityManager(conf), - appName, HistoryServer.UI_PATH_PREFIX + s"/${app.id}", app.startTime) - try { - replayBus.replay(logInput, eventLogFile, inProgressExists) - } finally { - logInput.close() - } - - Some(ui) - }(rebuildUIContext) - - futureUI.onSuccess { case Some(ui) => - appIdToUI.put(app.id, ui) - // `self` can be null if we are already in the process of shutting down - // This happens frequently in tests where `local-cluster` is used - if (self != null) { - self.send(AttachCompletedRebuildUI(app.id)) - } - // Application UI is successfully rebuilt, so link the Master UI to it - // NOTE - app.appUIUrlAtHistoryServer is volatile - app.appUIUrlAtHistoryServer = Some(ui.basePath) - }(ThreadUtils.sameThread) - - futureUI.onFailure { - case fnf: FileNotFoundException => - // Event logging is enabled for this application, but no event logs are found - val title = s"Application history not found (${app.id})" - var msg = s"No event logs found for application $appName in ${app.desc.eventLogDir.get}." - logWarning(msg) - msg += " Did you specify the correct logging directory?" - msg = URLEncoder.encode(msg, "UTF-8") - app.appUIUrlAtHistoryServer = Some(notFoundBasePath + s"?msg=$msg&title=$title") - - case e: Exception => - // Relay exception message to application UI page - val title = s"Application history load error (${app.id})" - val exception = URLEncoder.encode(Utils.exceptionString(e), "UTF-8") - var msg = s"Exception in replaying log for application $appName!" - logError(msg, e) - msg = URLEncoder.encode(msg, "UTF-8") - app.appUIUrlAtHistoryServer = - Some(notFoundBasePath + s"?msg=$msg&exception=$exception&title=$title") - }(ThreadUtils.sameThread) - - futureUI - } - /** Generate a new app ID given a app's submission date */ private def newApplicationId(submitDate: Date): String = { val appId = "app-%s-%04d".format(createDateFormat.format(submitDate), nextAppNumber) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala b/core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala index a055d097674ce..a952cee36eb44 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala @@ -39,6 +39,4 @@ private[master] object MasterMessages { case object BoundPortsRequest case class BoundPortsResponse(rpcEndpointPort: Int, webUIPort: Int, restPort: Option[Int]) - - case class AttachCompletedRebuildUI(appId: String) } diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala index 1b18cf0ded69d..56443fa3adb09 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala @@ -76,7 +76,11 @@ private[ui] class ApplicationPage(parent: MasterWebUI) extends WebUIPage("app")
  • Submit Date: {app.submitDate}
  • State: {app.state}
  • -
  • Application Detail UI
  • + { + if (!app.isFinished) { +
  • Application Detail UI
  • + } + } diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/HistoryNotFoundPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/HistoryNotFoundPage.scala deleted file mode 100644 index e021f1eef794f..0000000000000 --- a/core/src/main/scala/org/apache/spark/deploy/master/ui/HistoryNotFoundPage.scala +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.deploy.master.ui - -import java.net.URLDecoder -import javax.servlet.http.HttpServletRequest - -import scala.xml.Node - -import org.apache.spark.ui.{UIUtils, WebUIPage} - -private[ui] class HistoryNotFoundPage(parent: MasterWebUI) - extends WebUIPage("history/not-found") { - - /** - * Render a page that conveys failure in loading application history. - * - * This accepts 3 HTTP parameters: - * msg = message to display to the user - * title = title of the page - * exception = detailed description of the exception in loading application history (if any) - * - * Parameters "msg" and "exception" are assumed to be UTF-8 encoded. - */ - def render(request: HttpServletRequest): Seq[Node] = { - val titleParam = request.getParameter("title") - val msgParam = request.getParameter("msg") - val exceptionParam = request.getParameter("exception") - - // If no parameters are specified, assume the user did not enable event logging - val defaultTitle = "Event logging is not enabled" - val defaultContent = -
    -
    - No event logs were found for this application! To - enable event logging, - set spark.eventLog.enabled to true and - spark.eventLog.dir to the directory to which your - event logs are written. -
    -
    - - val title = Option(titleParam).getOrElse(defaultTitle) - val content = Option(msgParam) - .map { msg => URLDecoder.decode(msg, "UTF-8") } - .map { msg => -
    -
    {msg}
    -
    ++ - Option(exceptionParam) - .map { e => URLDecoder.decode(e, "UTF-8") } - .map { e =>
    {e}
    } - .getOrElse(Seq.empty) - }.getOrElse(defaultContent) - - UIUtils.basicSparkPage(content, title) - } -} diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala index f9b0279c3d1e3..81dcad747bf5d 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala @@ -206,7 +206,13 @@ private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") { {killLink} - {app.desc.name} + { + if (app.isFinished) { + app.desc.name + } else { + {app.desc.name} + } + } {app.coresGranted} diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala index d7543926f3850..5feccf09757ee 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala @@ -19,8 +19,6 @@ package org.apache.spark.deploy.master.ui import org.apache.spark.Logging import org.apache.spark.deploy.master.Master -import org.apache.spark.status.api.v1.{ApiRootResource, ApplicationInfo, ApplicationsListResource, - UIRoot} import org.apache.spark.ui.{SparkUI, WebUI} import org.apache.spark.ui.JettyUtils._ @@ -33,7 +31,7 @@ class MasterWebUI( requestedPort: Int, customMasterPage: Option[MasterPage] = None) extends WebUI(master.securityMgr, master.securityMgr.getSSLOptions("standalone"), - requestedPort, master.conf, name = "MasterUI") with Logging with UIRoot { + requestedPort, master.conf, name = "MasterUI") with Logging { val masterEndpointRef = master.self val killEnabled = master.conf.getBoolean("spark.ui.killEnabled", true) @@ -46,44 +44,13 @@ class MasterWebUI( def initialize() { val masterPage = new MasterPage(this) attachPage(new ApplicationPage(this)) - attachPage(new HistoryNotFoundPage(this)) attachPage(masterPage) attachHandler(createStaticHandler(MasterWebUI.STATIC_RESOURCE_DIR, "/static")) - attachHandler(ApiRootResource.getServletHandler(this)) attachHandler(createRedirectHandler( "/app/kill", "/", masterPage.handleAppKillRequest, httpMethods = Set("POST"))) attachHandler(createRedirectHandler( "/driver/kill", "/", masterPage.handleDriverKillRequest, httpMethods = Set("POST"))) } - - /** Attach a reconstructed UI to this Master UI. Only valid after bind(). */ - def attachSparkUI(ui: SparkUI) { - assert(serverInfo.isDefined, "Master UI must be bound to a server before attaching SparkUIs") - ui.getHandlers.foreach(attachHandler) - } - - /** Detach a reconstructed UI from this Master UI. Only valid after bind(). */ - def detachSparkUI(ui: SparkUI) { - assert(serverInfo.isDefined, "Master UI must be bound to a server before detaching SparkUIs") - ui.getHandlers.foreach(detachHandler) - } - - def getApplicationInfoList: Iterator[ApplicationInfo] = { - val state = masterPage.getMasterState - val activeApps = state.activeApps.sortBy(_.startTime).reverse - val completedApps = state.completedApps.sortBy(_.endTime).reverse - activeApps.iterator.map { ApplicationsListResource.convertApplicationInfo(_, false) } ++ - completedApps.iterator.map { ApplicationsListResource.convertApplicationInfo(_, true) } - } - - def getSparkUI(appId: String): Option[SparkUI] = { - val state = masterPage.getMasterState - val activeApps = state.activeApps.sortBy(_.startTime).reverse - val completedApps = state.completedApps.sortBy(_.endTime).reverse - (activeApps ++ completedApps).find { _.id == appId }.flatMap { - master.rebuildSparkUI - } - } } private[master] object MasterWebUI { diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala index 50b6ba67e9931..f615d08eddeb0 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala @@ -199,7 +199,7 @@ private[spark] object ApiRootResource { /** * This trait is shared by the all the root containers for application UI information -- - * the HistoryServer, the Master UI, and the application UI. This provides the common + * the HistoryServer and the application UI. This provides the common * interface needed for them all to expose application info as json. */ private[spark] trait UIRoot { diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala index 0fc0fb59d861f..f7e1a58079ff3 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala @@ -21,7 +21,6 @@ import javax.ws.rs.{DefaultValue, GET, Produces, QueryParam} import javax.ws.rs.core.MediaType import org.apache.spark.deploy.history.ApplicationHistoryInfo -import org.apache.spark.deploy.master.{ApplicationInfo => InternalApplicationInfo} @Produces(Array(MediaType.APPLICATION_JSON)) private[v1] class ApplicationListResource(uiRoot: UIRoot) { @@ -77,26 +76,4 @@ private[spark] object ApplicationsListResource { } ) } - - def convertApplicationInfo( - internal: InternalApplicationInfo, - completed: Boolean): ApplicationInfo = { - // standalone application info always has just one attempt - new ApplicationInfo( - id = internal.id, - name = internal.desc.name, - coresGranted = Some(internal.coresGranted), - maxCores = internal.desc.maxCores, - coresPerExecutor = internal.desc.coresPerExecutor, - memoryPerExecutorMB = Some(internal.desc.memoryPerExecutorMB), - attempts = Seq(new ApplicationAttemptInfo( - attemptId = None, - startTime = new Date(internal.startTime), - endTime = new Date(internal.endTime), - sparkUser = internal.desc.user, - completed = completed - )) - ) - } - } From 6918f0bfa4cc44517a48c20d2b5cbd127542a6f9 Mon Sep 17 00:00:00 2001 From: Bryan Cutler Date: Fri, 29 Jan 2016 14:40:21 -0800 Subject: [PATCH 2/6] Added UIRoot redirection from Master to Application --- .../spark/deploy/master/ui/MasterWebUI.scala | 67 +++++++++++++++++++ .../api/v1/ApplicationListResource.scala | 23 +++++++ 2 files changed, 90 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala index 5feccf09757ee..ae9cdcaad5454 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala @@ -17,8 +17,15 @@ package org.apache.spark.deploy.master.ui +import java.net.URL +import java.util.regex.Pattern +import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse} + +import org.eclipse.jetty.servlet.ServletContextHandler + import org.apache.spark.Logging import org.apache.spark.deploy.master.Master +import org.apache.spark.status.api.v1.{ApplicationsListResource, ApplicationInfo} import org.apache.spark.ui.{SparkUI, WebUI} import org.apache.spark.ui.JettyUtils._ @@ -50,6 +57,66 @@ class MasterWebUI( "/app/kill", "/", masterPage.handleAppKillRequest, httpMethods = Set("POST"))) attachHandler(createRedirectHandler( "/driver/kill", "/", masterPage.handleDriverKillRequest, httpMethods = Set("POST"))) + attachHandler(createApiRootHandler()) + } + + def createApiRootHandler(): ServletContextHandler = { + + val servlet = new HttpServlet { + private lazy val appIdPattern = Pattern.compile("\\/api\\/v\\d+\\/applications\\/([^\\/]+).*") + + override def doGet(request: HttpServletRequest, response: HttpServletResponse): Unit = { + doRequest(request, response) + } + override def doPost(request: HttpServletRequest, response: HttpServletResponse): Unit = { + doRequest(request, response) + } + private def doRequest(request: HttpServletRequest, response: HttpServletResponse): Unit = { + val requestURI = request.getRequestURI + + // requesting an application info list + if (requestURI == "applications") { + // TODO - Should send ApplicationList response ??? + } else { + // forward request to app if it is active, otherwise send error + getAppId(request) match { + case Some(appId) => + val state = masterPage.getMasterState + state.activeApps.find(appInfo => appInfo.id == appId) match { + case Some(appInfo) => + val prefixedDestPath = appInfo.desc.appUiUrl + requestURI + val newUrl = new URL(new URL(request.getRequestURL.toString), prefixedDestPath).toString + response.sendRedirect(newUrl) + request.getPathInfo + case None => + response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE) + } + case None => + response.sendError(HttpServletResponse.SC_BAD_REQUEST) + } + } + } + + private def getAppId(request: HttpServletRequest): Option[String] = { + val m = appIdPattern.matcher(request.getRequestURI) + if (m.matches) Some(m.group(1)) else None + } + + // SPARK-5983 ensure TRACE is not supported + protected override def doTrace(req: HttpServletRequest, res: HttpServletResponse): Unit = { + res.sendError(HttpServletResponse.SC_METHOD_NOT_ALLOWED) + } + } + + createServletHandler("/api", servlet, "") + } + + def getApplicationInfoList: Iterator[ApplicationInfo] = { + val state = masterPage.getMasterState + val activeApps = state.activeApps.sortBy(_.startTime).reverse + val completedApps = state.completedApps.sortBy(_.endTime).reverse + activeApps.iterator.map { ApplicationsListResource.convertApplicationInfo(_, false) } ++ + completedApps.iterator.map { ApplicationsListResource.convertApplicationInfo(_, true) } } } diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala index f7e1a58079ff3..0fc0fb59d861f 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala @@ -21,6 +21,7 @@ import javax.ws.rs.{DefaultValue, GET, Produces, QueryParam} import javax.ws.rs.core.MediaType import org.apache.spark.deploy.history.ApplicationHistoryInfo +import org.apache.spark.deploy.master.{ApplicationInfo => InternalApplicationInfo} @Produces(Array(MediaType.APPLICATION_JSON)) private[v1] class ApplicationListResource(uiRoot: UIRoot) { @@ -76,4 +77,26 @@ private[spark] object ApplicationsListResource { } ) } + + def convertApplicationInfo( + internal: InternalApplicationInfo, + completed: Boolean): ApplicationInfo = { + // standalone application info always has just one attempt + new ApplicationInfo( + id = internal.id, + name = internal.desc.name, + coresGranted = Some(internal.coresGranted), + maxCores = internal.desc.maxCores, + coresPerExecutor = internal.desc.coresPerExecutor, + memoryPerExecutorMB = Some(internal.desc.memoryPerExecutorMB), + attempts = Seq(new ApplicationAttemptInfo( + attemptId = None, + startTime = new Date(internal.startTime), + endTime = new Date(internal.endTime), + sparkUser = internal.desc.user, + completed = completed + )) + ) + } + } From ce655d870c1909630ed443976b68339ceca4e888 Mon Sep 17 00:00:00 2001 From: Bryan Cutler Date: Wed, 6 Apr 2016 11:34:00 -0700 Subject: [PATCH 3/6] Revert "Added UIRoot redirection from Master to Application" This reverts commit 6918f0bfa4cc44517a48c20d2b5cbd127542a6f9. --- .../spark/deploy/master/ui/MasterWebUI.scala | 67 ------------------- .../api/v1/ApplicationListResource.scala | 23 ------- 2 files changed, 90 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala index ae9cdcaad5454..5feccf09757ee 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala @@ -17,15 +17,8 @@ package org.apache.spark.deploy.master.ui -import java.net.URL -import java.util.regex.Pattern -import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse} - -import org.eclipse.jetty.servlet.ServletContextHandler - import org.apache.spark.Logging import org.apache.spark.deploy.master.Master -import org.apache.spark.status.api.v1.{ApplicationsListResource, ApplicationInfo} import org.apache.spark.ui.{SparkUI, WebUI} import org.apache.spark.ui.JettyUtils._ @@ -57,66 +50,6 @@ class MasterWebUI( "/app/kill", "/", masterPage.handleAppKillRequest, httpMethods = Set("POST"))) attachHandler(createRedirectHandler( "/driver/kill", "/", masterPage.handleDriverKillRequest, httpMethods = Set("POST"))) - attachHandler(createApiRootHandler()) - } - - def createApiRootHandler(): ServletContextHandler = { - - val servlet = new HttpServlet { - private lazy val appIdPattern = Pattern.compile("\\/api\\/v\\d+\\/applications\\/([^\\/]+).*") - - override def doGet(request: HttpServletRequest, response: HttpServletResponse): Unit = { - doRequest(request, response) - } - override def doPost(request: HttpServletRequest, response: HttpServletResponse): Unit = { - doRequest(request, response) - } - private def doRequest(request: HttpServletRequest, response: HttpServletResponse): Unit = { - val requestURI = request.getRequestURI - - // requesting an application info list - if (requestURI == "applications") { - // TODO - Should send ApplicationList response ??? - } else { - // forward request to app if it is active, otherwise send error - getAppId(request) match { - case Some(appId) => - val state = masterPage.getMasterState - state.activeApps.find(appInfo => appInfo.id == appId) match { - case Some(appInfo) => - val prefixedDestPath = appInfo.desc.appUiUrl + requestURI - val newUrl = new URL(new URL(request.getRequestURL.toString), prefixedDestPath).toString - response.sendRedirect(newUrl) - request.getPathInfo - case None => - response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE) - } - case None => - response.sendError(HttpServletResponse.SC_BAD_REQUEST) - } - } - } - - private def getAppId(request: HttpServletRequest): Option[String] = { - val m = appIdPattern.matcher(request.getRequestURI) - if (m.matches) Some(m.group(1)) else None - } - - // SPARK-5983 ensure TRACE is not supported - protected override def doTrace(req: HttpServletRequest, res: HttpServletResponse): Unit = { - res.sendError(HttpServletResponse.SC_METHOD_NOT_ALLOWED) - } - } - - createServletHandler("/api", servlet, "") - } - - def getApplicationInfoList: Iterator[ApplicationInfo] = { - val state = masterPage.getMasterState - val activeApps = state.activeApps.sortBy(_.startTime).reverse - val completedApps = state.completedApps.sortBy(_.endTime).reverse - activeApps.iterator.map { ApplicationsListResource.convertApplicationInfo(_, false) } ++ - completedApps.iterator.map { ApplicationsListResource.convertApplicationInfo(_, true) } } } diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala index 0fc0fb59d861f..f7e1a58079ff3 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala @@ -21,7 +21,6 @@ import javax.ws.rs.{DefaultValue, GET, Produces, QueryParam} import javax.ws.rs.core.MediaType import org.apache.spark.deploy.history.ApplicationHistoryInfo -import org.apache.spark.deploy.master.{ApplicationInfo => InternalApplicationInfo} @Produces(Array(MediaType.APPLICATION_JSON)) private[v1] class ApplicationListResource(uiRoot: UIRoot) { @@ -77,26 +76,4 @@ private[spark] object ApplicationsListResource { } ) } - - def convertApplicationInfo( - internal: InternalApplicationInfo, - completed: Boolean): ApplicationInfo = { - // standalone application info always has just one attempt - new ApplicationInfo( - id = internal.id, - name = internal.desc.name, - coresGranted = Some(internal.coresGranted), - maxCores = internal.desc.maxCores, - coresPerExecutor = internal.desc.coresPerExecutor, - memoryPerExecutorMB = Some(internal.desc.memoryPerExecutorMB), - attempts = Seq(new ApplicationAttemptInfo( - attemptId = None, - startTime = new Date(internal.startTime), - endTime = new Date(internal.endTime), - sparkUser = internal.desc.user, - completed = completed - )) - ) - } - } From d9158501540f85135cbb046a834ddff15c3619e3 Mon Sep 17 00:00:00 2001 From: Bryan Cutler Date: Thu, 7 Apr 2016 15:34:10 -0700 Subject: [PATCH 4/6] changed MasterWebUISuite to test killing of driver and application instead of listing applications, which is no longer supported --- .../deploy/master/ui/MasterWebUISuite.scala | 118 +++++++++++------- 1 file changed, 70 insertions(+), 48 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/deploy/master/ui/MasterWebUISuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/ui/MasterWebUISuite.scala index 0c9382a92bcaf..69a460fbc7dba 100644 --- a/core/src/test/scala/org/apache/spark/deploy/master/ui/MasterWebUISuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/master/ui/MasterWebUISuite.scala @@ -17,74 +17,96 @@ package org.apache.spark.deploy.master.ui +import java.io.DataOutputStream +import java.net.{HttpURLConnection, URL} +import java.nio.charset.StandardCharsets import java.util.Date -import scala.io.Source -import scala.language.postfixOps +import scala.collection.mutable.HashMap -import org.json4s.jackson.JsonMethods._ -import org.json4s.JsonAST.{JInt, JNothing, JString} -import org.mockito.Mockito.{mock, when} -import org.scalatest.BeforeAndAfter +import org.mockito.Mockito.{mock, times, verify, when} +import org.scalatest.BeforeAndAfterAll import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite} -import org.apache.spark.deploy.DeployMessages.MasterStateResponse +import org.apache.spark.deploy.DeployMessages.{KillDriverResponse, RequestKillDriver} import org.apache.spark.deploy.DeployTestUtils._ import org.apache.spark.deploy.master._ -import org.apache.spark.rpc.RpcEnv +import org.apache.spark.rpc.{RpcEndpointRef, RpcEnv} -class MasterWebUISuite extends SparkFunSuite with BeforeAndAfter { +class MasterWebUISuite extends SparkFunSuite with BeforeAndAfterAll { - val masterPage = mock(classOf[MasterPage]) - val master = { - val conf = new SparkConf - val securityMgr = new SecurityManager(conf) - val rpcEnv = RpcEnv.create(Master.SYSTEM_NAME, "localhost", 0, conf, securityMgr) - val master = new Master(rpcEnv, rpcEnv.address, 0, securityMgr, conf) - master - } - val masterWebUI = new MasterWebUI(master, 0, customMasterPage = Some(masterPage)) + val conf = new SparkConf + val securityMgr = new SecurityManager(conf) + val rpcEnv = mock(classOf[RpcEnv]) + val master = mock(classOf[Master]) + val masterEndpointRef = mock(classOf[RpcEndpointRef]) + when(master.securityMgr).thenReturn(securityMgr) + when(master.conf).thenReturn(conf) + when(master.rpcEnv).thenReturn(rpcEnv) + when(master.self).thenReturn(masterEndpointRef) + val masterWebUI = new MasterWebUI(master, 0) - before { + override def beforeAll() { + super.beforeAll() masterWebUI.bind() } - after { + override def afterAll() { masterWebUI.stop() + super.afterAll() } - test("list applications") { - val worker = createWorkerInfo() + test("kill application") { val appDesc = createAppDesc() // use new start date so it isn't filtered by UI val activeApp = new ApplicationInfo( - new Date().getTime, "id", appDesc, new Date(), null, Int.MaxValue) - activeApp.addExecutor(worker, 2) - - val workers = Array[WorkerInfo](worker) - val activeApps = Array(activeApp) - val completedApps = Array[ApplicationInfo]() - val activeDrivers = Array[DriverInfo]() - val completedDrivers = Array[DriverInfo]() - val stateResponse = new MasterStateResponse( - "host", 8080, None, workers, activeApps, completedApps, - activeDrivers, completedDrivers, RecoveryState.ALIVE) - - when(masterPage.getMasterState).thenReturn(stateResponse) - - val resultJson = Source.fromURL( - s"http://localhost:${masterWebUI.boundPort}/api/v1/applications") - .mkString - val parsedJson = parse(resultJson) - val firstApp = parsedJson(0) - - assert(firstApp \ "id" === JString(activeApp.id)) - assert(firstApp \ "name" === JString(activeApp.desc.name)) - assert(firstApp \ "coresGranted" === JInt(2)) - assert(firstApp \ "maxCores" === JInt(4)) - assert(firstApp \ "memoryPerExecutorMB" === JInt(1234)) - assert(firstApp \ "coresPerExecutor" === JNothing) + new Date().getTime, "app-0", appDesc, new Date(), null, Int.MaxValue) + + when(master.idToApp).thenReturn(HashMap[String, ApplicationInfo]((activeApp.id, activeApp))) + + val url = s"http://localhost:${masterWebUI.boundPort}/app/kill/" + val body = convPostDataToString(Map(("id", activeApp.id), ("terminate", "true"))) + val conn = sendHttpRequest(url, "POST", body) + conn.getResponseCode + + // Verify the master was called to remove the active app + verify(master, times(1)).removeApplication(activeApp, ApplicationState.KILLED) + } + + test("kill driver") { + val activeDriverId = "driver-0" + val url = s"http://localhost:${masterWebUI.boundPort}/driver/kill/" + val body = convPostDataToString(Map(("id", activeDriverId), ("terminate", "true"))) + val conn = sendHttpRequest(url, "POST", body) + conn.getResponseCode + + // Verify that master was asked to kill driver with the correct id + verify(masterEndpointRef, times(1)).ask[KillDriverResponse](RequestKillDriver(activeDriverId)) } + private def convPostDataToString(data: Map[String, String]): String = { + (for ((name, value) <- data) yield s"$name=$value").mkString("&") + } + + /** + * Send an HTTP request to the given URL using the method and the body specified. + * Return the connection object. + */ + private def sendHttpRequest( + url: String, + method: String, + body: String = ""): HttpURLConnection = { + val conn = new URL(url).openConnection().asInstanceOf[HttpURLConnection] + conn.setRequestMethod(method) + if (body.nonEmpty) { + conn.setDoOutput(true) + conn.setRequestProperty("Content-Type", "application/x-www-form-urlencoded") + conn.setRequestProperty("Content-Length", Integer.toString(body.length)) + val out = new DataOutputStream(conn.getOutputStream) + out.write(body.getBytes(StandardCharsets.UTF_8)) + out.close() + } + conn + } } From ba8b9622f4db676bed090f71b9864a8337926a46 Mon Sep 17 00:00:00 2001 From: Bryan Cutler Date: Thu, 7 Apr 2016 15:37:17 -0700 Subject: [PATCH 5/6] removed custom MasterPage from MasterWebUI, no longer needed for testing --- .../org/apache/spark/deploy/master/ui/MasterWebUI.scala | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala index c93e4ec973fc1..a0727ad83fb66 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala @@ -28,16 +28,13 @@ import org.apache.spark.ui.JettyUtils._ private[master] class MasterWebUI( val master: Master, - requestedPort: Int, - customMasterPage: Option[MasterPage] = None) + requestedPort: Int) extends WebUI(master.securityMgr, master.securityMgr.getSSLOptions("standalone"), requestedPort, master.conf, name = "MasterUI") with Logging { val masterEndpointRef = master.self val killEnabled = master.conf.getBoolean("spark.ui.killEnabled", true) - val masterPage = customMasterPage.getOrElse(new MasterPage(this)) - initialize() /** Initialize all components of the server. */ From fadd392538f6efa37eebe3bd366ca76cbc07eb23 Mon Sep 17 00:00:00 2001 From: Bryan Cutler Date: Thu, 7 Apr 2016 15:44:51 -0700 Subject: [PATCH 6/6] updated docs to remove references to Master history serving --- docs/monitoring.md | 5 ----- 1 file changed, 5 deletions(-) diff --git a/docs/monitoring.md b/docs/monitoring.md index 32d2e02e93eeb..c3bfe0ab1fa89 100644 --- a/docs/monitoring.md +++ b/docs/monitoring.md @@ -27,11 +27,6 @@ in the UI to persisted storage. ## Viewing After the Fact -Spark's Standalone Mode cluster manager also has its own -[web UI](spark-standalone.html#monitoring-and-logging). If an application has logged events over -the course of its lifetime, then the Standalone master's web UI will automatically re-render the -application's UI after the application has finished. - If Spark is run on Mesos or YARN, it is still possible to construct the UI of an application through Spark's history server, provided that the application's event logs exist. You can start the history server by executing: