From 7895280b49221eff595a0b0e95f757acfd8adbde Mon Sep 17 00:00:00 2001 From: Vinayak Date: Fri, 7 Oct 2016 19:23:28 +0530 Subject: [PATCH 1/4] initial commit --- .../history/ApplicationHistoryProvider.scala | 15 ++++++ .../deploy/history/FsHistoryProvider.scala | 51 +++++++++++++------ .../spark/deploy/history/HistoryPage.scala | 8 +++ .../spark/deploy/history/HistoryServer.scala | 4 ++ 4 files changed, 63 insertions(+), 15 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala index 06530ff836466..3bc9e7ce33d68 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala @@ -74,6 +74,21 @@ private[history] case class LoadedAppUI( private[history] abstract class ApplicationHistoryProvider { + /** + * Returns the count of application event logs that the provider is currently still processing. + * History Server UI can use this to indicate to a user that the application listing on the UI + * can be expected to list additional known applications once the processing of these + * application event logs completes. + * + * A History Provider that does not have a notion of count of event logs that may be pending + * for processing need not override this method. + * + * @return Count of application event logs that are currently under process + */ + def getEventLogsUnderProcess(): Int = { + return 0; + } + /** * Returns a list of applications available for the history server to show. * diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 530cc5252214b..984e1104cf931 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -19,7 +19,7 @@ package org.apache.spark.deploy.history import java.io.{FileNotFoundException, IOException, OutputStream} import java.util.UUID -import java.util.concurrent.{Executors, ExecutorService, TimeUnit} +import java.util.concurrent.{Executors, ExecutorService, Future, TimeUnit} import java.util.zip.{ZipEntry, ZipOutputStream} import scala.collection.mutable @@ -123,6 +123,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) // List of application logs to be deleted by event log cleaner. private var attemptsToClean = new mutable.ListBuffer[FsApplicationAttemptInfo] + private val pendingReplayTasksCount = new java.util.concurrent.atomic.AtomicInteger (0) + /** * Return a runnable that performs the given operation on the event logs. * This operation is expected to be executed periodically. @@ -229,6 +231,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) applications.get(appId) } + override def getEventLogsUnderProcess(): Int = pendingReplayTasksCount.get() + override def getAppUI(appId: String, attemptId: Option[String]): Option[LoadedAppUI] = { try { applications.get(appId).flatMap { appInfo => @@ -328,24 +332,41 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) if (logInfos.nonEmpty) { logDebug(s"New/updated attempts found: ${logInfos.size} ${logInfos.map(_.getPath)}") } - logInfos.map { file => - replayExecutor.submit(new Runnable { + + var tasks = mutable.ListBuffer[Future[_]]() + + try { + for (file <- logInfos) { + tasks += replayExecutor.submit(new Runnable { override def run(): Unit = mergeApplicationListing(file) }) } - .foreach { task => - try { - // Wait for all tasks to finish. This makes sure that checkForLogs - // is not scheduled again while some tasks are already running in - // the replayExecutor. - task.get() - } catch { - case e: InterruptedException => - throw e - case e: Exception => - logError("Exception while merging application listings", e) - } + } catch { + // let the iteration over logInfos break, since an exception on + // replayExecutor.submit (..) indicates the ExecutorService is unable + // to take any more submissions at this time + + case e: Exception => + logError(s"Exception while submitting event log for replay", e) + } + + pendingReplayTasksCount.addAndGet (tasks.size) + + tasks.foreach { task => + try { + // Wait for all tasks to finish. This makes sure that checkForLogs + // is not scheduled again while some tasks are already running in + // the replayExecutor. + task.get() + } catch { + case e: InterruptedException => + throw e + case e: Exception => + logError("Exception while merging application listings", e) + } finally { + pendingReplayTasksCount.decrementAndGet() } + } lastScanTime = newLastScanTime } catch { diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala index 96b9ecf43b14c..494d0609ae8cb 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala @@ -30,6 +30,7 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("") Option(request.getParameter("showIncomplete")).getOrElse("false").toBoolean val allAppsSize = parent.getApplicationList().count(_.completed != requestedIncomplete) + val eventLogsUnderProcessCount = parent.getEventLogsUnderProcess() val providerConfig = parent.getProviderConfig() val content =
@@ -38,6 +39,13 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("") {providerConfig.map { case (k, v) =>
  • {k}: {v}
  • }} { + if (eventLogsUnderProcessCount > 0) { +

    There are {eventLogsUnderProcessCount} event log(s) currently being + processed which may result in additional applications getting listed on this page. + Refresh the page to view updates.

    + } + } + { if (allAppsSize > 0) { ++ ++ diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala index 3175b36b3e56f..d6c9eb955a719 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala @@ -179,6 +179,10 @@ class HistoryServer( provider.getListing() } + def getEventLogsUnderProcess(): Int = { + provider.getEventLogsUnderProcess() + } + def getApplicationInfoList: Iterator[ApplicationInfo] = { getApplicationList().map(ApplicationsListResource.appHistoryInfoToPublicAppInfo) } From f0ca08819659cbed2c2288a4b86d6cd1f528ecbd Mon Sep 17 00:00:00 2001 From: Vinayak Date: Mon, 17 Oct 2016 09:45:33 +0530 Subject: [PATCH 2/4] display last updated time on app list page --- .../resources/org/apache/spark/ui/static/historypage.js | 6 ++++++ .../deploy/history/ApplicationHistoryProvider.scala | 9 +++++++++ .../apache/spark/deploy/history/FsHistoryProvider.scala | 8 +++++--- .../org/apache/spark/deploy/history/HistoryPage.scala | 3 +++ .../org/apache/spark/deploy/history/HistoryServer.scala | 4 ++++ 5 files changed, 27 insertions(+), 3 deletions(-) diff --git a/core/src/main/resources/org/apache/spark/ui/static/historypage.js b/core/src/main/resources/org/apache/spark/ui/static/historypage.js index 2a32e18672a22..8abef13b3bb40 100644 --- a/core/src/main/resources/org/apache/spark/ui/static/historypage.js +++ b/core/src/main/resources/org/apache/spark/ui/static/historypage.js @@ -175,4 +175,10 @@ $(document).ready(function() { $('#hisotry-summary [data-toggle="tooltip"]').tooltip(); }); }); + + if ($('#last-updated').length) { + var lastUpdatedMillis = Number($('#last-updated').text()); + var updatedDate = new Date(lastUpdatedMillis); + $('#last-updated').text(updatedDate.toLocaleDateString()+", "+updatedDate.toLocaleTimeString()) + } }); diff --git a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala index 3bc9e7ce33d68..d7d82800b8b55 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala @@ -89,6 +89,15 @@ private[history] abstract class ApplicationHistoryProvider { return 0; } + /** + * Returns the time the history provider last updated the application history information + * + * @return 0 if this is undefined or unsupported, otherwise the last updated time in millis + */ + def getLastUpdatedTime(): Long = { + return 0; + } + /** * Returns a list of applications available for the history server to show. * diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 984e1104cf931..a7f7b9472ce1c 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -111,7 +111,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) // The modification time of the newest log detected during the last scan. Currently only // used for logging msgs (logs are re-scanned based on file size, rather than modtime) - private var lastScanTime = -1L + private val lastScanTime = new java.util.concurrent.atomic.AtomicLong (-1) // Mapping of application IDs to their metadata, in descending end time order. Apps are inserted // into the map in order, so the LinkedHashMap maintains the correct ordering. @@ -233,6 +233,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) override def getEventLogsUnderProcess(): Int = pendingReplayTasksCount.get() + override def getLastUpdatedTime(): Long = lastScanTime.get() + override def getAppUI(appId: String, attemptId: Option[String]): Option[LoadedAppUI] = { try { applications.get(appId).flatMap { appInfo => @@ -368,7 +370,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) } } - lastScanTime = newLastScanTime + lastScanTime.set(newLastScanTime) } catch { case e: Exception => logError("Exception in checking for event log updates", e) } @@ -385,7 +387,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) } catch { case e: Exception => logError("Exception encountered when attempting to update last scan time", e) - lastScanTime + lastScanTime.get() } finally { if (!fs.delete(path, true)) { logWarning(s"Error deleting ${path}") diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala index 494d0609ae8cb..de46b657a93b0 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala @@ -31,6 +31,7 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("") val allAppsSize = parent.getApplicationList().count(_.completed != requestedIncomplete) val eventLogsUnderProcessCount = parent.getEventLogsUnderProcess() + val lastUpdatedTime = parent.getLastUpdatedTime() val providerConfig = parent.getProviderConfig() val content =
    @@ -43,6 +44,8 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("")

    There are {eventLogsUnderProcessCount} event log(s) currently being processed which may result in additional applications getting listed on this page. Refresh the page to view updates.

    + } else if (lastUpdatedTime > 0) { +

    Last updated: {lastUpdatedTime}

    } } { diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala index d6c9eb955a719..7e21fa681aa1e 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala @@ -183,6 +183,10 @@ class HistoryServer( provider.getEventLogsUnderProcess() } + def getLastUpdatedTime(): Long = { + provider.getLastUpdatedTime() + } + def getApplicationInfoList: Iterator[ApplicationInfo] = { getApplicationList().map(ApplicationsListResource.appHistoryInfoToPublicAppInfo) } From 114444021ecb010400badcf5d783ae9266905e09 Mon Sep 17 00:00:00 2001 From: Vinayak Date: Mon, 24 Oct 2016 15:04:46 +0530 Subject: [PATCH 3/4] review comments #1 --- .../spark/ui/static/historypage-common.js | 24 +++++++++++++++++++ .../org/apache/spark/ui/static/historypage.js | 6 ----- .../spark/deploy/history/HistoryPage.scala | 10 +++++++- 3 files changed, 33 insertions(+), 7 deletions(-) create mode 100644 core/src/main/resources/org/apache/spark/ui/static/historypage-common.js diff --git a/core/src/main/resources/org/apache/spark/ui/static/historypage-common.js b/core/src/main/resources/org/apache/spark/ui/static/historypage-common.js new file mode 100644 index 0000000000000..55d540d8317a0 --- /dev/null +++ b/core/src/main/resources/org/apache/spark/ui/static/historypage-common.js @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +$(document).ready(function() { + if ($('#last-updated').length) { + var lastUpdatedMillis = Number($('#last-updated').text()); + var updatedDate = new Date(lastUpdatedMillis); + $('#last-updated').text(updatedDate.toLocaleDateString()+", "+updatedDate.toLocaleTimeString()) + } +}); diff --git a/core/src/main/resources/org/apache/spark/ui/static/historypage.js b/core/src/main/resources/org/apache/spark/ui/static/historypage.js index 8abef13b3bb40..2a32e18672a22 100644 --- a/core/src/main/resources/org/apache/spark/ui/static/historypage.js +++ b/core/src/main/resources/org/apache/spark/ui/static/historypage.js @@ -175,10 +175,4 @@ $(document).ready(function() { $('#hisotry-summary [data-toggle="tooltip"]').tooltip(); }); }); - - if ($('#last-updated').length) { - var lastUpdatedMillis = Number($('#last-updated').text()); - var updatedDate = new Date(lastUpdatedMillis); - $('#last-updated').text(updatedDate.toLocaleDateString()+", "+updatedDate.toLocaleTimeString()) - } }); diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala index de46b657a93b0..0e7a6c24d4fa5 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala @@ -34,6 +34,7 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("") val lastUpdatedTime = parent.getLastUpdatedTime() val providerConfig = parent.getProviderConfig() val content = +
      @@ -44,10 +45,15 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("")

      There are {eventLogsUnderProcessCount} event log(s) currently being processed which may result in additional applications getting listed on this page. Refresh the page to view updates.

      - } else if (lastUpdatedTime > 0) { + } + } + + { + if (lastUpdatedTime > 0) {

      Last updated: {lastUpdatedTime}

      } } + { if (allAppsSize > 0) { ++ @@ -57,6 +63,8 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("") } else if (requestedIncomplete) {

      No incomplete applications found!

      + } else if (eventLogsUnderProcessCount > 0) { +

      No completed applications found!

      } else {

      No completed applications found!

      ++ parent.emptyListingHtml } From 596c7aec2c282a50702829c5de97dafd318f2fd8 Mon Sep 17 00:00:00 2001 From: Vinayak Date: Mon, 7 Nov 2016 23:09:05 +0530 Subject: [PATCH 4/4] review comments - 2 --- .../org/apache/spark/deploy/history/FsHistoryProvider.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index a7f7b9472ce1c..7c39a98618780 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -111,7 +111,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) // The modification time of the newest log detected during the last scan. Currently only // used for logging msgs (logs are re-scanned based on file size, rather than modtime) - private val lastScanTime = new java.util.concurrent.atomic.AtomicLong (-1) + private val lastScanTime = new java.util.concurrent.atomic.AtomicLong(-1) // Mapping of application IDs to their metadata, in descending end time order. Apps are inserted // into the map in order, so the LinkedHashMap maintains the correct ordering. @@ -123,7 +123,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) // List of application logs to be deleted by event log cleaner. private var attemptsToClean = new mutable.ListBuffer[FsApplicationAttemptInfo] - private val pendingReplayTasksCount = new java.util.concurrent.atomic.AtomicInteger (0) + private val pendingReplayTasksCount = new java.util.concurrent.atomic.AtomicInteger(0) /** * Return a runnable that performs the given operation on the event logs. @@ -352,7 +352,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) logError(s"Exception while submitting event log for replay", e) } - pendingReplayTasksCount.addAndGet (tasks.size) + pendingReplayTasksCount.addAndGet(tasks.size) tasks.foreach { task => try {