From 4180993fdfd5643e5c1a37820266fb6abd626c84 Mon Sep 17 00:00:00 2001 From: Kousuke Saruta Date: Wed, 3 Sep 2014 17:23:31 +0900 Subject: [PATCH 01/11] Modified SparkContext to retain spark.unique.app.name property in SparkConf --- core/src/main/scala/org/apache/spark/SparkContext.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 9ba21cfcde01a..b738f5661d75b 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -186,6 +186,8 @@ class SparkContext(config: SparkConf) extends Logging { val master = conf.get("spark.master") val appName = conf.get("spark.app.name") + val uniqueAppName = appName + "-" + System.currentTimeMillis() + conf.set("spark.unique.app.name", uniqueAppName) // Generate the random name for a temp folder in Tachyon // Add a timestamp as the suffix here to make it more safe From 55debab06be171c094ece4ebc5fd9e1bb76b6b00 Mon Sep 17 00:00:00 2001 From: Kousuke Saruta Date: Wed, 3 Sep 2014 17:25:56 +0900 Subject: [PATCH 02/11] Modified SparkContext and Executor to set spark.executor.id to identifiers --- core/src/main/scala/org/apache/spark/SparkContext.scala | 1 + core/src/main/scala/org/apache/spark/executor/Executor.scala | 1 + 2 files changed, 2 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index b738f5661d75b..5c3b9919ecc00 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -202,6 +202,7 @@ class SparkContext(config: SparkConf) extends Logging { private[spark] val listenerBus = new LiveListenerBus // Create the Spark execution environment (cache, map output tracker, etc) + conf.set("spark.executor.id", "driver") private[spark] val env = SparkEnv.create( conf, "", diff 
--git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 1bb1b4aae91bb..0f7e34ecaea68 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -82,6 +82,7 @@ private[spark] class Executor( val executorSource = new ExecutorSource(this, executorId) // Initialize Spark environment (using system properties read above) + conf.set("spark.executor.id", "executor." + executorId) private val env = { if (!isLocal) { val _env = SparkEnv.create(conf, executorId, slaveHostname, 0, From 71609f57ff14f71ef62be74e737416789605e618 Mon Sep 17 00:00:00 2001 From: Kousuke Saruta Date: Wed, 3 Sep 2014 17:30:12 +0900 Subject: [PATCH 03/11] Modified sourceName of ExecutorSource, DAGSchedulerSource and BlockManagerSource --- .../main/scala/org/apache/spark/executor/ExecutorSource.scala | 2 +- .../scala/org/apache/spark/scheduler/DAGSchedulerSource.scala | 2 +- .../scala/org/apache/spark/storage/BlockManagerSource.scala | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala index 0ed52cfe9df61..dc7e1a8e5468d 100644 --- a/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala +++ b/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala @@ -37,7 +37,7 @@ private[spark] class ExecutorSource(val executor: Executor, executorId: String) val metricRegistry = new MetricRegistry() // TODO: It would be nice to pass the application name here - val sourceName = "executor.%s".format(executorId) + val sourceName = "executor" // Gauge for executor thread pool's actively executing task counts metricRegistry.register(MetricRegistry.name("threadpool", "activeTasks"), new Gauge[Int] { diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala 
b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala index 5878e733908f5..84b2a6174407a 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala @@ -25,7 +25,7 @@ import org.apache.spark.metrics.source.Source private[spark] class DAGSchedulerSource(val dagScheduler: DAGScheduler, sc: SparkContext) extends Source { val metricRegistry = new MetricRegistry() - val sourceName = "%s.DAGScheduler".format(sc.appName) + val sourceName = "DAGScheduler" metricRegistry.register(MetricRegistry.name("stage", "failedStages"), new Gauge[Int] { override def getValue: Int = dagScheduler.failedStages.size diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala index 3f14c40ec61cb..9bd0d8f396388 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala @@ -25,7 +25,7 @@ import org.apache.spark.metrics.source.Source private[spark] class BlockManagerSource(val blockManager: BlockManager, sc: SparkContext) extends Source { val metricRegistry = new MetricRegistry() - val sourceName = "%s.BlockManager".format(sc.appName) + val sourceName = "BlockManager" metricRegistry.register(MetricRegistry.name("memory", "maxMem_MB"), new Gauge[Long] { override def getValue: Long = { From 868e326f82a86001399842ba4dbfdb51203c0fd3 Mon Sep 17 00:00:00 2001 From: Kousuke Saruta Date: Wed, 3 Sep 2014 17:33:26 +0900 Subject: [PATCH 04/11] Modified MetricsSystem to set registry name with unique application-id and driver/executor-id --- .../apache/spark/metrics/MetricsSystem.scala | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala 
b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala index 6ef817d0e587e..5b496d546b2bf 100644 --- a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala +++ b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala @@ -95,10 +95,24 @@ private[spark] class MetricsSystem private (val instance: String, sinks.foreach(_.report()) } + def buildRegistryName(source: Source) = { + val appName = conf.get("spark.unique.app.name") + val executorId = conf.get("spark.executor.id") + val registryName = { + if (appName != null && executorId != null) { + MetricRegistry.name(appName, executorId, source.sourceName) + } else { + MetricRegistry.name(source.sourceName) + } + } + registryName + } + def registerSource(source: Source) { sources += source try { - registry.register(source.sourceName, source.metricRegistry) + val regName = buildRegistryName(source) + registry.register(regName, source.metricRegistry) } catch { case e: IllegalArgumentException => logInfo("Metrics already registered", e) } @@ -106,8 +120,9 @@ private[spark] class MetricsSystem private (val instance: String, def removeSource(source: Source) { sources -= source + val regName = buildRegistryName(source) registry.removeMatching(new MetricFilter { - def matches(name: String, metric: Metric): Boolean = name.startsWith(source.sourceName) + def matches(name: String, metric: Metric): Boolean = name.startsWith(regName) }) } From 85ffc0244398b26365bcda751496a35bff3d3080 Mon Sep 17 00:00:00 2001 From: Kousuke Saruta Date: Thu, 4 Sep 2014 00:12:22 +0900 Subject: [PATCH 05/11] Revert "Modified sourceName of ExecutorSource, DAGSchedulerSource and BlockManagerSource" This reverts commit 71609f57ff14f71ef62be74e737416789605e618. 
--- .../main/scala/org/apache/spark/executor/ExecutorSource.scala | 2 +- .../scala/org/apache/spark/scheduler/DAGSchedulerSource.scala | 2 +- .../scala/org/apache/spark/storage/BlockManagerSource.scala | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala index dc7e1a8e5468d..0ed52cfe9df61 100644 --- a/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala +++ b/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala @@ -37,7 +37,7 @@ private[spark] class ExecutorSource(val executor: Executor, executorId: String) val metricRegistry = new MetricRegistry() // TODO: It would be nice to pass the application name here - val sourceName = "executor" + val sourceName = "executor.%s".format(executorId) // Gauge for executor thread pool's actively executing task counts metricRegistry.register(MetricRegistry.name("threadpool", "activeTasks"), new Gauge[Int] { diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala index 84b2a6174407a..5878e733908f5 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala @@ -25,7 +25,7 @@ import org.apache.spark.metrics.source.Source private[spark] class DAGSchedulerSource(val dagScheduler: DAGScheduler, sc: SparkContext) extends Source { val metricRegistry = new MetricRegistry() - val sourceName = "DAGScheduler" + val sourceName = "%s.DAGScheduler".format(sc.appName) metricRegistry.register(MetricRegistry.name("stage", "failedStages"), new Gauge[Int] { override def getValue: Int = dagScheduler.failedStages.size diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala 
index 9bd0d8f396388..3f14c40ec61cb 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala @@ -25,7 +25,7 @@ import org.apache.spark.metrics.source.Source private[spark] class BlockManagerSource(val blockManager: BlockManager, sc: SparkContext) extends Source { val metricRegistry = new MetricRegistry() - val sourceName = "BlockManager" + val sourceName = "%s.BlockManager".format(sc.appName) metricRegistry.register(MetricRegistry.name("memory", "maxMem_MB"), new Gauge[Long] { override def getValue: Long = { From 6fc5560846bbd99ebcbc916bbc840cadb8db7849 Mon Sep 17 00:00:00 2001 From: Kousuke Saruta Date: Thu, 4 Sep 2014 00:17:23 +0900 Subject: [PATCH 06/11] Modified sourceName of ExecutorSource, DAGSchedulerSource and BlockManagerSource --- .../main/scala/org/apache/spark/executor/ExecutorSource.scala | 3 +-- .../scala/org/apache/spark/scheduler/DAGSchedulerSource.scala | 2 +- .../scala/org/apache/spark/storage/BlockManagerSource.scala | 2 +- 3 files changed, 3 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala index d6721586566c2..c4d73622c4727 100644 --- a/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala +++ b/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala @@ -37,8 +37,7 @@ private[spark] class ExecutorSource(val executor: Executor, executorId: String) override val metricRegistry = new MetricRegistry() - // TODO: It would be nice to pass the application name here - override val sourceName = "executor.%s".format(executorId) + override val sourceName = "executor" // Gauge for executor thread pool's actively executing task counts metricRegistry.register(MetricRegistry.name("threadpool", "activeTasks"), new Gauge[Int] { diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala 
b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala index 94944399b134a..915d442506083 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala @@ -25,7 +25,7 @@ import org.apache.spark.metrics.source.Source private[spark] class DAGSchedulerSource(val dagScheduler: DAGScheduler, sc: SparkContext) extends Source { override val metricRegistry = new MetricRegistry() - override val sourceName = "%s.DAGScheduler".format(sc.appName) + override val sourceName = "DAGScheduler" metricRegistry.register(MetricRegistry.name("stage", "failedStages"), new Gauge[Int] { override def getValue: Int = dagScheduler.failedStages.size diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala index 49fea6d9e2a76..8d322455d0dba 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala @@ -25,7 +25,7 @@ import org.apache.spark.metrics.source.Source private[spark] class BlockManagerSource(val blockManager: BlockManager, sc: SparkContext) extends Source { override val metricRegistry = new MetricRegistry() - override val sourceName = "%s.BlockManager".format(sc.appName) + override val sourceName = "BlockManager" metricRegistry.register(MetricRegistry.name("memory", "maxMem_MB"), new Gauge[Long] { override def getValue: Long = { From 6f7dcd44bb07da4ee0686cf0b1994c97305ecee3 Mon Sep 17 00:00:00 2001 From: Kousuke Saruta Date: Thu, 4 Sep 2014 01:01:45 +0900 Subject: [PATCH 07/11] Modified constructor of DAGSchedulerSource and BlockManagerSource because the instance of SparkContext is no longer used --- core/src/main/scala/org/apache/spark/SparkContext.scala | 4 ++-- .../scala/org/apache/spark/scheduler/DAGSchedulerSource.scala | 2 +- 
.../scala/org/apache/spark/storage/BlockManagerSource.scala | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index c761f9e3ca3fb..5eaf5be93a37b 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -408,8 +408,8 @@ class SparkContext(config: SparkConf) extends Logging { // Post init taskScheduler.postStartHook() - private val dagSchedulerSource = new DAGSchedulerSource(this.dagScheduler, this) - private val blockManagerSource = new BlockManagerSource(SparkEnv.get.blockManager, this) + private val dagSchedulerSource = new DAGSchedulerSource(this.dagScheduler) + private val blockManagerSource = new BlockManagerSource(SparkEnv.get.blockManager) private def initDriverMetrics() { SparkEnv.get.metricsSystem.registerSource(dagSchedulerSource) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala index 915d442506083..12668b6c0988e 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala @@ -22,7 +22,7 @@ import com.codahale.metrics.{Gauge,MetricRegistry} import org.apache.spark.SparkContext import org.apache.spark.metrics.source.Source -private[spark] class DAGSchedulerSource(val dagScheduler: DAGScheduler, sc: SparkContext) +private[spark] class DAGSchedulerSource(val dagScheduler: DAGScheduler) extends Source { override val metricRegistry = new MetricRegistry() override val sourceName = "DAGScheduler" diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala index 8d322455d0dba..8569c6f3cbbc3 100644 --- 
a/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala @@ -22,7 +22,7 @@ import com.codahale.metrics.{Gauge,MetricRegistry} import org.apache.spark.SparkContext import org.apache.spark.metrics.source.Source -private[spark] class BlockManagerSource(val blockManager: BlockManager, sc: SparkContext) +private[spark] class BlockManagerSource(val blockManager: BlockManager) extends Source { override val metricRegistry = new MetricRegistry() override val sourceName = "BlockManager" From 15f88a390fc78181e88e030f47903c0576b664bd Mon Sep 17 00:00:00 2001 From: Kousuke Saruta Date: Thu, 4 Sep 2014 03:21:11 +0900 Subject: [PATCH 08/11] Modified MetricsSystem#buildRegistryName because conf.get does not return null when corresponding entry is absent --- .../scala/org/apache/spark/metrics/MetricsSystem.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala index 5b496d546b2bf..4cc8a3b4c4d55 100644 --- a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala +++ b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala @@ -96,11 +96,11 @@ private[spark] class MetricsSystem private (val instance: String, } def buildRegistryName(source: Source) = { - val appName = conf.get("spark.unique.app.name") - val executorId = conf.get("spark.executor.id") + val appNameOpt = conf.getOption("spark.unique.app.name") + val executorIdOpt = conf.getOption("spark.executor.id") val registryName = { - if (appName != null && executorId != null) { - MetricRegistry.name(appName, executorId, source.sourceName) + if (appNameOpt.isDefined && executorIdOpt.isDefined) { + MetricRegistry.name(appNameOpt.get, executorIdOpt.get, source.sourceName) } else { MetricRegistry.name(source.sourceName) } From e4a4593fd52da8a15770b8c3322f39b0f1363dbf Mon Sep
17 00:00:00 2001 From: Kousuke Saruta Date: Thu, 4 Sep 2014 15:04:33 +0900 Subject: [PATCH 09/11] tmp --- .../scala/org/apache/spark/SparkContext.scala | 4 +- .../org/apache/spark/executor/Executor.scala | 2 +- .../apache/spark/metrics/MetricsSystem.scala | 4 ++ python/run-tests | 44 +++++++++---------- 4 files changed, 29 insertions(+), 25 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 1f54e2ac66750..431e4fb083857 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -187,7 +187,7 @@ class SparkContext(config: SparkConf) extends Logging { val master = conf.get("spark.master") val appName = conf.get("spark.app.name") val uniqueAppName = appName + "-" + System.currentTimeMillis() - conf.set("spark.unique.app.name", uniqueAppName) +// conf.set("spark.unique.app.name", uniqueAppName) // Generate the random name for a temp folder in Tachyon // Add a timestamp as the suffix here to make it more safe @@ -202,7 +202,7 @@ class SparkContext(config: SparkConf) extends Logging { private[spark] val listenerBus = new LiveListenerBus // Create the Spark execution environment (cache, map output tracker, etc) - conf.set("spark.executor.id", "driver") +// conf.set("spark.executor.id", "driver") private[spark] val env = SparkEnv.create( conf, "", diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 29724810b4275..958e818def0f3 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -73,7 +73,7 @@ private[spark] class Executor( val executorSource = new ExecutorSource(this, executorId) // Initialize Spark environment (using system properties read above) - conf.set("spark.executor.id", "executor." 
+ executorId) +// conf.set("spark.executor.id", "executor." + executorId) private val env = { if (!isLocal) { val _env = SparkEnv.create(conf, executorId, slaveHostname, 0, diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala index 4cc8a3b4c4d55..0402c1bcbd815 100644 --- a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala +++ b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala @@ -96,6 +96,8 @@ private[spark] class MetricsSystem private (val instance: String, } def buildRegistryName(source: Source) = { + new Exception().printStackTrace() +/* val appNameOpt = conf.getOption("spark.unique.app.name") val executorIdOpt = conf.getOption("spark.executor.id") val registryName = { @@ -106,6 +108,8 @@ private[spark] class MetricsSystem private (val instance: String, } } registryName + */ + "hoge" } def registerSource(source: Source) { diff --git a/python/run-tests b/python/run-tests index 7b1ee3e1cddba..2c9025ac72f56 100755 --- a/python/run-tests +++ b/python/run-tests @@ -49,37 +49,37 @@ function run_test() { echo "Running PySpark tests. Output is in python/unit-tests.log." # Try to test with Python 2.6, since that's the minimum version that we support: -if [ $(which python2.6) ]; then - export PYSPARK_PYTHON="python2.6" -fi +#if [ $(which python2.6) ]; then +# export PYSPARK_PYTHON="python2.6" +#fi echo "Testing with Python version:" $PYSPARK_PYTHON --version -run_test "pyspark/rdd.py" -run_test "pyspark/context.py" -run_test "pyspark/conf.py" -run_test "pyspark/sql.py" +#run_test "pyspark/rdd.py" +#run_test "pyspark/context.py" +#run_test "pyspark/conf.py" +#run_test "pyspark/sql.py" # These tests are included in the module-level docs, and so must # be handled on a higher level rather than within the python file. 
export PYSPARK_DOC_TEST=1 -run_test "pyspark/broadcast.py" -run_test "pyspark/accumulators.py" -run_test "pyspark/serializers.py" +#run_test "pyspark/broadcast.py" +#run_test "pyspark/accumulators.py" +#run_test "pyspark/serializers.py" unset PYSPARK_DOC_TEST -run_test "pyspark/shuffle.py" +#run_test "pyspark/shuffle.py" run_test "pyspark/tests.py" -run_test "pyspark/mllib/_common.py" -run_test "pyspark/mllib/classification.py" -run_test "pyspark/mllib/clustering.py" -run_test "pyspark/mllib/linalg.py" -run_test "pyspark/mllib/random.py" -run_test "pyspark/mllib/recommendation.py" -run_test "pyspark/mllib/regression.py" -run_test "pyspark/mllib/stat.py" -run_test "pyspark/mllib/tests.py" -run_test "pyspark/mllib/tree.py" -run_test "pyspark/mllib/util.py" +#run_test "pyspark/mllib/_common.py" +#run_test "pyspark/mllib/classification.py" +#run_test "pyspark/mllib/clustering.py" +#run_test "pyspark/mllib/linalg.py" +#run_test "pyspark/mllib/random.py" +#run_test "pyspark/mllib/recommendation.py" +#run_test "pyspark/mllib/regression.py" +#run_test "pyspark/mllib/stat.py" +#run_test "pyspark/mllib/tests.py" +#run_test "pyspark/mllib/tree.py" +#run_test "pyspark/mllib/util.py" if [[ $FAILED == 0 ]]; then echo -en "\033[32m" # Green From 08e627e12a04de57a9c7eaae9deaa4bbb97688a1 Mon Sep 17 00:00:00 2001 From: Kousuke Saruta Date: Mon, 15 Sep 2014 14:04:45 +0900 Subject: [PATCH 10/11] Revert "tmp" This reverts commit e4a4593fd52da8a15770b8c3322f39b0f1363dbf. 
--- .../scala/org/apache/spark/SparkContext.scala | 4 +- .../org/apache/spark/executor/Executor.scala | 2 +- .../apache/spark/metrics/MetricsSystem.scala | 4 -- python/run-tests | 44 +++++++++---------- 4 files changed, 25 insertions(+), 29 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index c70c5bc6a673a..9f34e103efcd5 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -187,7 +187,7 @@ class SparkContext(config: SparkConf) extends Logging { val master = conf.get("spark.master") val appName = conf.get("spark.app.name") val uniqueAppName = appName + "-" + System.currentTimeMillis() -// conf.set("spark.unique.app.name", uniqueAppName) + conf.set("spark.unique.app.name", uniqueAppName) // Generate the random name for a temp folder in Tachyon // Add a timestamp as the suffix here to make it more safe @@ -202,7 +202,7 @@ class SparkContext(config: SparkConf) extends Logging { private[spark] val listenerBus = new LiveListenerBus // Create the Spark execution environment (cache, map output tracker, etc) -// conf.set("spark.executor.id", "driver") + conf.set("spark.executor.id", "driver") private[spark] val env = SparkEnv.create( conf, "", diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 96bde705d2e8f..d9c6a7eac8524 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -73,7 +73,7 @@ private[spark] class Executor( val executorSource = new ExecutorSource(this, executorId) // Initialize Spark environment (using system properties read above) -// conf.set("spark.executor.id", "executor." + executorId) + conf.set("spark.executor.id", "executor." 
+ executorId) private val env = { if (!isLocal) { val _env = SparkEnv.create(conf, executorId, slaveHostname, 0, diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala index 0402c1bcbd815..4cc8a3b4c4d55 100644 --- a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala +++ b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala @@ -96,8 +96,6 @@ private[spark] class MetricsSystem private (val instance: String, } def buildRegistryName(source: Source) = { - new Exception().printStackTrace() -/* val appNameOpt = conf.getOption("spark.unique.app.name") val executorIdOpt = conf.getOption("spark.executor.id") val registryName = { @@ -108,8 +106,6 @@ private[spark] class MetricsSystem private (val instance: String, } } registryName - */ - "hoge" } def registerSource(source: Source) { diff --git a/python/run-tests b/python/run-tests index 5c19565a464e2..a67e5a99fbdcc 100755 --- a/python/run-tests +++ b/python/run-tests @@ -53,37 +53,37 @@ echo "Running PySpark tests. Output is in python/unit-tests.log." export PYSPARK_PYTHON="python" # Try to test with Python 2.6, since that's the minimum version that we support: -#if [ $(which python2.6) ]; then -# export PYSPARK_PYTHON="python2.6" -#fi +if [ $(which python2.6) ]; then + export PYSPARK_PYTHON="python2.6" +fi echo "Testing with Python version:" $PYSPARK_PYTHON --version -#run_test "pyspark/rdd.py" -#run_test "pyspark/context.py" -#run_test "pyspark/conf.py" -#run_test "pyspark/sql.py" +run_test "pyspark/rdd.py" +run_test "pyspark/context.py" +run_test "pyspark/conf.py" +run_test "pyspark/sql.py" # These tests are included in the module-level docs, and so must # be handled on a higher level rather than within the python file. 
export PYSPARK_DOC_TEST=1 -#run_test "pyspark/broadcast.py" -#run_test "pyspark/accumulators.py" -#run_test "pyspark/serializers.py" +run_test "pyspark/broadcast.py" +run_test "pyspark/accumulators.py" +run_test "pyspark/serializers.py" unset PYSPARK_DOC_TEST -#run_test "pyspark/shuffle.py" +run_test "pyspark/shuffle.py" run_test "pyspark/tests.py" -#run_test "pyspark/mllib/_common.py" -#run_test "pyspark/mllib/classification.py" -#run_test "pyspark/mllib/clustering.py" -#run_test "pyspark/mllib/linalg.py" -#run_test "pyspark/mllib/random.py" -#run_test "pyspark/mllib/recommendation.py" -#run_test "pyspark/mllib/regression.py" -#run_test "pyspark/mllib/stat.py" -#run_test "pyspark/mllib/tests.py" -#run_test "pyspark/mllib/tree.py" -#run_test "pyspark/mllib/util.py" +run_test "pyspark/mllib/_common.py" +run_test "pyspark/mllib/classification.py" +run_test "pyspark/mllib/clustering.py" +run_test "pyspark/mllib/linalg.py" +run_test "pyspark/mllib/random.py" +run_test "pyspark/mllib/recommendation.py" +run_test "pyspark/mllib/regression.py" +run_test "pyspark/mllib/stat.py" +run_test "pyspark/mllib/tests.py" +run_test "pyspark/mllib/tree.py" +run_test "pyspark/mllib/util.py" # Try to test with PyPy if [ $(which pypy) ]; then From cfe8027b2a47dd328d2f2f14ee6cb6a167d6a34d Mon Sep 17 00:00:00 2001 From: Kousuke Saruta Date: Thu, 18 Sep 2014 02:59:35 +0900 Subject: [PATCH 11/11] Use application id for metrics name instead of System.currentTimeMillis when using YARN cluster mode --- core/src/main/scala/org/apache/spark/SparkContext.scala | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 90894ad717b79..ac782f9d9d0ea 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -186,7 +186,11 @@ class SparkContext(config: SparkConf) extends Logging { val master =
conf.get("spark.master") val appName = conf.get("spark.app.name") - val uniqueAppName = appName + "-" + System.currentTimeMillis() + + // TODO Get Application ID by common way for all master type as well as YARN cluster mode + val appId = conf.getOption("spark.yarn.app.id").getOrElse(System.currentTimeMillis().toString) + + val uniqueAppName = appId + "." + appName conf.set("spark.unique.app.name", uniqueAppName) // Generate the random name for a temp folder in Tachyon