From 3d03d353ca1aa45308cccc198565c6343c4d5a2e Mon Sep 17 00:00:00 2001 From: Masayoshi TSUZUKI Date: Fri, 5 Dec 2014 07:47:27 +0900 Subject: [PATCH 1/2] [SPARK-1825] Make Windows Spark client work fine with Linux YARN cluster Modified environment strings and path separators to platform-independent style if possible. --- .../apache/spark/deploy/yarn/ClientBase.scala | 21 ++++++--- .../spark/deploy/yarn/ExecutorRunnable.scala | 8 +++- .../deploy/yarn/YarnSparkHadoopUtil.scala | 43 ++++++++++++++++++- .../spark/deploy/yarn/ClientBaseSuite.scala | 17 ++++++-- .../yarn/YarnSparkHadoopUtilSuite.scala | 24 +++++++++++ 5 files changed, 101 insertions(+), 12 deletions(-) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala index eb97a7b3c59a4..7d004ffd7fde3 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala @@ -320,7 +320,10 @@ private[spark] trait ClientBase extends Logging { // Add Xmx for AM memory javaOpts += "-Xmx" + args.amMemory + "m" - val tmpDir = new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR) + val tmpDir = new Path( + YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), + YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR + ) javaOpts += "-Djava.io.tmpdir=" + tmpDir // TODO: Remove once cpuset version is pushed out. @@ -411,7 +414,9 @@ private[spark] trait ClientBase extends Logging { "--num-executors ", args.numExecutors.toString) // Command for the ApplicationMaster - val commands = prefixEnv ++ Seq(Environment.JAVA_HOME.$() + "/bin/java", "-server") ++ + val commands = prefixEnv ++ Seq( + YarnSparkHadoopUtil.expandEnvironment(Environment.JAVA_HOME) + "/bin/java", "-server" + ) ++ javaOpts ++ amArgs ++ Seq( "1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout", @@ -690,7 +695,9 @@ private[spark] object ClientBase extends Logging { env: HashMap[String, String], extraClassPath: Option[String] = None): Unit = { extraClassPath.foreach(addClasspathEntry(_, env)) - addClasspathEntry(Environment.PWD.$(), env) + addClasspathEntry( + YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), env + ) // Normally the users app.jar is last in case conflicts with spark jars if (sparkConf.getBoolean("spark.yarn.user.classpath.first", false)) { @@ -704,7 +711,9 @@ private[spark] object ClientBase extends Logging { } // Append all jar files under the working directory to the classpath. - addClasspathEntry(Environment.PWD.$() + Path.SEPARATOR + "*", env) + addClasspathEntry( + YarnSparkHadoopUtil.expandEnvironment(Environment.PWD) + Path.SEPARATOR + "*", env + ) } /** @@ -759,7 +768,9 @@ private[spark] object ClientBase extends Logging { } } if (fileName != null) { - addClasspathEntry(Environment.PWD.$() + Path.SEPARATOR + fileName, env) + addClasspathEntry( + YarnSparkHadoopUtil.expandEnvironment(Environment.PWD) + Path.SEPARATOR + fileName, env + ) } } diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala index 6d9198c122e97..31ee3194d4916 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala @@ -142,7 +142,10 @@ class ExecutorRunnable( } javaOpts += "-Djava.io.tmpdir=" + - new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR) + new Path( + YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), + YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR + ) // Certain configs need to be passed here because they are needed before the Executor // registers with the Scheduler and transfers the spark configs. Since the Executor backend @@ -184,7 +187,8 @@ class ExecutorRunnable( // For log4j configuration to reference javaOpts += ("-Dspark.yarn.app.container.log.dir=" + ApplicationConstants.LOG_DIR_EXPANSION_VAR) - val commands = prefixEnv ++ Seq(Environment.JAVA_HOME.$() + "/bin/java", + val commands = prefixEnv ++ Seq( + YarnSparkHadoopUtil.expandEnvironment(Environment.JAVA_HOME) + "/bin/java", "-server", // Kill if OOM is raised - leverage yarn's failure handling to cause rescheduling. // Not killing the task leaves various aspects of the executor and (to some extent) the jvm in diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala index d7cf904db1c9e..2150069c9908a 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala @@ -32,6 +32,8 @@ import org.apache.hadoop.security.Credentials import org.apache.hadoop.security.UserGroupInformation import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.api.records.ApplicationAccessType +import org.apache.hadoop.yarn.api.ApplicationConstants +import org.apache.hadoop.yarn.api.ApplicationConstants.Environment import org.apache.hadoop.yarn.util.RackResolver import org.apache.hadoop.conf.Configuration @@ -112,7 +114,7 @@ object YarnSparkHadoopUtil { * If the map already contains this key, append the value to the existing value instead. */ def addPathToEnvironment(env: HashMap[String, String], key: String, value: String): Unit = { - val newValue = if (env.contains(key)) { env(key) + File.pathSeparator + value } else value + val newValue = if (env.contains(key)) { env(key) + getClassPathSeparator + value } else value env.put(key, newValue) } @@ -223,4 +225,43 @@ object YarnSparkHadoopUtil { ) } + /** + * Expand environment variable using Yarn API. + * If environment.$$() is implemented, return the result of it. + * Otherwise, return the result of environment.$() + * Note: $$() is added in Hadoop 2.4. + */ + def expandEnvironment(environment: Environment): String = { + var result = environment.$() + + // We use reflection in order not to fail building with Hadoop 2.3 or before. + val clazz = classOf[Environment] + val name = "$$" + if (clazz.getMethods().exists(_.getName == name)){ + val method = clazz.getMethod(name) + result = method.invoke(environment).asInstanceOf[String] + } + + result + } + + /** + * Get class path separator using Yarn API. + * If ApplicationConstants.CLASS_PATH_SEPARATOR is implemented, return it. + * Otherwise, return File.pathSeparator + * Note: File.pathSeparator is added in Hadoop 2.4. + */ + def getClassPathSeparator(): String = { + var result = File.pathSeparator + + // We use reflection in order not to fail building with Hadoop 2.3 or before. + val clazz = classOf[ApplicationConstants] + val name = "CLASS_PATH_SEPARATOR" + if (clazz.getFields().exists(_.getName == name)){ + val field = clazz.getField(name) + result = field.get(null).asInstanceOf[String] + } + + result + } } diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala index 17b79ae1d82c4..c59cc84d44b62 100644 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala @@ -23,6 +23,7 @@ import java.net.URI import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.hadoop.mapreduce.MRJobConfig +import org.apache.hadoop.util.Shell import org.apache.hadoop.yarn.api.ApplicationConstants.Environment import org.apache.hadoop.yarn.api.records._ import org.apache.hadoop.yarn.conf.YarnConfiguration @@ -89,7 +90,7 @@ class ClientBaseSuite extends FunSuite with Matchers { ClientBase.populateClasspath(args, conf, sparkConf, env) - val cp = env("CLASSPATH").split(File.pathSeparator) + val cp = env("CLASSPATH").split(":|;|") s"$SPARK,$USER,$ADDED".split(",").foreach({ entry => val uri = new URI(entry) if (ClientBase.LOCAL_SCHEME.equals(uri.getScheme())) { @@ -98,8 +99,16 @@ class ClientBaseSuite extends FunSuite with Matchers { cp should not contain (uri.getPath()) } }) - cp should contain (Environment.PWD.$()) - cp should contain (s"${Environment.PWD.$()}${File.separator}*") + if (classOf[Environment].getMethods().exists(_.getName == "$$")) { + cp should contain("{{PWD}}") + cp should contain(s"{{PWD}}${Path.SEPARATOR}*") + } else if (Shell.WINDOWS) { + cp should contain("%PWD%") + cp should contain(s"%PWD%${Path.SEPARATOR}*") + } else { + cp should contain(Environment.PWD.$()) + cp should contain(s"${Environment.PWD.$()}${File.separator}*") + } cp should not contain (ClientBase.SPARK_JAR) cp should not contain (ClientBase.APP_JAR) } @@ -224,7 +233,7 @@ class ClientBaseSuite extends FunSuite with Matchers { def newEnv = MutableHashMap[String, String]() - def classpath(env: MutableHashMap[String, String]) = env(Environment.CLASSPATH.name).split(":|;") + def classpath(env: MutableHashMap[String, String]) = env(Environment.CLASSPATH.name).split(":|;|") def flatten(a: Option[Seq[String]], b: Option[Seq[String]]) = (a ++ b).flatten.toArray diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala index 2cc5abb3a890c..39390391b99d6 100644 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala @@ -20,7 +20,10 @@ package org.apache.spark.deploy.yarn import java.io.{File, IOException} import com.google.common.io.{ByteStreams, Files} +import org.apache.hadoop.yarn.api.ApplicationConstants +import org.apache.hadoop.yarn.api.ApplicationConstants.Environment import org.apache.hadoop.yarn.conf.YarnConfiguration +import org.apache.hadoop.util.Shell import org.scalatest.{FunSuite, Matchers} import org.apache.hadoop.yarn.api.records.ApplicationAccessType @@ -148,4 +151,25 @@ class YarnSparkHadoopUtilSuite extends FunSuite with Matchers with Logging { } } + + test("test expandEnvironment result") { + val target = Environment.PWD + var expect = "$" + target + if (classOf[Environment].getMethods().exists(_.getName == "$$")) { + expect = "{{" + target + "}}" + } else if (Shell.WINDOWS) { + expect = "%" + target + "%" + } + YarnSparkHadoopUtil.expandEnvironment(target) should be (expect) + } + + test("test getClassPathSeparator result") { + var expect = ":" + if (classOf[ApplicationConstants].getFields().exists(_.getName == "CLASS_PATH_SEPARATOR")) { + expect = "" + } else if (Shell.WINDOWS) { + expect = ";" + } + YarnSparkHadoopUtil.getClassPathSeparator() should be (expect) + } } From ec4b865911e7719944e32519ac02509ce0d6b332 Mon Sep 17 00:00:00 2001 From: Masayoshi TSUZUKI Date: Thu, 29 Jan 2015 20:39:23 +0900 Subject: [PATCH 2/2] Rebased and modified as comments. --- .../deploy/yarn/YarnSparkHadoopUtil.scala | 34 ++++++------------- .../yarn/YarnSparkHadoopUtilSuite.scala | 23 +++++++------ 2 files changed, 23 insertions(+), 34 deletions(-) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala index 7aac203f2f630..01a0cabf3d0e5 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala @@ -25,6 +25,7 @@ import java.util.regex.Pattern import java.util.concurrent.ConcurrentHashMap import scala.collection.mutable.HashMap +import scala.util.Try import org.apache.hadoop.io.Text import org.apache.hadoop.mapred.JobConf @@ -194,19 +195,12 @@ object YarnSparkHadoopUtil { * Otherwise, return the result of environment.$() * Note: $$() is added in Hadoop 2.4. */ - def expandEnvironment(environment: Environment): String = { - var result = environment.$() - - // We use reflection in order not to fail building with Hadoop 2.3 or before. - val clazz = classOf[Environment] - val name = "$$" - if (clazz.getMethods().exists(_.getName == name)){ - val method = clazz.getMethod(name) - result = method.invoke(environment).asInstanceOf[String] - } + private lazy val expandMethod = + Try(classOf[Environment].getMethod("$$")) + .getOrElse(classOf[Environment].getMethod("$")) - result - } + def expandEnvironment(environment: Environment): String = + expandMethod.invoke(environment).asInstanceOf[String] /** * Get class path separator using Yarn API. @@ -214,17 +208,11 @@ object YarnSparkHadoopUtil { * Otherwise, return File.pathSeparator * Note: File.pathSeparator is added in Hadoop 2.4. */ - def getClassPathSeparator(): String = { - var result = File.pathSeparator - - // We use reflection in order not to fail building with Hadoop 2.3 or before. - val clazz = classOf[ApplicationConstants] - val name = "CLASS_PATH_SEPARATOR" - if (clazz.getFields().exists(_.getName == name)){ - val field = clazz.getField(name) - result = field.get(null).asInstanceOf[String] - } + private lazy val classPathSeparatorField = + Try(classOf[ApplicationConstants].getField("CLASS_PATH_SEPARATOR")) + .getOrElse(classOf[File].getField("pathSeparator")) - result + def getClassPathSeparator(): String = { + classPathSeparatorField.get(null).asInstanceOf[String] } } diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala index 39390391b99d6..b5a2db8f6225c 100644 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala @@ -23,12 +23,12 @@ import com.google.common.io.{ByteStreams, Files} import org.apache.hadoop.yarn.api.ApplicationConstants import org.apache.hadoop.yarn.api.ApplicationConstants.Environment import org.apache.hadoop.yarn.conf.YarnConfiguration -import org.apache.hadoop.util.Shell import org.scalatest.{FunSuite, Matchers} import org.apache.hadoop.yarn.api.records.ApplicationAccessType import org.apache.spark.{Logging, SecurityManager, SparkConf} +import org.apache.spark.util.Utils class YarnSparkHadoopUtilSuite extends FunSuite with Matchers with Logging { @@ -154,22 +154,23 @@ class YarnSparkHadoopUtilSuite extends FunSuite with Matchers with Logging { test("test expandEnvironment result") { val target = Environment.PWD - var expect = "$" + target if (classOf[Environment].getMethods().exists(_.getName == "$$")) { - expect = "{{" + target + "}}" - } else if (Shell.WINDOWS) { - expect = "%" + target + "%" + YarnSparkHadoopUtil.expandEnvironment(target) should be ("{{" + target + "}}") + } else if (Utils.isWindows) { + YarnSparkHadoopUtil.expandEnvironment(target) should be ("%" + target + "%") + } else { + YarnSparkHadoopUtil.expandEnvironment(target) should be ("$" + target) } - YarnSparkHadoopUtil.expandEnvironment(target) should be (expect) + } test("test getClassPathSeparator result") { - var expect = ":" if (classOf[ApplicationConstants].getFields().exists(_.getName == "CLASS_PATH_SEPARATOR")) { - expect = "" - } else if (Shell.WINDOWS) { - expect = ";" + YarnSparkHadoopUtil.getClassPathSeparator() should be ("") + } else if (Utils.isWindows) { + YarnSparkHadoopUtil.getClassPathSeparator() should be (";") + } else { + YarnSparkHadoopUtil.getClassPathSeparator() should be (":") } - YarnSparkHadoopUtil.getClassPathSeparator() should be (expect) } }