From 8b3b8e8f830d1148e43f050b6b0011817ea1262e Mon Sep 17 00:00:00 2001 From: Wisely Chen Date: Fri, 26 Jun 2015 18:21:06 +0800 Subject: [PATCH 1/4] [SPARK-8656] Fix the webUI and JSON API number is not synced --- .../scala/org/apache/spark/deploy/JsonProtocol.scala | 10 +++++----- .../org/apache/spark/deploy/master/WorkerInfo.scala | 4 ++++ 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala index 2954f932b4f41..95c11ffe8d057 100644 --- a/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala @@ -77,11 +77,11 @@ private[deploy] object JsonProtocol { def writeMasterState(obj: MasterStateResponse): JObject = { ("url" -> obj.uri) ~ - ("workers" -> obj.workers.toList.map(writeWorkerInfo)) ~ - ("cores" -> obj.workers.map(_.cores).sum) ~ - ("coresused" -> obj.workers.map(_.coresUsed).sum) ~ - ("memory" -> obj.workers.map(_.memory).sum) ~ - ("memoryused" -> obj.workers.map(_.memoryUsed).sum) ~ + ("workers" -> obj.workers.filter(_.isAlive()).toList.map(writeWorkerInfo)) ~ + ("cores" -> obj.workers.filter(_.isAlive()).map(_.cores).sum) ~ + ("coresused" -> obj.workers.filter(_.isAlive()).map(_.coresUsed).sum) ~ + ("memory" -> obj.workers.filter(_.isAlive()).map(_.memory).sum) ~ + ("memoryused" -> obj.workers.filter(_.isAlive()).map(_.memoryUsed).sum) ~ ("activeapps" -> obj.activeApps.toList.map(writeApplicationInfo)) ~ ("completedapps" -> obj.completedApps.toList.map(writeApplicationInfo)) ~ ("activedrivers" -> obj.activeDrivers.toList.map(writeDriverInfo)) ~ diff --git a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala index 9b3d48c6edc84..4347c7c7bc60e 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala @@ -107,4 +107,8 @@ private[spark] class WorkerInfo( def setState(state: WorkerState.Value): Unit = { this.state = state } + + def isAlive(): Boolean={ + this.state == WorkerState.ALIVE + } } From 431d2b04142d84dfd605c5f687fa0c4ddc39485e Mon Sep 17 00:00:00 2001 From: Wisely Chen Date: Sat, 27 Jun 2015 05:32:26 +0800 Subject: [PATCH 2/4] Worker List should contain DEAD node also --- core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala index 95c11ffe8d057..de513379dea9a 100644 --- a/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala @@ -77,7 +77,7 @@ private[deploy] object JsonProtocol { def writeMasterState(obj: MasterStateResponse): JObject = { ("url" -> obj.uri) ~ - ("workers" -> obj.workers.filter(_.isAlive()).toList.map(writeWorkerInfo)) ~ + ("workers" -> obj.workers.toList.map(writeWorkerInfo)) ~ ("cores" -> obj.workers.filter(_.isAlive()).map(_.cores).sum) ~ ("coresused" -> obj.workers.filter(_.isAlive()).map(_.coresUsed).sum) ~ ("memory" -> obj.workers.filter(_.isAlive()).map(_.memory).sum) ~ From 2c8ea89b00e95b00b82092fda9d514336e96bac1 Mon Sep 17 00:00:00 2001 From: Wisely Chen Date: Fri, 3 Jul 2015 10:34:07 +0800 Subject: [PATCH 3/4] Change some styling and add local variable --- .../scala/org/apache/spark/deploy/JsonProtocol.scala | 9 +++++---- .../org/apache/spark/deploy/master/WorkerInfo.scala | 4 +--- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala index de513379dea9a..be595314f05f6 100644 --- a/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala @@ -76,12 +76,13 @@ private[deploy] object JsonProtocol { } def writeMasterState(obj: MasterStateResponse): JObject = { + val alive_workers = obj.workers.filter(_.isAlive()) ("url" -> obj.uri) ~ ("workers" -> obj.workers.toList.map(writeWorkerInfo)) ~ - ("cores" -> obj.workers.filter(_.isAlive()).map(_.cores).sum) ~ - ("coresused" -> obj.workers.filter(_.isAlive()).map(_.coresUsed).sum) ~ - ("memory" -> obj.workers.filter(_.isAlive()).map(_.memory).sum) ~ - ("memoryused" -> obj.workers.filter(_.isAlive()).map(_.memoryUsed).sum) ~ + ("cores" -> alive_workers.map(_.cores).sum) ~ + ("coresused" -> alive_workers.map(_.coresUsed).sum) ~ + ("memory" -> alive_workers.map(_.memory).sum) ~ + ("memoryused" -> alive_workers.map(_.memoryUsed).sum) ~ ("activeapps" -> obj.activeApps.toList.map(writeApplicationInfo)) ~ ("completedapps" -> obj.completedApps.toList.map(writeApplicationInfo)) ~ ("activedrivers" -> obj.activeDrivers.toList.map(writeDriverInfo)) ~ diff --git a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala index 4347c7c7bc60e..7d6c0c7ac8154 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala @@ -108,7 +108,5 @@ private[spark] class WorkerInfo( this.state = state } - def isAlive(): Boolean={ - this.state == WorkerState.ALIVE - } + def isAlive(): Boolean = this.state == WorkerState.ALIVE } From 9e54bf09c75594941ff46a767af4d66d285fb63f Mon Sep 17 00:00:00 2001 From: Wisely Chen Date: Sat, 4 Jul 2015 05:47:39 +0800 Subject: [PATCH 4/4] Change variable name to camel case --- .../scala/org/apache/spark/deploy/JsonProtocol.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala index be595314f05f6..ccffb36652988 100644 --- a/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala @@ -76,13 +76,13 @@ private[deploy] object JsonProtocol { } def writeMasterState(obj: MasterStateResponse): JObject = { - val alive_workers = obj.workers.filter(_.isAlive()) + val aliveWorkers = obj.workers.filter(_.isAlive()) ("url" -> obj.uri) ~ ("workers" -> obj.workers.toList.map(writeWorkerInfo)) ~ - ("cores" -> alive_workers.map(_.cores).sum) ~ - ("coresused" -> alive_workers.map(_.coresUsed).sum) ~ - ("memory" -> alive_workers.map(_.memory).sum) ~ - ("memoryused" -> alive_workers.map(_.memoryUsed).sum) ~ + ("cores" -> aliveWorkers.map(_.cores).sum) ~ + ("coresused" -> aliveWorkers.map(_.coresUsed).sum) ~ + ("memory" -> aliveWorkers.map(_.memory).sum) ~ + ("memoryused" -> aliveWorkers.map(_.memoryUsed).sum) ~ ("activeapps" -> obj.activeApps.toList.map(writeApplicationInfo)) ~ ("completedapps" -> obj.completedApps.toList.map(writeApplicationInfo)) ~ ("activedrivers" -> obj.activeDrivers.toList.map(writeDriverInfo)) ~