Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 29 additions & 25 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import scala.util.{Properties, Try}

import org.apache.commons.io.FilenameUtils
import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.conf.{Configuration => HadoopConfiguration}
import org.apache.hadoop.fs.{FileSystem, Path}
Expand Down Expand Up @@ -387,20 +386,40 @@ private[spark] class SparkSubmit extends Logging {
// Replace with the downloaded local jar path to avoid propagating hadoop compatible uris.
// Executors will get the jars from the Spark file server.
// Explicitly download the related files here
args.jars = renameResourcesToLocalFS(args.jars, localJars)
args.jars = localJars
val filesLocalFiles = Option(args.files).map {
downloadFileList(_, targetDir, sparkConf, hadoopConf, secMgr)
}.orNull
val archiveLocalFiles = Option(args.archives).map { uri =>
val resolvedUri = Utils.resolveURI(uri)
val downloadedUri = downloadFileList(
UriBuilder.fromUri(resolvedUri).fragment(null).build().toString,
val archiveLocalFiles = Option(args.archives).map { uris =>
val resolvedUris = Utils.stringToSeq(uris).map(Utils.resolveURI)
val localArchives = downloadFileList(
resolvedUris.map(
UriBuilder.fromUri(_).fragment(null).build().toString).mkString(","),
targetDir, sparkConf, hadoopConf, secMgr)
UriBuilder.fromUri(downloadedUri).fragment(resolvedUri.getFragment).build().toString

// SPARK-33748: this mimics the behaviour of Yarn cluster mode. If the driver is running
// in cluster mode, the archives should be available in the driver's current working
// directory too.
Comment thread
HyukjinKwon marked this conversation as resolved.
Outdated
Utils.stringToSeq(localArchives).map(Utils.resolveURI).zip(resolvedUris).map {
case (localArchive, resolvedUri) =>
val source = new File(localArchive.getPath)
val dest = new File(
".",
if (resolvedUri.getFragment != null) resolvedUri.getFragment else source.getName)
logInfo(
s"Unpacking an archive $resolvedUri " +
s"from ${source.getAbsolutePath} to ${dest.getAbsolutePath}")
Utils.deleteRecursively(dest)
Utils.unpack(source, dest)

// Keep the URIs of local files with the given fragments.
UriBuilder.fromUri(
localArchive).fragment(resolvedUri.getFragment).build().toString
}.mkString(",")
}.orNull
args.files = renameResourcesToLocalFS(args.files, filesLocalFiles)
args.archives = renameResourcesToLocalFS(args.archives, archiveLocalFiles)
args.pyFiles = renameResourcesToLocalFS(args.pyFiles, localPyFiles)
args.files = filesLocalFiles
args.archives = archiveLocalFiles
args.pyFiles = localPyFiles
}
}

Expand Down Expand Up @@ -836,21 +855,6 @@ private[spark] class SparkSubmit extends Logging {
(childArgs.toSeq, childClasspath.toSeq, sparkConf, childMainClass)
}

private def renameResourcesToLocalFS(resources: String, localResources: String): String = {

@HyukjinKwon HyukjinKwon Dec 11, 2020

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I read through the code several times to be sure, and I think this method does redundant work.
Unless I am badly mistaken somewhere:

  • localResources always consists of local files, and resources is always replaced with localResources.
  • If resources is null, then localResources is null too.

cc @skonto from 5e74570

if (resources != null && localResources != null) {
val localResourcesSeq = Utils.stringToSeq(localResources)
Utils.stringToSeq(resources).map { resource =>
val filenameRemote = FilenameUtils.getName(new URI(resource).getPath)
localResourcesSeq.find { localUri =>
val filenameLocal = FilenameUtils.getName(new URI(localUri).getPath)
filenameRemote == filenameLocal
}.getOrElse(resource)
}.mkString(",")
} else {
resources
}
}

// [SPARK-20328]. HadoopRDD calls into a Hadoop library that fetches delegation tokens with
// renewer set to the YARN ResourceManager. Since YARN isn't configured in Mesos or Kubernetes
// mode, we must trick it into thinking we're YARN.
Expand Down
5 changes: 4 additions & 1 deletion docs/running-on-kubernetes.md
Original file line number Diff line number Diff line change
Expand Up @@ -1087,7 +1087,10 @@ See the [configuration page](configuration.html) for information on Spark config
<td><code>spark.kubernetes.pyspark.pythonVersion</code></td>
<td><code>"3"</code></td>
<td>
This sets the major Python version of the docker image used to run the driver and executor containers. Can be 3.
This sets the major Python version of the docker image used to run the driver and executor containers.
It can only be "3". This configuration has been deprecated since Spark 3.1.0 and is effectively a no-op.
Users should set 'spark.pyspark.python' and 'spark.pyspark.driver.python' configurations or
'PYSPARK_PYTHON' and 'PYSPARK_DRIVER_PYTHON' environment variables.
</td>
<td>2.4.0</td>
</tr>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import java.util.concurrent.TimeUnit

import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.{PYSPARK_DRIVER_PYTHON, PYSPARK_PYTHON}
import org.apache.spark.internal.config.ConfigBuilder

private[spark] object Config extends Logging {
Expand Down Expand Up @@ -301,12 +302,19 @@ private[spark] object Config extends Logging {

val PYSPARK_MAJOR_PYTHON_VERSION =
ConfigBuilder("spark.kubernetes.pyspark.pythonVersion")
.doc("This sets the major Python version. Only 3 is available for Python3.")
.doc(
s"(Deprecated since Spark 3.1, please set '${PYSPARK_PYTHON.key}' and " +
s"'${PYSPARK_DRIVER_PYTHON.key}' configurations or $ENV_PYSPARK_PYTHON and " +
s"$ENV_PYSPARK_DRIVER_PYTHON environment variables instead.)")
.version("2.4.0")
.stringConf
.checkValue(pv => List("3").contains(pv),
"Ensure that major Python version is Python3")
.createWithDefault("3")
.checkValue("3" == _,
"Python 2 was dropped from Spark 3.1, and only 3 is allowed in " +
"this configuration. Note that this configuration was deprecated in Spark 3.1. " +
s"Please set '${PYSPARK_PYTHON.key}' and '${PYSPARK_DRIVER_PYTHON.key}' " +
s"configurations or $ENV_PYSPARK_PYTHON and $ENV_PYSPARK_DRIVER_PYTHON environment " +
"variables instead.")
.createOptional

val KUBERNETES_KERBEROS_KRB5_FILE =
ConfigBuilder("spark.kubernetes.kerberos.krb5.path")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ private[spark] object Constants {
val ENV_HADOOP_TOKEN_FILE_LOCATION = "HADOOP_TOKEN_FILE_LOCATION"

// BINDINGS
val ENV_PYSPARK_MAJOR_PYTHON_VERSION = "PYSPARK_MAJOR_PYTHON_VERSION"
val ENV_PYSPARK_PYTHON = "PYSPARK_PYTHON"
val ENV_PYSPARK_DRIVER_PYTHON = "PYSPARK_DRIVER_PYTHON"

// Pod spec templates
val EXECUTOR_POD_SPEC_TEMPLATE_FILE_NAME = "pod-spec-template.yml"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,16 @@ import org.apache.spark.deploy.k8s._
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.submit._
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.{PYSPARK_DRIVER_PYTHON, PYSPARK_PYTHON}
import org.apache.spark.launcher.SparkLauncher

/**
* Creates the driver command for running the user app, and propagates needed configuration so
* executors can also find the app code.
*/
private[spark] class DriverCommandFeatureStep(conf: KubernetesDriverConf)
extends KubernetesFeatureConfigStep {
extends KubernetesFeatureConfigStep with Logging {

override def configurePod(pod: SparkPod): SparkPod = {
conf.mainAppResource match {
Expand Down Expand Up @@ -70,12 +72,37 @@ private[spark] class DriverCommandFeatureStep(conf: KubernetesDriverConf)
SparkPod(pod.pod, driverContainer)
}

// Exposed for testing purpose: returns the process environment (`sys.env`) by default, and
// can be overridden so that tests supply their own environment variable map.
private[spark] def environmentVariables: Map[String, String] = sys.env

private def configureForPython(pod: SparkPod, res: String): SparkPod = {
if (conf.get(PYSPARK_MAJOR_PYTHON_VERSION).isDefined) {
logWarning(
s"${PYSPARK_MAJOR_PYTHON_VERSION.key} was deprecated in Spark 3.1. " +
s"Please set '${PYSPARK_PYTHON.key}' and '${PYSPARK_DRIVER_PYTHON.key}' " +
s"configurations or $ENV_PYSPARK_PYTHON and $ENV_PYSPARK_DRIVER_PYTHON environment " +
"variables instead.")
}

val pythonEnvs =
Seq(new EnvVarBuilder()
.withName(ENV_PYSPARK_MAJOR_PYTHON_VERSION)
.withValue(conf.get(PYSPARK_MAJOR_PYTHON_VERSION))
.build())
Seq(
conf.get(PYSPARK_PYTHON)
.orElse(environmentVariables.get(ENV_PYSPARK_PYTHON)).map { value =>
new EnvVarBuilder()
.withName(ENV_PYSPARK_PYTHON)
.withValue(value)
.build()
},
conf.get(PYSPARK_DRIVER_PYTHON)
.orElse(conf.get(PYSPARK_PYTHON))
.orElse(environmentVariables.get(ENV_PYSPARK_DRIVER_PYTHON))
.orElse(environmentVariables.get(ENV_PYSPARK_PYTHON)).map { value =>
new EnvVarBuilder()
.withName(ENV_PYSPARK_DRIVER_PYTHON)
.withValue(value)
.build()
}
).flatten

// re-write primary resource to be the remote one and upload the related file
val newResName = KubernetesUtils
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.deploy.k8s._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.submit._
import org.apache.spark.internal.config.{PYSPARK_DRIVER_PYTHON, PYSPARK_PYTHON}

class DriverCommandFeatureStepSuite extends SparkFunSuite {

Expand Down Expand Up @@ -50,12 +51,51 @@ class DriverCommandFeatureStepSuite extends SparkFunSuite {
"--properties-file", SPARK_CONF_PATH,
"--class", KubernetesTestConf.MAIN_CLASS,
mainResource, "5", "7", "9"))
}

test("python executable precedence") {
val mainResource = "local:/main.py"

val envs = spec.pod.container.getEnv.asScala
.map { env => (env.getName, env.getValue) }
.toMap
val expected = Map(ENV_PYSPARK_MAJOR_PYTHON_VERSION -> "3")
assert(envs === expected)
val pythonExecutables = Seq(
(Some("conf_py"), Some("conf_driver_py"), Some("env_py"), Some("env_driver_py")),
(Some("conf_py"), None, Some("env_py"), Some("env_driver_py")),
(None, None, Some("env_py"), Some("env_driver_py")),
(None, None, Some("env_py"), None)
)

val expectedResults = Seq(
("conf_py", "conf_driver_py"),
("conf_py", "conf_py"),
("env_py", "env_driver_py"),
("env_py", "env_py")
)

pythonExecutables.zip(expectedResults).foreach { case (pythonExecutable, expected) =>
val sparkConf = new SparkConf(false)
val (confPy, confDriverPy, envPy, envDriverPy) = pythonExecutable
confPy.foreach(sparkConf.set(PYSPARK_PYTHON, _))
confDriverPy.foreach(sparkConf.set(PYSPARK_DRIVER_PYTHON, _))
val pythonEnvs = Map(
(
envPy.map(v => ENV_PYSPARK_PYTHON -> v :: Nil) ++
envDriverPy.map(v => ENV_PYSPARK_DRIVER_PYTHON -> v :: Nil)
).flatten.toArray: _*)

val spec = applyFeatureStep(
PythonMainAppResource(mainResource),
conf = sparkConf,
appArgs = Array("foo"),
env = pythonEnvs)

val envs = spec.pod.container.getEnv.asScala
.map { env => (env.getName, env.getValue) }
.toMap

val (expectedEnvPy, expectedDriverPy) = expected
assert(envs === Map(
ENV_PYSPARK_PYTHON -> expectedEnvPy,
ENV_PYSPARK_DRIVER_PYTHON -> expectedDriverPy))
}
}

test("R resource") {
Expand Down Expand Up @@ -123,13 +163,16 @@ class DriverCommandFeatureStepSuite extends SparkFunSuite {
resource: MainAppResource,
conf: SparkConf = new SparkConf(false),
appArgs: Array[String] = Array(),
proxyUser: Option[String] = None): KubernetesDriverSpec = {
proxyUser: Option[String] = None,
env: Map[String, String] = Map.empty[String, String]): KubernetesDriverSpec = {
val kubernetesConf = KubernetesTestConf.createDriverConf(
sparkConf = conf,
mainAppResource = resource,
appArgs = appArgs,
proxyUser = proxyUser)
val step = new DriverCommandFeatureStep(kubernetesConf)
val step = new DriverCommandFeatureStep(kubernetesConf) {
private[spark] override val environmentVariables: Map[String, String] = env
}
val pod = step.configurePod(SparkPod.initialPod())
val props = step.getAdditionalPodSystemProperties()
KubernetesDriverSpec(pod, Nil, props)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,11 @@ if [ -n "$SPARK_EXTRA_CLASSPATH" ]; then
SPARK_CLASSPATH="$SPARK_CLASSPATH:$SPARK_EXTRA_CLASSPATH"
fi

if [ "$PYSPARK_MAJOR_PYTHON_VERSION" == "3" ]; then
pyv3="$(python3 -V 2>&1)"
export PYTHON_VERSION="${pyv3:7}"

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PYTHON_VERSION and pyv3 are apparently not used anywhere in the Spark code base.

export PYSPARK_PYTHON="python3"
export PYSPARK_DRIVER_PYTHON="python3"
if ! [ -z ${PYSPARK_PYTHON+x} ]; then
export PYSPARK_PYTHON
fi
if ! [ -z ${PYSPARK_DRIVER_PYTHON+x} ]; then
export PYSPARK_DRIVER_PYTHON
fi

# If HADOOP_HOME is set and SPARK_DIST_CLASSPATH is not set, set it here so Hadoop jars are available to the executor.
Expand Down
Loading