From fbf1a65ce7c3f7b9020d4a22031febafd331b526 Mon Sep 17 00:00:00 2001 From: Gang Wu Date: Thu, 29 Sep 2016 13:51:37 -0700 Subject: [PATCH 1/3] changed implementation of HistoryServer.getApplicationInfoList for lazy evaluation --- .../spark/deploy/history/HistoryServer.scala | 12 ++++- .../api/v1/ApplicationListResource.scala | 47 +++++++++++++------ 2 files changed, 43 insertions(+), 16 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala index 735aa43cfc994..dd4a13b09e92f 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala @@ -179,7 +179,17 @@ class HistoryServer( } def getApplicationInfoList: Iterator[ApplicationInfo] = { - getApplicationList().iterator.map(ApplicationsListResource.appHistoryInfoToPublicAppInfo) + new Iterator[ApplicationInfo] { + private val appIter = getApplicationList().iterator + + override def hasNext: Boolean = { + appIter.hasNext + } + + override def next(): ApplicationInfo = { + ApplicationsListResource.appHistoryInfoToPublicAppInfo(appIter.next) + } + } } def getApplicationInfo(appId: String): Option[ApplicationInfo] = { diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala index 075b9ba37dc84..6e617e3f2880e 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala @@ -42,22 +42,39 @@ private[v1] class ApplicationListResource(uiRoot: UIRoot) { } val includeCompleted = adjStatus.contains(ApplicationStatus.COMPLETED) val includeRunning = adjStatus.contains(ApplicationStatus.RUNNING) - val appList = allApps.filter { app => - val anyRunning = app.attempts.exists(!_.completed) - // if any attempt is still running, we consider the app to also still be running - val statusOk = (!anyRunning && includeCompleted) || - (anyRunning && includeRunning) - // keep the app if *any* attempts fall in the right time window - val dateOk = app.attempts.exists { attempt => - attempt.startTime.getTime >= minDate.timestamp && - attempt.startTime.getTime <= maxDate.timestamp + val numApps = Option(limit).getOrElse(Integer.MAX_VALUE).asInstanceOf[Int] + + new Iterator[ApplicationInfo] { + private var appCount = 0 + private var nextApp: Option[ApplicationInfo] = None + + override def hasNext: Boolean = { + nextApp = None + + while (nextApp == None && appCount < numApps && allApps.hasNext) { + val app = allApps.next + val anyRunning = app.attempts.exists(!_.completed) + // if any attempt is still running, we consider the app to also still be running + val statusOk = (!anyRunning && includeCompleted) || + (anyRunning && includeRunning) + // keep the app if *any* attempts fall in the right time window + val dateOk = app.attempts.exists { attempt => + attempt.startTime.getTime >= minDate.timestamp && + attempt.startTime.getTime <= maxDate.timestamp + } + + if (statusOk && dateOk) { + nextApp = Some(app) + appCount += 1 + } + } + + nextApp != None + } + + override def next(): ApplicationInfo = { + nextApp.get } - statusOk && dateOk - } - if (limit != null) { - appList.take(limit) - } else { - appList } } } From dc8146e7c9cb7d834340661cfb650f8997c2eab5 Mon Sep 17 00:00:00 2001 From: Gang Wu Date: Fri, 30 Sep 2016 13:01:27 -0700 Subject: [PATCH 2/3] directly operator on iterator --- .../spark/deploy/history/HistoryServer.scala | 12 +---- .../api/v1/ApplicationListResource.scala | 46 ++++++------------- 2 files changed, 14 insertions(+), 44 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala index dd4a13b09e92f..735aa43cfc994 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala @@ -179,17 +179,7 @@ class HistoryServer( } def getApplicationInfoList: Iterator[ApplicationInfo] = { - new Iterator[ApplicationInfo] { - private val appIter = getApplicationList().iterator - - override def hasNext: Boolean = { - appIter.hasNext - } - - override def next(): ApplicationInfo = { - ApplicationsListResource.appHistoryInfoToPublicAppInfo(appIter.next) - } - } + getApplicationList().iterator.map(ApplicationsListResource.appHistoryInfoToPublicAppInfo) } def getApplicationInfo(appId: String): Option[ApplicationInfo] = { diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala index 6e617e3f2880e..aaa26af8d6e55 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala @@ -40,42 +40,22 @@ private[v1] class ApplicationListResource(uiRoot: UIRoot) { status } } + + val numApps = Option(limit).getOrElse(Integer.MAX_VALUE).asInstanceOf[Int] val includeCompleted = adjStatus.contains(ApplicationStatus.COMPLETED) val includeRunning = adjStatus.contains(ApplicationStatus.RUNNING) - val numApps = Option(limit).getOrElse(Integer.MAX_VALUE).asInstanceOf[Int] - - new Iterator[ApplicationInfo] { - private var appCount = 0 - private var nextApp: Option[ApplicationInfo] = None - - override def hasNext: Boolean = { - nextApp = None - - while (nextApp == None && appCount < numApps && allApps.hasNext) { - val app = allApps.next - val anyRunning = app.attempts.exists(!_.completed) - // if any attempt is still running, we consider the app to also still be running - val statusOk = (!anyRunning && includeCompleted) || - (anyRunning && includeRunning) - // keep the app if *any* attempts fall in the right time window - val dateOk = app.attempts.exists { attempt => - attempt.startTime.getTime >= minDate.timestamp && - attempt.startTime.getTime <= maxDate.timestamp - } - - if (statusOk && dateOk) { - nextApp = Some(app) - appCount += 1 - } - } - - nextApp != None + allApps.filter { app => + val anyRunning = app.attempts.exists(!_.completed) + // if any attempt is still running, we consider the app to also still be running + val statusOk = (!anyRunning && includeCompleted) || + (anyRunning && includeRunning) + // keep the app if *any* attempts fall in the right time window + val dateOk = app.attempts.exists { attempt => + attempt.startTime.getTime >= minDate.timestamp && + attempt.startTime.getTime <= maxDate.timestamp } - - override def next(): ApplicationInfo = { - nextApp.get - } - } + statusOk && dateOk + }.take(numApps) } } From 955154b470c3d73326657387c580a77678e48667 Mon Sep 17 00:00:00 2001 From: Gang Wu Date: Fri, 30 Sep 2016 14:51:18 -0700 Subject: [PATCH 3/3] moved toInt logic in map --- .../apache/spark/status/api/v1/ApplicationListResource.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala index aaa26af8d6e55..38b3cd75849a2 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala @@ -41,7 +41,7 @@ private[v1] class ApplicationListResource(uiRoot: UIRoot) { } } - val numApps = Option(limit).getOrElse(Integer.MAX_VALUE).asInstanceOf[Int] + val numApps = Option(limit).map(_.toInt).getOrElse(Integer.MAX_VALUE) val includeCompleted = adjStatus.contains(ApplicationStatus.COMPLETED) val includeRunning = adjStatus.contains(ApplicationStatus.RUNNING) allApps.filter { app => @@ -49,6 +49,7 @@ private[v1] class ApplicationListResource(uiRoot: UIRoot) { // if any attempt is still running, we consider the app to also still be running val statusOk = (!anyRunning && includeCompleted) || (anyRunning && includeRunning) + // keep the app if *any* attempts fall in the right time window val dateOk = app.attempts.exists { attempt => attempt.startTime.getTime >= minDate.timestamp &&