diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index ea293f03a2169..bb3a20dce2da4 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -31,7 +31,6 @@ import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import scala.util.{Properties, Try}
-import org.apache.commons.io.FilenameUtils
import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.conf.{Configuration => HadoopConfiguration}
import org.apache.hadoop.fs.{FileSystem, Path}
@@ -387,20 +386,40 @@ private[spark] class SparkSubmit extends Logging {
// Replace with the downloaded local jar path to avoid propagating hadoop compatible uris.
// Executors will get the jars from the Spark file server.
// Explicitly download the related files here
- args.jars = renameResourcesToLocalFS(args.jars, localJars)
+ args.jars = localJars
val filesLocalFiles = Option(args.files).map {
downloadFileList(_, targetDir, sparkConf, hadoopConf, secMgr)
}.orNull
- val archiveLocalFiles = Option(args.archives).map { uri =>
- val resolvedUri = Utils.resolveURI(uri)
- val downloadedUri = downloadFileList(
- UriBuilder.fromUri(resolvedUri).fragment(null).build().toString,
+ val archiveLocalFiles = Option(args.archives).map { uris =>
+ val resolvedUris = Utils.stringToSeq(uris).map(Utils.resolveURI)
+ val localArchives = downloadFileList(
+ resolvedUris.map(
+ UriBuilder.fromUri(_).fragment(null).build().toString).mkString(","),
targetDir, sparkConf, hadoopConf, secMgr)
- UriBuilder.fromUri(downloadedUri).fragment(resolvedUri.getFragment).build().toString
+
+ // SPARK-33748: this mimics the behaviour of Yarn cluster mode. If the driver is running
+ // in cluster mode, the archives should be available in the driver's current working
+ // directory too.
+ Utils.stringToSeq(localArchives).map(Utils.resolveURI).zip(resolvedUris).map {
+ case (localArchive, resolvedUri) =>
+ val source = new File(localArchive.getPath)
+ val dest = new File(
+ ".",
+ if (resolvedUri.getFragment != null) resolvedUri.getFragment else source.getName)
+ logInfo(
+ s"Unpacking an archive $resolvedUri " +
+ s"from ${source.getAbsolutePath} to ${dest.getAbsolutePath}")
+ Utils.deleteRecursively(dest)
+ Utils.unpack(source, dest)
+
+ // Keep the URIs of local files with the given fragments.
+ UriBuilder.fromUri(
+ localArchive).fragment(resolvedUri.getFragment).build().toString
+ }.mkString(",")
}.orNull
- args.files = renameResourcesToLocalFS(args.files, filesLocalFiles)
- args.archives = renameResourcesToLocalFS(args.archives, archiveLocalFiles)
- args.pyFiles = renameResourcesToLocalFS(args.pyFiles, localPyFiles)
+ args.files = filesLocalFiles
+ args.archives = archiveLocalFiles
+ args.pyFiles = localPyFiles
}
}
@@ -836,21 +855,6 @@ private[spark] class SparkSubmit extends Logging {
(childArgs.toSeq, childClasspath.toSeq, sparkConf, childMainClass)
}
- private def renameResourcesToLocalFS(resources: String, localResources: String): String = {
- if (resources != null && localResources != null) {
- val localResourcesSeq = Utils.stringToSeq(localResources)
- Utils.stringToSeq(resources).map { resource =>
- val filenameRemote = FilenameUtils.getName(new URI(resource).getPath)
- localResourcesSeq.find { localUri =>
- val filenameLocal = FilenameUtils.getName(new URI(localUri).getPath)
- filenameRemote == filenameLocal
- }.getOrElse(resource)
- }.mkString(",")
- } else {
- resources
- }
- }
-
// [SPARK-20328]. HadoopRDD calls into a Hadoop library that fetches delegation tokens with
// renewer set to the YARN ResourceManager. Since YARN isn't configured in Mesos or Kubernetes
// mode, we must trick it into thinking we're YARN.
diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md
index e735c7493486e..93c6f94790abc 100644
--- a/docs/running-on-kubernetes.md
+++ b/docs/running-on-kubernetes.md
@@ -1087,7 +1087,10 @@ See the [configuration page](configuration.html) for information on Spark config
spark.kubernetes.pyspark.pythonVersion |
"3" |
- This sets the major Python version of the docker image used to run the driver and executor containers. Can be 3.
+ This sets the major Python version of the docker image used to run the driver and executor containers.
+ It can only be "3". This configuration has been deprecated since Spark 3.1.0 and is effectively a no-op.
+ Users should set the 'spark.pyspark.python' and 'spark.pyspark.driver.python' configurations or the
+ 'PYSPARK_PYTHON' and 'PYSPARK_DRIVER_PYTHON' environment variables instead.
|
2.4.0 |
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
index c28d6fd405ae1..0ad4fa8bf3a61 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -20,6 +20,7 @@ import java.util.concurrent.TimeUnit
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.{PYSPARK_DRIVER_PYTHON, PYSPARK_PYTHON}
import org.apache.spark.internal.config.ConfigBuilder
private[spark] object Config extends Logging {
@@ -301,12 +302,19 @@ private[spark] object Config extends Logging {
val PYSPARK_MAJOR_PYTHON_VERSION =
ConfigBuilder("spark.kubernetes.pyspark.pythonVersion")
- .doc("This sets the major Python version. Only 3 is available for Python3.")
+ .doc(
+ s"(Deprecated since Spark 3.1, please set '${PYSPARK_PYTHON.key}' and " +
+ s"'${PYSPARK_DRIVER_PYTHON.key}' configurations or $ENV_PYSPARK_PYTHON and " +
+ s"$ENV_PYSPARK_DRIVER_PYTHON environment variables instead.)")
.version("2.4.0")
.stringConf
- .checkValue(pv => List("3").contains(pv),
- "Ensure that major Python version is Python3")
- .createWithDefault("3")
+ .checkValue("3" == _,
+ "Python 2 was dropped from Spark 3.1, and only 3 is allowed in " +
+ "this configuration. Note that this configuration was deprecated in Spark 3.1. " +
+ s"Please set '${PYSPARK_PYTHON.key}' and '${PYSPARK_DRIVER_PYTHON.key}' " +
+ s"configurations or $ENV_PYSPARK_PYTHON and $ENV_PYSPARK_DRIVER_PYTHON environment " +
+ "variables instead.")
+ .createOptional
val KUBERNETES_KERBEROS_KRB5_FILE =
ConfigBuilder("spark.kubernetes.kerberos.krb5.path")
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
index 4014a964ed950..543ca12594763 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
@@ -74,7 +74,8 @@ private[spark] object Constants {
val ENV_HADOOP_TOKEN_FILE_LOCATION = "HADOOP_TOKEN_FILE_LOCATION"
// BINDINGS
- val ENV_PYSPARK_MAJOR_PYTHON_VERSION = "PYSPARK_MAJOR_PYTHON_VERSION"
+ val ENV_PYSPARK_PYTHON = "PYSPARK_PYTHON"
+ val ENV_PYSPARK_DRIVER_PYTHON = "PYSPARK_DRIVER_PYTHON"
// Pod spec templates
val EXECUTOR_POD_SPEC_TEMPLATE_FILE_NAME = "pod-spec-template.yml"
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStep.scala
index d49381ba897d4..8015a1af3e17d 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStep.scala
@@ -24,6 +24,8 @@ import org.apache.spark.deploy.k8s._
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.submit._
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.{PYSPARK_DRIVER_PYTHON, PYSPARK_PYTHON}
import org.apache.spark.launcher.SparkLauncher
/**
@@ -31,7 +33,7 @@ import org.apache.spark.launcher.SparkLauncher
* executors can also find the app code.
*/
private[spark] class DriverCommandFeatureStep(conf: KubernetesDriverConf)
- extends KubernetesFeatureConfigStep {
+ extends KubernetesFeatureConfigStep with Logging {
override def configurePod(pod: SparkPod): SparkPod = {
conf.mainAppResource match {
@@ -70,12 +72,37 @@ private[spark] class DriverCommandFeatureStep(conf: KubernetesDriverConf)
SparkPod(pod.pod, driverContainer)
}
+ // Exposed for testing purposes.
+ private[spark] def environmentVariables: Map[String, String] = sys.env
+
private def configureForPython(pod: SparkPod, res: String): SparkPod = {
+ if (conf.get(PYSPARK_MAJOR_PYTHON_VERSION).isDefined) {
+ logWarning(
+ s"${PYSPARK_MAJOR_PYTHON_VERSION.key} was deprecated in Spark 3.1. " +
+ s"Please set '${PYSPARK_PYTHON.key}' and '${PYSPARK_DRIVER_PYTHON.key}' " +
+ s"configurations or $ENV_PYSPARK_PYTHON and $ENV_PYSPARK_DRIVER_PYTHON environment " +
+ "variables instead.")
+ }
+
val pythonEnvs =
- Seq(new EnvVarBuilder()
- .withName(ENV_PYSPARK_MAJOR_PYTHON_VERSION)
- .withValue(conf.get(PYSPARK_MAJOR_PYTHON_VERSION))
- .build())
+ Seq(
+ conf.get(PYSPARK_PYTHON)
+ .orElse(environmentVariables.get(ENV_PYSPARK_PYTHON)).map { value =>
+ new EnvVarBuilder()
+ .withName(ENV_PYSPARK_PYTHON)
+ .withValue(value)
+ .build()
+ },
+ conf.get(PYSPARK_DRIVER_PYTHON)
+ .orElse(conf.get(PYSPARK_PYTHON))
+ .orElse(environmentVariables.get(ENV_PYSPARK_DRIVER_PYTHON))
+ .orElse(environmentVariables.get(ENV_PYSPARK_PYTHON)).map { value =>
+ new EnvVarBuilder()
+ .withName(ENV_PYSPARK_DRIVER_PYTHON)
+ .withValue(value)
+ .build()
+ }
+ ).flatten
// re-write primary resource to be the remote one and upload the related file
val newResName = KubernetesUtils
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStepSuite.scala
index a44d465e35087..ebbb42f225c51 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStepSuite.scala
@@ -22,6 +22,7 @@ import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.deploy.k8s._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.submit._
+import org.apache.spark.internal.config.{PYSPARK_DRIVER_PYTHON, PYSPARK_PYTHON}
class DriverCommandFeatureStepSuite extends SparkFunSuite {
@@ -50,12 +51,51 @@ class DriverCommandFeatureStepSuite extends SparkFunSuite {
"--properties-file", SPARK_CONF_PATH,
"--class", KubernetesTestConf.MAIN_CLASS,
mainResource, "5", "7", "9"))
+ }
+
+ test("python executable precedence") {
+ val mainResource = "local:/main.py"
- val envs = spec.pod.container.getEnv.asScala
- .map { env => (env.getName, env.getValue) }
- .toMap
- val expected = Map(ENV_PYSPARK_MAJOR_PYTHON_VERSION -> "3")
- assert(envs === expected)
+ val pythonExecutables = Seq(
+ (Some("conf_py"), Some("conf_driver_py"), Some("env_py"), Some("env_driver_py")),
+ (Some("conf_py"), None, Some("env_py"), Some("env_driver_py")),
+ (None, None, Some("env_py"), Some("env_driver_py")),
+ (None, None, Some("env_py"), None)
+ )
+
+ val expectedResults = Seq(
+ ("conf_py", "conf_driver_py"),
+ ("conf_py", "conf_py"),
+ ("env_py", "env_driver_py"),
+ ("env_py", "env_py")
+ )
+
+ pythonExecutables.zip(expectedResults).foreach { case (pythonExecutable, expected) =>
+ val sparkConf = new SparkConf(false)
+ val (confPy, confDriverPy, envPy, envDriverPy) = pythonExecutable
+ confPy.foreach(sparkConf.set(PYSPARK_PYTHON, _))
+ confDriverPy.foreach(sparkConf.set(PYSPARK_DRIVER_PYTHON, _))
+ val pythonEnvs = Map(
+ (
+ envPy.map(v => ENV_PYSPARK_PYTHON -> v :: Nil) ++
+ envDriverPy.map(v => ENV_PYSPARK_DRIVER_PYTHON -> v :: Nil)
+ ).flatten.toArray: _*)
+
+ val spec = applyFeatureStep(
+ PythonMainAppResource(mainResource),
+ conf = sparkConf,
+ appArgs = Array("foo"),
+ env = pythonEnvs)
+
+ val envs = spec.pod.container.getEnv.asScala
+ .map { env => (env.getName, env.getValue) }
+ .toMap
+
+ val (expectedEnvPy, expectedDriverPy) = expected
+ assert(envs === Map(
+ ENV_PYSPARK_PYTHON -> expectedEnvPy,
+ ENV_PYSPARK_DRIVER_PYTHON -> expectedDriverPy))
+ }
}
test("R resource") {
@@ -123,13 +163,16 @@ class DriverCommandFeatureStepSuite extends SparkFunSuite {
resource: MainAppResource,
conf: SparkConf = new SparkConf(false),
appArgs: Array[String] = Array(),
- proxyUser: Option[String] = None): KubernetesDriverSpec = {
+ proxyUser: Option[String] = None,
+ env: Map[String, String] = Map.empty[String, String]): KubernetesDriverSpec = {
val kubernetesConf = KubernetesTestConf.createDriverConf(
sparkConf = conf,
mainAppResource = resource,
appArgs = appArgs,
proxyUser = proxyUser)
- val step = new DriverCommandFeatureStep(kubernetesConf)
+ val step = new DriverCommandFeatureStep(kubernetesConf) {
+ private[spark] override val environmentVariables: Map[String, String] = env
+ }
val pod = step.configurePod(SparkPod.initialPod())
val props = step.getAdditionalPodSystemProperties()
KubernetesDriverSpec(pod, Nil, props)
diff --git a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh
index c837e00d2e468..f722471906bfb 100755
--- a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh
+++ b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh
@@ -44,11 +44,11 @@ if [ -n "$SPARK_EXTRA_CLASSPATH" ]; then
SPARK_CLASSPATH="$SPARK_CLASSPATH:$SPARK_EXTRA_CLASSPATH"
fi
-if [ "$PYSPARK_MAJOR_PYTHON_VERSION" == "3" ]; then
- pyv3="$(python3 -V 2>&1)"
- export PYTHON_VERSION="${pyv3:7}"
- export PYSPARK_PYTHON="python3"
- export PYSPARK_DRIVER_PYTHON="python3"
+if ! [ -z ${PYSPARK_PYTHON+x} ]; then
+ export PYSPARK_PYTHON
+fi
+if ! [ -z ${PYSPARK_DRIVER_PYTHON+x} ]; then
+ export PYSPARK_DRIVER_PYTHON
fi
# If HADOOP_HOME is set and SPARK_DIST_CLASSPATH is not set, set it here so Hadoop jars are available to the executor.
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DepsTestsSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DepsTestsSuite.scala
index a15f7ffa134b8..0d15e0325758d 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DepsTestsSuite.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DepsTestsSuite.scala
@@ -32,6 +32,7 @@ import org.apache.spark.deploy.k8s.integrationtest.DepsTestsSuite.{DEPS_TIMEOUT,
import org.apache.spark.deploy.k8s.integrationtest.KubernetesSuite.{INTERVAL, MinikubeTag, TIMEOUT}
import org.apache.spark.deploy.k8s.integrationtest.Utils.getExamplesJarName
import org.apache.spark.deploy.k8s.integrationtest.backend.minikube.Minikube
+import org.apache.spark.internal.config.{ARCHIVES, PYSPARK_DRIVER_PYTHON, PYSPARK_PYTHON}
private[spark] trait DepsTestsSuite { k8sSuite: KubernetesSuite =>
import KubernetesSuite.k8sTestTag
@@ -135,7 +136,7 @@ private[spark] trait DepsTestsSuite { k8sSuite: KubernetesSuite =>
.create(minioStatefulSet))
}
- private def deleteMinioStorage(): Unit = {
+ private def deleteMinioStorage(): Unit = {
kubernetesTestComponents
.kubernetesClient
.apps()
@@ -167,7 +168,7 @@ private[spark] trait DepsTestsSuite { k8sSuite: KubernetesSuite =>
tryDepsTest {
val fileName = Utils.createTempFile(FILE_CONTENTS, HOST_PATH)
Utils.createTarGzFile(s"$HOST_PATH/$fileName", s"$HOST_PATH/$fileName.tar.gz")
- sparkAppConf.set("spark.archives", s"$HOST_PATH/$fileName.tar.gz#test_tar_gz")
+ sparkAppConf.set(ARCHIVES.key, s"$HOST_PATH/$fileName.tar.gz#test_tar_gz")
val examplesJar = Utils.getTestFileAbsolutePath(getExamplesJarName(), sparkHomeDir)
runSparkRemoteCheckAndVerifyCompletion(appResource = examplesJar,
appArgs = Array(s"test_tar_gz/$fileName"),
@@ -175,40 +176,81 @@ private[spark] trait DepsTestsSuite { k8sSuite: KubernetesSuite =>
}
}
+ test(
+ "SPARK-33748: Launcher python client respecting PYSPARK_PYTHON", k8sTestTag, MinikubeTag) {
+ val fileName = Utils.createTempFile(
+ """
+ |#!/usr/bin/env bash
+ |export IS_CUSTOM_PYTHON=1
+ |python3 "$@"
+ """.stripMargin, HOST_PATH)
+ Utils.createTarGzFile(s"$HOST_PATH/$fileName", s"$HOST_PATH/$fileName.tgz")
+ sparkAppConf.set(ARCHIVES.key, s"$HOST_PATH/$fileName.tgz#test_env")
+ val pySparkFiles = Utils.getTestFileAbsolutePath("python_executable_check.py", sparkHomeDir)
+ testPython(pySparkFiles,
+ Seq(
+ s"PYSPARK_PYTHON: ./test_env/$fileName",
+ s"PYSPARK_DRIVER_PYTHON: ./test_env/$fileName",
+ "Custom Python used on executor: True",
+ "Custom Python used on driver: True"),
+ env = Map("PYSPARK_PYTHON" -> s"./test_env/$fileName"))
+ }
+
+ test(
+ "SPARK-33748: Launcher python client respecting " +
+ s"${PYSPARK_PYTHON.key} and ${PYSPARK_DRIVER_PYTHON.key}", k8sTestTag, MinikubeTag) {
+ val fileName = Utils.createTempFile(
+ """
+ |#!/usr/bin/env bash
+ |export IS_CUSTOM_PYTHON=1
+ |python3 "$@"
+ """.stripMargin, HOST_PATH)
+ Utils.createTarGzFile(s"$HOST_PATH/$fileName", s"$HOST_PATH/$fileName.tgz")
+ sparkAppConf.set(ARCHIVES.key, s"$HOST_PATH/$fileName.tgz#test_env")
+ sparkAppConf.set(PYSPARK_PYTHON.key, s"./test_env/$fileName")
+ sparkAppConf.set(PYSPARK_DRIVER_PYTHON.key, "python3")
+ val pySparkFiles = Utils.getTestFileAbsolutePath("python_executable_check.py", sparkHomeDir)
+ testPython(pySparkFiles,
+ Seq(
+ s"PYSPARK_PYTHON: ./test_env/$fileName",
+ "PYSPARK_DRIVER_PYTHON: python3",
+ "Custom Python used on executor: True",
+ "Custom Python used on driver: False"))
+ }
+
test("Launcher python client dependencies using a zip file", k8sTestTag, MinikubeTag) {
+ val pySparkFiles = Utils.getTestFileAbsolutePath("pyfiles.py", sparkHomeDir)
val inDepsFile = Utils.getTestFileAbsolutePath("py_container_checks.py", sparkHomeDir)
val outDepsFile = s"${inDepsFile.substring(0, inDepsFile.lastIndexOf("."))}.zip"
Utils.createZipFile(inDepsFile, outDepsFile)
- testPythonDeps(outDepsFile)
+ testPython(
+ pySparkFiles,
+ Seq(
+ "Python runtime version check is: True",
+ "Python environment version check is: True",
+ "Python runtime version check for executor is: True"),
+ Some(outDepsFile))
}
- private def testPythonDeps(depsFile: String): Unit = {
- tryDepsTest({
- val pySparkFiles = Utils.getTestFileAbsolutePath("pyfiles.py", sparkHomeDir)
+ private def testPython(
+ pySparkFiles: String,
+ expectedDriverLogs: Seq[String],
+ depsFile: Option[String] = None,
+ env: Map[String, String] = Map.empty[String, String]): Unit = {
+ tryDepsTest {
setPythonSparkConfProperties(sparkAppConf)
runSparkApplicationAndVerifyCompletion(
appResource = pySparkFiles,
mainClass = "",
- expectedDriverLogOnCompletion = Seq(
- "Python runtime version check is: True",
- "Python environment version check is: True",
- "Python runtime version check for executor is: True"),
+ expectedDriverLogOnCompletion = expectedDriverLogs,
appArgs = Array("python3"),
driverPodChecker = doBasicDriverPyPodCheck,
executorPodChecker = doBasicExecutorPyPodCheck,
appLocator = appLocator,
isJVM = false,
- pyFiles = Option(depsFile)) })
- }
-
- private def extractS3Key(data: String, key: String): String = {
- data.split("\n")
- .filter(_.contains(key))
- .head
- .split(":")
- .last
- .trim
- .replaceAll("[,|\"]", "")
+ pyFiles = depsFile,
+ env = env)
+ }
}
private def createS3Bucket(accessKey: String, secretKey: String, endPoint: String): Unit = {
@@ -269,7 +311,6 @@ private[spark] trait DepsTestsSuite { k8sSuite: KubernetesSuite =>
private def setPythonSparkConfProperties(conf: SparkAppConf): Unit = {
sparkAppConf.set("spark.kubernetes.container.image", pyImage)
- .set("spark.kubernetes.pyspark.pythonVersion", "3")
}
private def tryDepsTest(runTest: => Unit): Unit = {
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
index 7b2a2d0820238..494c82512adaf 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
@@ -274,7 +274,8 @@ class KubernetesSuite extends SparkFunSuite
isJVM: Boolean,
pyFiles: Option[String] = None,
executorPatience: Option[(Option[Interval], Option[Timeout])] = None,
- decommissioningTest: Boolean = false): Unit = {
+ decommissioningTest: Boolean = false,
+ env: Map[String, String] = Map.empty[String, String]): Unit = {
// scalastyle:on argcount
val appArguments = SparkAppArguments(
@@ -370,7 +371,8 @@ class KubernetesSuite extends SparkFunSuite
TIMEOUT.value.toSeconds.toInt,
sparkHomeDir,
isJVM,
- pyFiles)
+ pyFiles,
+ env)
val driverPod = kubernetesTestComponents.kubernetesClient
.pods()
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala
index 0bf01e6b66427..0392008fff2f5 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala
@@ -109,7 +109,8 @@ private[spark] object SparkAppLauncher extends Logging {
timeoutSecs: Int,
sparkHomeDir: Path,
isJVM: Boolean,
- pyFiles: Option[String] = None): Unit = {
+ pyFiles: Option[String] = None,
+ env: Map[String, String] = Map.empty[String, String]): Unit = {
val sparkSubmitExecutable = sparkHomeDir.resolve(Paths.get("bin", "spark-submit"))
logInfo(s"Launching a spark app with arguments $appArguments and conf $appConf")
val preCommandLine = if (isJVM) {
@@ -130,6 +131,6 @@ private[spark] object SparkAppLauncher extends Logging {
commandLine ++= appArguments.appArgs
}
logInfo(s"Launching a spark app with command line: ${commandLine.mkString(" ")}")
- ProcessUtils.executeProcess(commandLine.toArray, timeoutSecs)
+ ProcessUtils.executeProcess(commandLine.toArray, timeoutSecs, env = env)
}
}
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/ProcessUtils.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/ProcessUtils.scala
index a1ecd48e747ea..cc05990893e36 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/ProcessUtils.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/ProcessUtils.scala
@@ -19,6 +19,7 @@ package org.apache.spark.deploy.k8s.integrationtest
import java.nio.charset.StandardCharsets
import java.util.concurrent.TimeUnit
+import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import scala.io.Source
@@ -32,8 +33,10 @@ object ProcessUtils extends Logging {
def executeProcess(
fullCommand: Array[String],
timeout: Long,
- dumpErrors: Boolean = true): Seq[String] = {
+ dumpErrors: Boolean = true,
+ env: Map[String, String] = Map.empty[String, String]): Seq[String] = {
val pb = new ProcessBuilder().command(fullCommand: _*)
+ pb.environment().putAll(env.asJava)
pb.redirectErrorStream(true)
val proc = pb.start()
val outputLines = new ArrayBuffer[String]
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala
index 519443130008b..cc258533c2c8d 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala
@@ -153,6 +153,7 @@ object Utils extends Logging {
}
def createTarGzFile(inFile: String, outFile: String): Unit = {
+ val oFile = new File(outFile)
val fileToTarGz = new File(inFile)
Utils.tryWithResource(
new FileInputStream(fileToTarGz)
@@ -160,15 +161,19 @@ object Utils extends Logging {
Utils.tryWithResource(
new TarArchiveOutputStream(
new GzipCompressorOutputStream(
- new FileOutputStream(
- new File(outFile))))
+ new FileOutputStream(oFile)))
) { tOut =>
val tarEntry = new TarArchiveEntry(fileToTarGz, fileToTarGz.getName)
+ // The archive entry does not keep the file permissions of the input file, and
+ // setting permissions on the input file does not work either. Simply set the
+ // mode to 0x81ff (octal 0100777): a regular file with permissions 777.
+ tarEntry.setMode(0x81ff)
tOut.putArchiveEntry(tarEntry)
IOUtils.copy(fis, tOut)
tOut.closeArchiveEntry()
tOut.finish()
}
}
+ oFile.deleteOnExit()
}
}
diff --git a/resource-managers/kubernetes/integration-tests/tests/py_container_checks.py b/resource-managers/kubernetes/integration-tests/tests/py_container_checks.py
index f6b3be2806c82..e6c0137c0405f 100644
--- a/resource-managers/kubernetes/integration-tests/tests/py_container_checks.py
+++ b/resource-managers/kubernetes/integration-tests/tests/py_container_checks.py
@@ -24,7 +24,7 @@ def version_check(python_env, major_python_version):
These are various tests to test the Python container image.
This file will be distributed via --py-files in the e2e tests.
"""
- env_version = os.environ.get('PYSPARK_PYTHON')
+ env_version = os.environ.get('PYSPARK_PYTHON', 'python3')
print("Python runtime version check is: " +
str(sys.version_info[0] == major_python_version))
diff --git a/resource-managers/kubernetes/integration-tests/tests/python_executable_check.py b/resource-managers/kubernetes/integration-tests/tests/python_executable_check.py
new file mode 100644
index 0000000000000..89fd2aacab1a3
--- /dev/null
+++ b/resource-managers/kubernetes/integration-tests/tests/python_executable_check.py
@@ -0,0 +1,40 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import os
+
+from pyspark.sql import SparkSession
+
+
+if __name__ == "__main__":
+ spark = SparkSession \
+ .builder \
+ .appName("PythonExecutableTest") \
+ .getOrCreate()
+
+ # Check python executable at executors
+ is_custom_python_executor = spark.range(1).rdd.map(
+ lambda _: "IS_CUSTOM_PYTHON" in os.environ).first()
+
+ print("PYSPARK_PYTHON: %s" % os.environ.get("PYSPARK_PYTHON"))
+ print("PYSPARK_DRIVER_PYTHON: %s" % os.environ.get("PYSPARK_DRIVER_PYTHON"))
+
+ print("Custom Python used on executor: %s" % is_custom_python_executor)
+
+ is_custom_python_driver = "IS_CUSTOM_PYTHON" in os.environ
+ print("Custom Python used on driver: %s" % is_custom_python_driver)
+
+ spark.stop()