From fd9da51251de69d1ae776eaad6d666ff7970692a Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Tue, 22 Jul 2014 14:10:49 -0700 Subject: [PATCH 01/15] Formatting changes (minor) --- .../main/scala/org/apache/spark/deploy/Client.scala | 2 +- .../org/apache/spark/deploy/worker/CommandUtils.scala | 4 ++-- .../apache/spark/deploy/worker/ExecutorRunner.scala | 11 +++++++---- .../spark/executor/CoarseGrainedExecutorBackend.scala | 9 +++++++-- 4 files changed, 17 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala index c371dc3a51c73..d45afee140564 100644 --- a/core/src/main/scala/org/apache/spark/deploy/Client.scala +++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala @@ -51,7 +51,7 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) extends // truncate filesystem paths similar to what YARN does. For now, we just require // people call `addJar` assuming the jar is in the same directory. val env = Map[String, String]() - System.getenv().foreach{case (k, v) => env(k) = v} + System.getenv().foreach { case (k, v) => env(k) = v } val mainClass = "org.apache.spark.deploy.worker.DriverWrapper" diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala b/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala index 4af5bc3afad6c..63e61744218d8 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala @@ -62,7 +62,7 @@ object CommandUtils extends Logging { val joined = command.libraryPathEntries.mkString(File.pathSeparator) Seq(s"-Djava.library.path=$joined") } else { - Seq() + Seq() } val permGenOpt = Seq("-XX:MaxPermSize=128m") @@ -71,7 +71,7 @@ object CommandUtils extends Logging { val ext = if (System.getProperty("os.name").startsWith("Windows")) ".cmd" else ".sh" val classPath = Utils.executeAndGetOutput( Seq(sparkHome + "/bin/compute-classpath" + ext), - extraEnvironment=command.environment) + extraEnvironment = command.environment) val userClassPath = command.classPathEntries ++ Seq(classPath) Seq("-cp", userClassPath.filterNot(_.isEmpty).mkString(File.pathSeparator)) ++ diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala index 467317dd9b44c..70cc712e8146c 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala @@ -72,7 +72,7 @@ private[spark] class ExecutorRunner( } /** - * kill executor process, wait for exit and notify worker to update resource status + * Kill executor process, wait for exit and notify worker to update resource status. * * @param message the exception message which caused the executor's death */ @@ -114,9 +114,12 @@ private[spark] class ExecutorRunner( } def getCommandSeq = { - val command = Command(appDesc.command.mainClass, - appDesc.command.arguments.map(substituteVariables) ++ Seq(appId), appDesc.command.environment, - appDesc.command.classPathEntries, appDesc.command.libraryPathEntries, + val command = Command( + appDesc.command.mainClass, + appDesc.command.arguments.map(substituteVariables) ++ Seq(appId), + appDesc.command.environment, + appDesc.command.classPathEntries, + appDesc.command.libraryPathEntries, appDesc.command.extraJavaOptions) CommandUtils.buildCommandSeq(command, memory, sparkHome.getAbsolutePath) } diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index b455c9fcf4bd6..860b47e056451 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -98,8 +98,13 @@ private[spark] class CoarseGrainedExecutorBackend( } private[spark] object CoarseGrainedExecutorBackend extends Logging { - def run(driverUrl: String, executorId: String, hostname: String, cores: Int, - workerUrl: Option[String]) { + + private def run( + driverUrl: String, + executorId: String, + hostname: String, + cores: Int, + workerUrl: Option[String]) { SignalLogger.register(log) From 855256edaacaaf3b1fafc5bcb2ba12243184c134 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Tue, 22 Jul 2014 15:41:42 -0700 Subject: [PATCH 02/15] Fix standalone-cluster mode The problem was that spark properties are not propagated to the driver. The solution is simple: pass the properties as part of the driver description, such that the command that launches the driver automatically sets the spark properties as its java system properties, which will then be loaded by SparkConf. --- core/src/main/scala/org/apache/spark/deploy/Client.scala | 8 +++----- core/src/main/scala/org/apache/spark/deploy/Command.scala | 1 + .../main/scala/org/apache/spark/deploy/SparkSubmit.scala | 2 -- .../scala/org/apache/spark/deploy/client/TestClient.scala | 4 ++-- .../org/apache/spark/deploy/worker/CommandUtils.scala | 5 ++++- .../org/apache/spark/deploy/worker/DriverRunner.scala | 1 + .../org/apache/spark/deploy/worker/ExecutorRunner.scala | 1 + .../scheduler/cluster/SparkDeploySchedulerBackend.scala | 5 ++--- .../scala/org/apache/spark/deploy/JsonProtocolSuite.scala | 4 ++-- .../org/apache/spark/deploy/worker/DriverRunnerTest.scala | 2 +- .../apache/spark/deploy/worker/ExecutorRunnerTest.scala | 2 +- 11 files changed, 18 insertions(+), 17 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala index d45afee140564..bdf7c3a7661b5 100644 --- a/core/src/main/scala/org/apache/spark/deploy/Client.scala +++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala @@ -17,8 +17,6 @@ package org.apache.spark.deploy -import scala.collection.JavaConversions._ -import scala.collection.mutable.Map import scala.concurrent._ import akka.actor._ @@ -50,8 +48,8 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) extends // TODO: We could add an env variable here and intercept it in `sc.addJar` that would // truncate filesystem paths similar to what YARN does. For now, we just require // people call `addJar` assuming the jar is in the same directory. - val env = Map[String, String]() - System.getenv().foreach { case (k, v) => env(k) = v } + val env = sys.env + val props = conf.getAll.toMap val mainClass = "org.apache.spark.deploy.worker.DriverWrapper" @@ -68,7 +66,7 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) extends val javaOptionsConf = "spark.driver.extraJavaOptions" val javaOpts = sys.props.get(javaOptionsConf) val command = new Command(mainClass, Seq("{{WORKER_URL}}", driverArgs.mainClass) ++ - driverArgs.driverOptions, env, classPathEntries, libraryPathEntries, javaOpts) + driverArgs.driverOptions, env, props, classPathEntries, libraryPathEntries, javaOpts) val driverDescription = new DriverDescription( driverArgs.jarUrl, diff --git a/core/src/main/scala/org/apache/spark/deploy/Command.scala b/core/src/main/scala/org/apache/spark/deploy/Command.scala index 32f3ba385084f..c36c496a90a02 100644 --- a/core/src/main/scala/org/apache/spark/deploy/Command.scala +++ b/core/src/main/scala/org/apache/spark/deploy/Command.scala @@ -23,6 +23,7 @@ private[spark] case class Command( mainClass: String, arguments: Seq[String], environment: Map[String, String], + sparkProps: Map[String, String], classPathEntries: Seq[String], libraryPathEntries: Seq[String], extraJavaOptions: Option[String] = None) { diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 3d8373d8175ee..76ac3a5ab92dc 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -132,8 +132,6 @@ object SparkSubmit { (clusterManager, deployMode) match { case (MESOS, CLUSTER) => printErrorAndExit("Cluster deploy mode is currently not supported for Mesos clusters.") - case (STANDALONE, CLUSTER) => - printErrorAndExit("Cluster deploy mode is currently not supported for Standalone clusters.") case (_, CLUSTER) if args.isPython => printErrorAndExit("Cluster deploy mode is currently not supported for python applications.") case (_, CLUSTER) if isShell(args.primaryResource) => diff --git a/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala index e15a87bd38fda..b929746e4ad7d 100644 --- a/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala +++ b/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala @@ -49,8 +49,8 @@ private[spark] object TestClient { val (actorSystem, port) = AkkaUtils.createActorSystem("spark", Utils.localIpAddress, 0, conf = conf, securityManager = new SecurityManager(conf)) val desc = new ApplicationDescription( - "TestClient", Some(1), 512, Command("spark.deploy.client.TestExecutor", Seq(), Map(), Seq(), - Seq()), Some("dummy-spark-home"), "ignored") + "TestClient", Some(1), 512, Command("spark.deploy.client.TestExecutor", Seq(), Map(), Map(), + Seq(), Seq()), Some("dummy-spark-home"), "ignored") val listener = new TestListener val client = new AppClient(actorSystem, Array(url), desc, listener, new SparkConf) client.start() diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala b/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala index 63e61744218d8..7d12d1eabd3cd 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala @@ -67,6 +67,9 @@ object CommandUtils extends Logging { val permGenOpt = Seq("-XX:MaxPermSize=128m") + // Convert Spark properties to java system properties + val sparkOpts = command.sparkProps.map { case (k, v) => s"-D$k=$v" } + // Figure out our classpath with the external compute-classpath script val ext = if (System.getProperty("os.name").startsWith("Windows")) ".cmd" else ".sh" val classPath = Utils.executeAndGetOutput( @@ -75,7 +78,7 @@ object CommandUtils extends Logging { val userClassPath = command.classPathEntries ++ Seq(classPath) Seq("-cp", userClassPath.filterNot(_.isEmpty).mkString(File.pathSeparator)) ++ - permGenOpt ++ libraryOpts ++ extraOpts ++ workerLocalOpts ++ memoryOpts + sparkOpts ++ permGenOpt ++ libraryOpts ++ extraOpts ++ workerLocalOpts ++ memoryOpts } /** Spawn a thread that will redirect a given stream to a file */ diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala index 662d37871e7a6..d6fe1100b806a 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala @@ -79,6 +79,7 @@ private[spark] class DriverRunner( driverDesc.command.mainClass, driverDesc.command.arguments.map(substituteVariables), driverDesc.command.environment, + driverDesc.command.sparkProps, classPath, driverDesc.command.libraryPathEntries, driverDesc.command.extraJavaOptions) diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala index 70cc712e8146c..0ebf718430e55 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala @@ -118,6 +118,7 @@ private[spark] class ExecutorRunner( appDesc.command.mainClass, appDesc.command.arguments.map(substituteVariables) ++ Seq(appId), appDesc.command.environment, + appDesc.command.sparkProps, appDesc.command.classPathEntries, appDesc.command.libraryPathEntries, appDesc.command.extraJavaOptions) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala index bf2dc88e29048..519f4e1a2c9d1 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala @@ -54,9 +54,8 @@ private[spark] class SparkDeploySchedulerBackend( cp.split(java.io.File.pathSeparator) } - val command = Command( - "org.apache.spark.executor.CoarseGrainedExecutorBackend", args, sc.executorEnvs, - classPathEntries, libraryPathEntries, extraJavaOpts) + val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend", + args, sc.executorEnvs, conf.getAll.toMap, classPathEntries, libraryPathEntries, extraJavaOpts) val sparkHome = sc.getSparkHome() val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command, sparkHome, sc.ui.appUIAddress, sc.eventLogger.map(_.logDir)) diff --git a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala index 01ab2d549325c..dec076c358cd4 100644 --- a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala @@ -88,7 +88,7 @@ class JsonProtocolSuite extends FunSuite { } def createAppDesc(): ApplicationDescription = { - val cmd = new Command("mainClass", List("arg1", "arg2"), Map(), Seq(), Seq()) + val cmd = new Command("mainClass", List("arg1", "arg2"), Map(), Map(), Seq(), Seq()) new ApplicationDescription("name", Some(4), 1234, cmd, Some("sparkHome"), "appUiUrl") } @@ -101,7 +101,7 @@ class JsonProtocolSuite extends FunSuite { def createDriverCommand() = new Command( "org.apache.spark.FakeClass", Seq("some arg --and-some options -g foo"), - Map(("K1", "V1"), ("K2", "V2")), Seq("cp1", "cp2"), Seq("lp1", "lp2"), Some("-Dfoo") + Map(("K1", "V1"), ("K2", "V2")), Map(), Seq("cp1", "cp2"), Seq("lp1", "lp2"), Some("-Dfoo") ) def createDriverDesc() = new DriverDescription("hdfs://some-dir/some.jar", 100, 3, diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala b/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala index 4633bc3f7f25e..9f1e98dcbb2fb 100644 --- a/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala +++ b/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala @@ -29,7 +29,7 @@ import org.apache.spark.deploy.{Command, DriverDescription} class DriverRunnerTest extends FunSuite { private def createDriverRunner() = { - val command = new Command("mainClass", Seq(), Map(), Seq(), Seq()) + val command = new Command("mainClass", Seq(), Map(), Map(), Seq(), Seq()) val driverDescription = new DriverDescription("jarUrl", 512, 1, true, command) new DriverRunner("driverId", new File("workDir"), new File("sparkHome"), driverDescription, null, "akka://1.2.3.4/worker/") diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala index e5f748d55500d..66364442ce99d 100644 --- a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala +++ b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala @@ -29,7 +29,7 @@ class ExecutorRunnerTest extends FunSuite { def f(s:String) = new File(s) val sparkHome = sys.env.get("SPARK_HOME").orElse(sys.props.get("spark.home")) val appDesc = new ApplicationDescription("app name", Some(8), 500, - Command("foo", Seq(), Map(), Seq(), Seq()), + Command("foo", Seq(), Map(), Map(), Seq(), Seq()), sparkHome, "appUiUrl") val appId = "12345-worker321-9876" val er = new ExecutorRunner(appId, 1, appDesc, 8, 500, null, "blah", "worker321", f(sparkHome.getOrElse(".")), From 7f854bc0cf83e04cce9314913547b21dbd203da1 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Tue, 22 Jul 2014 18:51:53 -0700 Subject: [PATCH 03/15] Fix test --- .../test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala index dec076c358cd4..6b14bfadf8704 100644 --- a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala @@ -170,7 +170,7 @@ object JsonConstants { """ |{"name":"name","cores":4,"memoryperslave":1234, |"user":"%s","sparkhome":"sparkHome", - |"command":"Command(mainClass,List(arg1, arg2),Map(),List(),List(),None)"} + |"command":"Command(mainClass,List(arg1, arg2),Map(),Map(),List(),List(),None)"} """.format(System.getProperty("user.name", "")).stripMargin val executorRunnerJsonStr = From 6ceb14fb6abb28ca080eb95e936006c0d7746ba7 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Tue, 22 Jul 2014 19:03:57 -0700 Subject: [PATCH 04/15] Allow relevant configs to propagate to standalone Driver --- .../main/scala/org/apache/spark/deploy/Client.scala | 1 + .../scala/org/apache/spark/deploy/SparkSubmit.scala | 10 +++++----- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala index bdf7c3a7661b5..1d29afef17197 100644 --- a/core/src/main/scala/org/apache/spark/deploy/Client.scala +++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala @@ -107,6 +107,7 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) extends // Exception, if present statusResponse.exception.map { e => println(s"Exception from cluster was: $e") + e.printStackTrace() System.exit(-1) } System.exit(0) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 76ac3a5ab92dc..11aec9ffa1d70 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -164,9 +164,9 @@ object SparkSubmit { val options = List[OptionAssigner]( // All cluster managers - OptionAssigner(args.master, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.master"), - OptionAssigner(args.name, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.app.name"), - OptionAssigner(args.jars, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.jars"), + OptionAssigner(args.master, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, sysProp = "spark.master"), + OptionAssigner(args.name, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, sysProp = "spark.app.name"), + OptionAssigner(args.jars, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, sysProp = "spark.jars"), // Standalone cluster only OptionAssigner(args.driverMemory, STANDALONE, CLUSTER, clOption = "--memory"), @@ -197,9 +197,9 @@ object SparkSubmit { sysProp = "spark.driver.extraJavaOptions"), OptionAssigner(args.driverExtraLibraryPath, STANDALONE | YARN, CLUSTER, sysProp = "spark.driver.extraLibraryPath"), - OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN, CLIENT, + OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN, ALL_DEPLOY_MODES, sysProp = "spark.executor.memory"), - OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS, CLIENT, + OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS, ALL_DEPLOY_MODES, sysProp = "spark.cores.max"), OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, ALL_DEPLOY_MODES, sysProp = "spark.files") From d7e27285ab961a2be71504c41d14c397def6817f Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Tue, 22 Jul 2014 19:06:15 -0700 Subject: [PATCH 05/15] Avoid deprecation warning in standalone Client --- core/src/main/scala/org/apache/spark/deploy/Client.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala index 1d29afef17197..6dfd57f30c454 100644 --- a/core/src/main/scala/org/apache/spark/deploy/Client.scala +++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala @@ -140,8 +140,10 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) extends */ object Client { def main(args: Array[String]) { - println("WARNING: This client is deprecated and will be removed in a future version of Spark.") - println("Use ./bin/spark-submit with \"--master spark://host:port\"") + if (!sys.props.contains("SPARK_SUBMIT")) { + println("WARNING: This client is deprecated and will be removed in a future version of Spark.") + println("Use ./bin/spark-submit with \"--master spark://host:port\"") + } val conf = new SparkConf() val driverArgs = new ClientArguments(args) From c141a007bcdaa469a3394baa2571677231bf2056 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Tue, 22 Jul 2014 19:26:56 -0700 Subject: [PATCH 06/15] Don't display "unknown app" on driver log pages --- .../org/apache/spark/deploy/worker/DriverRunner.scala | 1 + .../apache/spark/deploy/worker/ExecutorRunner.scala | 1 + .../org/apache/spark/deploy/worker/ui/LogPage.scala | 11 +++++------ 3 files changed, 7 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala index d6fe1100b806a..0299b3e44d554 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala @@ -36,6 +36,7 @@ import org.apache.spark.deploy.master.DriverState.DriverState /** * Manages the execution of one driver, including automatically restarting the driver on failure. + * This is currently only used in standalone cluster deploy mode. */ private[spark] class DriverRunner( val driverId: String, diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala index 0ebf718430e55..5e15e3b1b0691 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala @@ -30,6 +30,7 @@ import org.apache.spark.util.logging.FileAppender /** * Manages the execution of one executor process. + * This is currently only used in standalone mode. */ private[spark] class ExecutorRunner( val appId: String, diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala index b389cb546de6c..ecb358c399819 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala @@ -17,7 +17,6 @@ package org.apache.spark.deploy.worker.ui -import java.io.File import javax.servlet.http.HttpServletRequest import scala.xml.Node @@ -25,7 +24,7 @@ import scala.xml.Node import org.apache.spark.ui.{WebUIPage, UIUtils} import org.apache.spark.util.Utils import org.apache.spark.Logging -import org.apache.spark.util.logging.{FileAppender, RollingFileAppender} +import org.apache.spark.util.logging.RollingFileAppender private[spark] class LogPage(parent: WorkerWebUI) extends WebUIPage("logPage") with Logging { private val worker = parent.worker @@ -64,11 +63,11 @@ private[spark] class LogPage(parent: WorkerWebUI) extends WebUIPage("logPage") w val offset = Option(request.getParameter("offset")).map(_.toLong) val byteLength = Option(request.getParameter("byteLength")).map(_.toInt).getOrElse(defaultBytes) - val (logDir, params) = (appId, executorId, driverId) match { + val (logDir, params, pageName) = (appId, executorId, driverId) match { case (Some(a), Some(e), None) => - (s"${workDir.getPath}/$a/$e/", s"appId=$a&executorId=$e") + (s"${workDir.getPath}/$a/$e/", s"appId=$a&executorId=$e", s"$a/$e") case (None, None, Some(d)) => - (s"${workDir.getPath}/$d/", s"driverId=$d") + (s"${workDir.getPath}/$d/", s"driverId=$d", d) case _ => throw new Exception("Request must specify either application or driver identifiers") } @@ -120,7 +119,7 @@ private[spark] class LogPage(parent: WorkerWebUI) extends WebUIPage("logPage") w - UIUtils.basicSparkPage(content, logType + " log page for " + appId.getOrElse("unknown app")) + UIUtils.basicSparkPage(content, logType + " log page for " + pageName) } /** Get the part of the log files given the offset and desired length of bytes */ From 5a9c6c738b6956543c8d166a8838115a70c841b1 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Wed, 23 Jul 2014 12:22:07 -0700 Subject: [PATCH 07/15] Fix line too long --- core/src/main/scala/org/apache/spark/deploy/Client.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala index 6dfd57f30c454..f5cb23dc8894f 100644 --- a/core/src/main/scala/org/apache/spark/deploy/Client.scala +++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala @@ -141,7 +141,7 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) extends object Client { def main(args: Array[String]) { if (!sys.props.contains("SPARK_SUBMIT")) { - println("WARNING: This client is deprecated and will be removed in a future version of Spark.") + println("WARNING: This client is deprecated and will be removed in a future version of Spark") println("Use ./bin/spark-submit with \"--master spark://host:port\"") } From 78752f80070d17d1044def220e6f857519a5114e Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Wed, 23 Jul 2014 16:04:24 -0700 Subject: [PATCH 08/15] Fix tests --- .../test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala index 565c53e9529ff..21c0f3c596a11 100644 --- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala @@ -195,7 +195,9 @@ class SparkSubmitSuite extends FunSuite with Matchers { childArgsStr should include regex ("launch spark://h:p .*thejar.jar org.SomeClass arg1 arg2") mainClass should be ("org.apache.spark.deploy.Client") classpath should have size (0) - sysProps should have size (2) + sysProps should have size (4) + sysProps.keys should contain ("spark.master") + sysProps.keys should contain ("spark.app.name") sysProps.keys should contain ("spark.jars") sysProps.keys should contain ("SPARK_SUBMIT") } From 79f63a3884a5c9479cf4c15abd0a4950743da16f Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Thu, 24 Jul 2014 16:45:13 -0700 Subject: [PATCH 09/15] Move sparkProps into javaOpts --- .../main/scala/org/apache/spark/deploy/Client.scala | 11 +++++------ .../main/scala/org/apache/spark/deploy/Command.scala | 3 +-- .../org/apache/spark/deploy/client/TestClient.scala | 6 +++--- .../org/apache/spark/deploy/worker/CommandUtils.scala | 6 +----- .../org/apache/spark/deploy/worker/DriverRunner.scala | 3 +-- .../apache/spark/deploy/worker/ExecutorRunner.scala | 3 +-- .../cluster/SparkDeploySchedulerBackend.scala | 9 +++++++-- core/src/main/scala/org/apache/spark/util/Utils.scala | 9 +++++++++ .../org/apache/spark/deploy/JsonProtocolSuite.scala | 4 ++-- .../apache/spark/deploy/worker/DriverRunnerTest.scala | 2 +- .../spark/deploy/worker/ExecutorRunnerTest.scala | 2 +- 11 files changed, 32 insertions(+), 26 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala index f5cb23dc8894f..cec652de4a200 100644 --- a/core/src/main/scala/org/apache/spark/deploy/Client.scala +++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala @@ -48,9 +48,6 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) extends // TODO: We could add an env variable here and intercept it in `sc.addJar` that would // truncate filesystem paths similar to what YARN does. For now, we just require // people call `addJar` assuming the jar is in the same directory. - val env = sys.env - val props = conf.getAll.toMap - val mainClass = "org.apache.spark.deploy.worker.DriverWrapper" val classPathConf = "spark.driver.extraClassPath" @@ -63,10 +60,12 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) extends cp.split(java.io.File.pathSeparator) } - val javaOptionsConf = "spark.driver.extraJavaOptions" - val javaOpts = sys.props.get(javaOptionsConf) + val extraJavaOptsConf = "spark.driver.extraJavaOptions" + val extraJavaOpts = sys.props.get(extraJavaOptsConf).toSeq + val sparkJavaOpts = Utils.sparkJavaOpts(conf) + val javaOpts = sparkJavaOpts ++ extraJavaOpts val command = new Command(mainClass, Seq("{{WORKER_URL}}", driverArgs.mainClass) ++ - driverArgs.driverOptions, env, props, classPathEntries, libraryPathEntries, javaOpts) + driverArgs.driverOptions, sys.env, classPathEntries, libraryPathEntries, javaOpts) val driverDescription = new DriverDescription( driverArgs.jarUrl, diff --git a/core/src/main/scala/org/apache/spark/deploy/Command.scala b/core/src/main/scala/org/apache/spark/deploy/Command.scala index c36c496a90a02..a2b263544c6a2 100644 --- a/core/src/main/scala/org/apache/spark/deploy/Command.scala +++ b/core/src/main/scala/org/apache/spark/deploy/Command.scala @@ -23,8 +23,7 @@ private[spark] case class Command( mainClass: String, arguments: Seq[String], environment: Map[String, String], - sparkProps: Map[String, String], classPathEntries: Seq[String], libraryPathEntries: Seq[String], - extraJavaOptions: Option[String] = None) { + javaOpts: Seq[String]) { } diff --git a/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala index b929746e4ad7d..b8ffa9afb69cb 100644 --- a/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala +++ b/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala @@ -46,11 +46,11 @@ private[spark] object TestClient { def main(args: Array[String]) { val url = args(0) val conf = new SparkConf - val (actorSystem, port) = AkkaUtils.createActorSystem("spark", Utils.localIpAddress, 0, + val (actorSystem, _) = AkkaUtils.createActorSystem("spark", Utils.localIpAddress, 0, conf = conf, securityManager = new SecurityManager(conf)) val desc = new ApplicationDescription( - "TestClient", Some(1), 512, Command("spark.deploy.client.TestExecutor", Seq(), Map(), Map(), - Seq(), Seq()), Some("dummy-spark-home"), "ignored") + "TestClient", Some(1), 512, Command("spark.deploy.client.TestExecutor", Seq(), Map(), + Seq(), Seq(), Seq()), Some("dummy-spark-home"), "ignored") val listener = new TestListener val client = new AppClient(actorSystem, Array(url), desc, listener, new SparkConf) client.start() diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala b/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala index 7d12d1eabd3cd..687e492a0d6fc 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala @@ -47,7 +47,6 @@ object CommandUtils extends Logging { */ def buildJavaOpts(command: Command, memory: Int, sparkHome: String): Seq[String] = { val memoryOpts = Seq(s"-Xms${memory}M", s"-Xmx${memory}M") - val extraOpts = command.extraJavaOptions.map(Utils.splitCommandString).getOrElse(Seq()) // Exists for backwards compatibility with older Spark versions val workerLocalOpts = Option(getenv("SPARK_JAVA_OPTS")).map(Utils.splitCommandString) @@ -67,9 +66,6 @@ object CommandUtils extends Logging { val permGenOpt = Seq("-XX:MaxPermSize=128m") - // Convert Spark properties to java system properties - val sparkOpts = command.sparkProps.map { case (k, v) => s"-D$k=$v" } - // Figure out our classpath with the external compute-classpath script val ext = if (System.getProperty("os.name").startsWith("Windows")) ".cmd" else ".sh" val classPath = Utils.executeAndGetOutput( @@ -78,7 +74,7 @@ object CommandUtils extends Logging { val userClassPath = command.classPathEntries ++ Seq(classPath) Seq("-cp", userClassPath.filterNot(_.isEmpty).mkString(File.pathSeparator)) ++ - sparkOpts ++ permGenOpt ++ libraryOpts ++ extraOpts ++ workerLocalOpts ++ memoryOpts + permGenOpt ++ libraryOpts ++ workerLocalOpts ++ command.javaOpts ++ memoryOpts } /** Spawn a thread that will redirect a given stream to a file */ diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala index 0299b3e44d554..5caaf6bea3575 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala @@ -80,10 +80,9 @@ private[spark] class DriverRunner( driverDesc.command.mainClass, driverDesc.command.arguments.map(substituteVariables), driverDesc.command.environment, - driverDesc.command.sparkProps, classPath, driverDesc.command.libraryPathEntries, - driverDesc.command.extraJavaOptions) + driverDesc.command.javaOpts) val command = CommandUtils.buildCommandSeq(newCommand, driverDesc.mem, sparkHome.getAbsolutePath) launchDriver(command, driverDesc.command.environment, driverDir, driverDesc.supervise) diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala index 5e15e3b1b0691..7be89f9aff0f3 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala @@ -119,10 +119,9 @@ private[spark] class ExecutorRunner( appDesc.command.mainClass, appDesc.command.arguments.map(substituteVariables) ++ Seq(appId), appDesc.command.environment, - appDesc.command.sparkProps, appDesc.command.classPathEntries, appDesc.command.libraryPathEntries, - appDesc.command.extraJavaOptions) + appDesc.command.javaOpts) CommandUtils.buildCommandSeq(command, memory, sparkHome.getAbsolutePath) } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala index 519f4e1a2c9d1..d3768ad0e7068 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala @@ -45,7 +45,7 @@ private[spark] class SparkDeploySchedulerBackend( conf.get("spark.driver.host"), conf.get("spark.driver.port"), CoarseGrainedSchedulerBackend.ACTOR_NAME) val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}", "{{WORKER_URL}}") - val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions") + val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions").toSeq val classPathEntries = sc.conf.getOption("spark.executor.extraClassPath").toSeq.flatMap { cp => cp.split(java.io.File.pathSeparator) } @@ -54,8 +54,13 @@ private[spark] class SparkDeploySchedulerBackend( cp.split(java.io.File.pathSeparator) } + // Start executors with a few necessary configs for registering with the scheduler + val sparkJavaOpts = Utils.sparkJavaOpts(conf, (key: String) => + key.startsWith("spark.akka") || key.startsWith("spark.auth") + ) + val javaOpts = sparkJavaOpts ++ extraJavaOpts val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend", - args, sc.executorEnvs, conf.getAll.toMap, classPathEntries, libraryPathEntries, extraJavaOpts) + args, sc.executorEnvs, classPathEntries, libraryPathEntries, javaOpts) val sparkHome = sc.getSparkHome() val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command, sparkHome, sc.ui.appUIAddress, sc.eventLogger.map(_.logDir)) diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 5784e974fbb67..4f030c0a90bd5 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -1306,4 +1306,13 @@ private[spark] object Utils extends Logging { s"$className: $desc\n$st" } + /** + * Convert all spark properties set in the given SparkConf to a sequence of java options. + */ + def sparkJavaOpts(conf: SparkConf, filterKey: (String => Boolean) = _ => true): Seq[String] = { + conf.getAll + .filter { case (k, _) => filterKey(k) } + .map { case (k, v) => "-D" + k + "=\\\"" + v + "\\\"" } + } + } diff --git a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala index 6b14bfadf8704..a484acd971797 100644 --- a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala @@ -88,7 +88,7 @@ class JsonProtocolSuite extends FunSuite { } def createAppDesc(): ApplicationDescription = { - val cmd = new Command("mainClass", List("arg1", "arg2"), Map(), Map(), Seq(), Seq()) + val cmd = new Command("mainClass", List("arg1", "arg2"), Map(), Seq(), Seq(), Seq()) new ApplicationDescription("name", Some(4), 1234, cmd, Some("sparkHome"), "appUiUrl") } @@ -101,7 +101,7 @@ class JsonProtocolSuite extends FunSuite { def createDriverCommand() = new Command( "org.apache.spark.FakeClass", Seq("some arg --and-some options -g foo"), - Map(("K1", "V1"), ("K2", "V2")), Map(), Seq("cp1", "cp2"), Seq("lp1", "lp2"), Some("-Dfoo") + Map(("K1", "V1"), ("K2", "V2")), Seq("cp1", "cp2"), Seq("lp1", "lp2"), Seq("-Dfoo") ) def createDriverDesc() = new DriverDescription("hdfs://some-dir/some.jar", 100, 3, diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala b/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala index 9f1e98dcbb2fb..c930839b47f11 100644 --- a/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala +++ b/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala @@ -29,7 +29,7 @@ import org.apache.spark.deploy.{Command, DriverDescription} class DriverRunnerTest extends FunSuite { private def createDriverRunner() = { - val command = new Command("mainClass", Seq(), Map(), Map(), Seq(), Seq()) + val command = new Command("mainClass", Seq(), Map(), Seq(), Seq(), Seq()) val driverDescription = new DriverDescription("jarUrl", 512, 1, true, command) new DriverRunner("driverId", new File("workDir"), new File("sparkHome"), driverDescription, null, "akka://1.2.3.4/worker/") diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala index 66364442ce99d..ca4d987619c91 100644 --- a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala +++ b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala @@ -29,7 +29,7 @@ class ExecutorRunnerTest extends FunSuite { def f(s:String) = new File(s) val sparkHome = sys.env.get("SPARK_HOME").orElse(sys.props.get("spark.home")) val appDesc = new ApplicationDescription("app name", Some(8), 500, - Command("foo", Seq(), Map(), Map(), Seq(), Seq()), + Command("foo", Seq(), Map(), Seq(), Seq(), Seq()), sparkHome, "appUiUrl") val appId = "12345-worker321-9876" val er = new ExecutorRunner(appId, 1, appDesc, 8, 500, null, "blah", "worker321", f(sparkHome.getOrElse(".")), From b890949f8521114956bcd081153a08eaaa6bf854 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Thu, 24 Jul 2014 17:40:10 -0700 Subject: [PATCH 10/15] Abstract usages of converting spark opts to java opts --- .../scala/org/apache/spark/SparkConf.scala | 23 ++++++++++++++++++- .../cluster/SparkDeploySchedulerBackend.scala | 6 ++--- .../apache/spark/deploy/yarn/ClientBase.scala | 6 ++--- .../deploy/yarn/ExecutorRunnableUtil.scala | 8 ++----- 4 files changed, 29 insertions(+), 14 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala index 8ce4b91cae8ae..a6efa125b6629 100644 --- a/core/src/main/scala/org/apache/spark/SparkConf.scala +++ b/core/src/main/scala/org/apache/spark/SparkConf.scala @@ -40,6 +40,8 @@ import scala.collection.mutable.HashMap */ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging { + import SparkConf._ + /** Create a SparkConf that loads defaults from system properties and the classpath */ def this() = this(true) @@ -198,7 +200,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging { * * E.g. spark.akka.option.x.y.x = "value" */ - getAll.filter {case (k, v) => k.startsWith("akka.")} + getAll.filter { case (k, _) => isAkkaConf(k) } /** Does the configuration contain a given parameter? */ def contains(key: String): Boolean = settings.contains(key) @@ -292,3 +294,22 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging { settings.toArray.sorted.map{case (k, v) => k + "=" + v}.mkString("\n") } } + +private[spark] object SparkConf { + /** + * Return whether the given config is an akka config (e.g. akka.actor.provider). + * Note that this does not include spark-specific akka configs (e.g. spark.akka.timeout). + */ + def isAkkaConf(name: String): Boolean = name.startsWith("akka.") + + /** + * Return whether the given config should be passed to an executor on start-up. + * + * When connecting to the scheduler, the executor backend needs certain akka and authentication + * settings to connect to the scheduler, while the rest of the spark configs can be inherited + * from the driver later. + */ + def isExecutorStartupConf(name: String): Boolean = { + isAkkaConf(name) || name.startsWith("spark.akka") || name.startsWith("spark.auth") + } +} diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala index d3768ad0e7068..1da76f58def3f 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala @@ -17,7 +17,7 @@ package org.apache.spark.scheduler.cluster -import org.apache.spark.{Logging, SparkContext} +import org.apache.spark.{Logging, SparkConf, SparkContext} import org.apache.spark.deploy.{ApplicationDescription, Command} import org.apache.spark.deploy.client.{AppClient, AppClientListener} import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason, SlaveLost, TaskSchedulerImpl} @@ -55,9 +55,7 @@ private[spark] class SparkDeploySchedulerBackend( } // Start executors with a few necessary configs for registering with the scheduler - val sparkJavaOpts = Utils.sparkJavaOpts(conf, (key: String) => - key.startsWith("spark.akka") || key.startsWith("spark.auth") - ) + val sparkJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isExecutorStartupConf) val javaOpts = sparkJavaOpts ++ extraJavaOpts val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend", args, sc.executorEnvs, classPathEntries, libraryPathEntries, javaOpts) diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala index 556f49342977a..4ac5ff5231d02 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala @@ -37,7 +37,9 @@ import org.apache.hadoop.yarn.api.protocolrecords._ import org.apache.hadoop.yarn.api.records._ import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.util.Records + import org.apache.spark.{SparkException, Logging, SparkConf, SparkContext} +import org.apache.spark.util.Utils /** * The entry point (starting in Client#main() and Client#run()) for launching Spark on YARN. The @@ -383,9 +385,7 @@ trait ClientBase extends Logging { // Forward the Spark configuration to the application master / executors. // TODO: it might be nicer to pass these as an internal environment variable rather than // as Java options, due to complications with string parsing of nested quotes. - for ((k, v) <- sparkConf.getAll) { - javaOpts += "-D" + k + "=" + "\\\"" + v + "\\\"" - } + javaOpts ++= Utils.sparkJavaOpts(sparkConf) if (args.amClass == classOf[ApplicationMaster].getName) { sparkConf.getOption("spark.driver.extraJavaOptions") diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala index 4ba7133a959ed..eb22eeeb0098e 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala @@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.util.{ConverterUtils, Records} import org.apache.spark.{Logging, SparkConf} +import org.apache.spark.util.Utils trait ExecutorRunnableUtil extends Logging { @@ -66,12 +67,7 @@ trait ExecutorRunnableUtil extends Logging { // registers with the Scheduler and transfers the spark configs. Since the Executor backend // uses Akka to connect to the scheduler, the akka settings are needed as well as the // authentication settings. - sparkConf.getAll. - filter { case (k, v) => k.startsWith("spark.auth") || k.startsWith("spark.akka") }. - foreach { case (k, v) => javaOpts += "-D" + k + "=" + "\\\"" + v + "\\\"" } - - sparkConf.getAkkaConf. - foreach { case (k, v) => javaOpts += "-D" + k + "=" + "\\\"" + v + "\\\"" } + javaOpts ++= Utils.sparkJavaOpts(sparkConf, SparkConf.isExecutorStartupConf) // Commenting it out for now - so that people can refer to the properties if required. Remove // it once cpuset version is pushed out. From ed0149109064581f1ea029437d6974c08fa27802 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Thu, 24 Jul 2014 18:49:00 -0700 Subject: [PATCH 11/15] Don't go overboard with escaping --- core/src/main/scala/org/apache/spark/util/Utils.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 4f030c0a90bd5..812afd260146e 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -1312,7 +1312,7 @@ private[spark] object Utils extends Logging { def sparkJavaOpts(conf: SparkConf, filterKey: (String => Boolean) = _ => true): Seq[String] = { conf.getAll .filter { case (k, _) => filterKey(k) } - .map { case (k, v) => "-D" + k + "=\\\"" + v + "\\\"" } + .map { case (k, v) => s"-D$k=$v" } } } From 2f2908be16d5284cd99886152f0677e9af128a57 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Thu, 24 Jul 2014 21:55:19 -0700 Subject: [PATCH 12/15] Fix tests --- .../test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala index a484acd971797..093394ad6d142 100644 --- a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala @@ -170,7 +170,7 @@ object JsonConstants { """ |{"name":"name","cores":4,"memoryperslave":1234, |"user":"%s","sparkhome":"sparkHome", - |"command":"Command(mainClass,List(arg1, arg2),Map(),Map(),List(),List(),None)"} + |"command":"Command(mainClass,List(arg1, arg2),Map(),List(),List(),List())"} """.format(System.getProperty("user.name", "")).stripMargin val executorRunnerJsonStr = From 6f64a9bac938b9c47b3408d7fdc34ce8048a61d0 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Mon, 28 Jul 2014 15:11:04 -0700 Subject: [PATCH 13/15] Revert changes in YARN There is currently no good way to handle quoted arguments and backslashes in YARN. The new code does not do any escaping, which is fine for standalone mode (which uses Java's ProcessBuilder) but not for YARN mode. I will open a separate JIRA for this. --- .../scala/org/apache/spark/deploy/yarn/ClientBase.scala | 5 ++++- .../apache/spark/deploy/yarn/ExecutorRunnableUtil.scala | 8 +++++++- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala index cb4cc7b119066..d6c96554c040f 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala @@ -386,7 +386,10 @@ trait ClientBase extends Logging { // Forward the Spark configuration to the application master / executors. // TODO: it might be nicer to pass these as an internal environment variable rather than // as Java options, due to complications with string parsing of nested quotes. - javaOpts ++= Utils.sparkJavaOpts(sparkConf) + // TODO: Use Utils.sparkJavaOpts here once we figure out how to deal with quotes and backslashes + for ((k, v) <- sparkConf.getAll) { + javaOpts += "-D" + k + "=" + "\\\"" + v + "\\\"" + } if (args.amClass == classOf[ApplicationMaster].getName) { sparkConf.getOption("spark.driver.extraJavaOptions") diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala index eb22eeeb0098e..d4769d34e7440 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala @@ -67,7 +67,13 @@ trait ExecutorRunnableUtil extends Logging { // registers with the Scheduler and transfers the spark configs. Since the Executor backend // uses Akka to connect to the scheduler, the akka settings are needed as well as the // authentication settings. - javaOpts ++= Utils.sparkJavaOpts(sparkConf, SparkConf.isExecutorStartupConf) + // TODO: Use Utils.sparkJavaOpts here once we figure out how to deal with quotes and backslashes + sparkConf.getAll. + filter { case (k, v) => k.startsWith("spark.auth") || k.startsWith("spark.akka") }. + foreach { case (k, v) => javaOpts += "-D" + k + "=" + "\\\"" + v + "\\\"" } + + sparkConf.getAkkaConf. + foreach { case (k, v) => javaOpts += "-D" + k + "=" + "\\\"" + v + "\\\"" } // Commenting it out for now - so that people can refer to the properties if required. Remove // it once cpuset version is pushed out. From 2678d13884d69c9c62d5f3dd532bc0ad116abbae Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Mon, 28 Jul 2014 15:54:07 -0700 Subject: [PATCH 14/15] Handle extraJavaOpts properly --- core/src/main/scala/org/apache/spark/deploy/Client.scala | 3 ++- .../spark/scheduler/cluster/SparkDeploySchedulerBackend.scala | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala index cec652de4a200..17c507af2652d 100644 --- a/core/src/main/scala/org/apache/spark/deploy/Client.scala +++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala @@ -61,7 +61,8 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) extends } val extraJavaOptsConf = "spark.driver.extraJavaOptions" - val extraJavaOpts = sys.props.get(extraJavaOptsConf).toSeq + val extraJavaOpts = sys.props.get(extraJavaOptsConf) + .map(Utils.splitCommandString).getOrElse(Seq.empty) val sparkJavaOpts = Utils.sparkJavaOpts(conf) val javaOpts = sparkJavaOpts ++ extraJavaOpts val command = new Command(mainClass, Seq("{{WORKER_URL}}", driverArgs.mainClass) ++ diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala index 1da76f58def3f..48aaaa54bdb35 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala @@ -45,7 +45,8 @@ private[spark] class SparkDeploySchedulerBackend( conf.get("spark.driver.host"), conf.get("spark.driver.port"), CoarseGrainedSchedulerBackend.ACTOR_NAME) val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}", "{{WORKER_URL}}") - val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions").toSeq + val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions") + .map(Utils.splitCommandString).getOrElse(Seq.empty) val classPathEntries = sc.conf.getOption("spark.executor.extraClassPath").toSeq.flatMap { cp => cp.split(java.io.File.pathSeparator) } From 8c11a0d36a19c2f594d6e58084ac499ecac2b0c4 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Mon, 28 Jul 2014 17:54:39 -0700 Subject: [PATCH 15/15] Clean up imports / comments (minor) --- core/src/main/scala/org/apache/spark/SparkConf.scala | 5 ++--- .../main/scala/org/apache/spark/deploy/yarn/ClientBase.scala | 3 --- .../org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala | 2 -- 3 files changed, 2 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala index a6efa125b6629..38700847c80f4 100644 --- a/core/src/main/scala/org/apache/spark/SparkConf.scala +++ b/core/src/main/scala/org/apache/spark/SparkConf.scala @@ -305,9 +305,8 @@ private[spark] object SparkConf { /** * Return whether the given config should be passed to an executor on start-up. * - * When connecting to the scheduler, the executor backend needs certain akka and authentication - * settings to connect to the scheduler, while the rest of the spark configs can be inherited - * from the driver later. + * Certain akka and authentication configs are required of the executor when it connects to + * the scheduler, while the rest of the spark configs can be inherited from the driver later. */ def isExecutorStartupConf(name: String): Boolean = { isAkkaConf(name) || name.startsWith("spark.akka") || name.startsWith("spark.auth") diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala index d6c96554c040f..a1298e8f30b5c 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala @@ -37,9 +37,7 @@ import org.apache.hadoop.yarn.api.protocolrecords._ import org.apache.hadoop.yarn.api.records._ import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.util.Records - import org.apache.spark.{SparkException, Logging, SparkConf, SparkContext} -import org.apache.spark.util.Utils /** * The entry point (starting in Client#main() and Client#run()) for launching Spark on YARN. The @@ -386,7 +384,6 @@ trait ClientBase extends Logging { // Forward the Spark configuration to the application master / executors. // TODO: it might be nicer to pass these as an internal environment variable rather than // as Java options, due to complications with string parsing of nested quotes. - // TODO: Use Utils.sparkJavaOpts here once we figure out how to deal with quotes and backslashes for ((k, v) <- sparkConf.getAll) { javaOpts += "-D" + k + "=" + "\\\"" + v + "\\\"" } diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala index d4769d34e7440..4ba7133a959ed 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala @@ -31,7 +31,6 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.util.{ConverterUtils, Records} import org.apache.spark.{Logging, SparkConf} -import org.apache.spark.util.Utils trait ExecutorRunnableUtil extends Logging { @@ -67,7 +66,6 @@ trait ExecutorRunnableUtil extends Logging { // registers with the Scheduler and transfers the spark configs. Since the Executor backend // uses Akka to connect to the scheduler, the akka settings are needed as well as the // authentication settings. - // TODO: Use Utils.sparkJavaOpts here once we figure out how to deal with quotes and backslashes sparkConf.getAll. filter { case (k, v) => k.startsWith("spark.auth") || k.startsWith("spark.akka") }. foreach { case (k, v) => javaOpts += "-D" + k + "=" + "\\\"" + v + "\\\"" }