From 9e22902c4d39784a10b0387b2a59886bed184f60 Mon Sep 17 00:00:00 2001 From: Xianjin Date: Tue, 16 May 2023 19:36:07 +0800 Subject: [PATCH 1/4] [SPARK-43505][K8S] support env variables expansion and executor library path on k8s --- .../scala/org/apache/spark/util/Utils.scala | 20 ++- .../apache/spark/deploy/k8s/Constants.scala | 6 + .../features/BasicExecutorFeatureStep.scala | 9 ++ .../features/ReconstructEnvFeatureStep.scala | 152 ++++++++++++++++++ .../k8s/submit/KubernetesDriverBuilder.scala | 4 +- .../k8s/KubernetesExecutorBuilder.scala | 4 +- .../BasicExecutorFeatureStepSuite.scala | 14 ++ .../ReconstructEnvFeatureStepSuite.scala | 108 +++++++++++++ .../src/main/dockerfiles/spark/entrypoint.sh | 5 + .../integration-tests/dev/spark-rbac.yaml | 2 + .../k8s/integrationtest/EnvTestsSuite.scala | 88 ++++++++++ .../k8s/integrationtest/KubernetesSuite.scala | 4 +- 12 files changed, 407 insertions(+), 9 deletions(-) create mode 100644 resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/ReconstructEnvFeatureStep.scala create mode 100644 resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/ReconstructEnvFeatureStepSuite.scala create mode 100644 resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/EnvTestsSuite.scala diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index c785c135a45ea..dfd729996da73 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -2464,11 +2464,11 @@ private[spark] object Utils extends Logging with SparkClassUtils { } /** - * Return the prefix of a command that appends the given library paths to the - * system-specific library path environment variable. On Unix, for instance, - * this returns the string LD_LIBRARY_PATH="path1:path2:$LD_LIBRARY_PATH". 
+ * Return the library path value that prepends the given paths to the existing + * system-specific library path environment variable value. On Unix, for instance, + * this returns "path1:path2:$LD_LIBRARY_PATH". */ - def libraryPathEnvPrefix(libraryPaths: Seq[String]): String = { + def libraryPathEnvValue(libraryPaths: Seq[String]): String = { val libraryPathScriptVar = if (isWindows) { s"%${libraryPathEnvName}%" } else { @@ -2481,7 +2481,17 @@ private[spark] object Utils extends Logging with SparkClassUtils { } else { "" } - s"$libraryPathEnvName=$libraryPath$ampersand" + s"$libraryPath$ampersand" + } + + /** + * Return the prefix of a command that appends the given library paths to the + * system-specific library path environment variable. On Unix, for instance, + * this returns the string LD_LIBRARY_PATH="path1:path2:$LD_LIBRARY_PATH". + */ + def libraryPathEnvPrefix(libraryPaths: Seq[String]): String = { + val libraryPath = libraryPathEnvValue(libraryPaths) + s"$libraryPathEnvName=$libraryPath" } /** diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala index 385734c557a38..bf110aae65c9b 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala @@ -76,6 +76,12 @@ private[spark] object Constants { val SPARK_CONF_FILE_NAME = "spark.properties" val SPARK_CONF_PATH = s"$SPARK_CONF_DIR_INTERNAL/$SPARK_CONF_FILE_NAME" val ENV_HADOOP_TOKEN_FILE_LOCATION = "HADOOP_TOKEN_FILE_LOCATION" + // Expandable variables which should be exported at entrypoint.sh level + val POD_SHELL_PROFILE_VOLUME = "pod-shell-profile-volume" + val POD_SHELL_PROFILE_CONFIGMAP = "pod-shell-profile-conf-map" + val POD_SHELL_PROFILE_MOUNTPATH = "/opt/spark/.profiles" + val POD_SHELL_PROFILE_FILE_NAME = 
".spark.profile" + val POD_SHELL_PROFILE_KEY = "spark-shell-profile-key" // BINDINGS val ENV_PYSPARK_PYTHON = "PYSPARK_PYTHON" diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala index 0b0bbc30ba41a..0100014b0a803 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala @@ -141,6 +141,14 @@ private[spark] class BasicExecutorFeatureStep( (s"$ENV_JAVA_OPT_PREFIX$index", opt) }.toMap + val extraLibraryPath: Map[String, String] = kubernetesConf.sparkConf + .get(EXECUTOR_LIBRARY_PATH) + .map { libPath => + val libPathName = Utils.libraryPathEnvName + // LD_LIBRARY_PATH=user_set_lib:${LD_LIBRARY_PATH} + libPathName -> Utils.libraryPathEnvValue(Seq(libPath)) + }.toMap + KubernetesUtils.buildEnvVars( Seq( ENV_DRIVER_URL -> driverUrl, @@ -154,6 +162,7 @@ private[spark] class BasicExecutorFeatureStep( ++ kubernetesConf.environment ++ sparkAuthSecret ++ Seq(ENV_CLASSPATH -> kubernetesConf.get(EXECUTOR_CLASS_PATH).orNull) + ++ extraLibraryPath ++ allOpts) ++ KubernetesUtils.buildEnvVarsWithFieldRef( Seq( diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/ReconstructEnvFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/ReconstructEnvFeatureStep.scala new file mode 100644 index 0000000000000..139441f1c07e4 --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/ReconstructEnvFeatureStep.scala @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.features + +import scala.collection.JavaConverters._ +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer + +import io.fabric8.kubernetes.api.model.{ConfigMapBuilder, ContainerBuilder, EnvVar, HasMetadata, PodBuilder} + +import org.apache.spark.deploy.k8s.{KubernetesConf, SparkPod} +import org.apache.spark.deploy.k8s.Config.KUBERNETES_DNS_SUBDOMAIN_NAME_MAX_LENGTH +import org.apache.spark.deploy.k8s.Constants._ + +private[spark] class ReconstructEnvFeatureStep(conf: KubernetesConf) + extends KubernetesFeatureConfigStep { + import ReconstructEnvFeatureStep._ + + private val additionalResources = ArrayBuffer.empty[HasMetadata] + + private def configMapName = { + val suffix = "-" + POD_SHELL_PROFILE_CONFIGMAP + val prefix = conf.resourceNamePrefix + s"${prefix.take(KUBERNETES_DNS_SUBDOMAIN_NAME_MAX_LENGTH - suffix.length)}$suffix" + } + + private def generateProfileContent(envWithDeps: Seq[EnvVar], + mappings: Map[String, Set[String]]): String = { + // preprocess, build a map of env name to EnvVar + val envWithDepsMap = envWithDeps.map { env => + env.getName -> env + }.toMap + + // init + val reorderedEnvWithDeps = new ArrayBuffer[EnvVar]() + val visited = new mutable.HashSet[String]() + + // reorder envWithDeps so that 
env variables that rely on other variables are placed later. + def dfsVisit(env: EnvVar): Unit = { + if (!visited.contains(env.getName)) { + visited += env.getName // added to visited earlier to avoid circular references here + mappings.get(env.getName).foreach { deps => + // filter out self-references, so PATH=$PATH:/usr/bin would be considered independent. + deps.filter(_ != env.getName).foreach { dep => + envWithDepsMap.get(dep).foreach { depEnv => + dfsVisit(depEnv) + } + } + } + reorderedEnvWithDeps += env + } + } + // try to reorder all the env variables + envWithDeps.foreach(dfsVisit) + // returns k=v pairs + reorderedEnvWithDeps.map { env => + val quotedValue = if (env.getValue.startsWith("\"")) { + // assumes env value is already quoted + env.getValue + } else { + "\"" + env.getValue + "\"" + } + s"${env.getName}=$quotedValue" + }.mkString("\n") + } + + override def configurePod(pod: SparkPod): SparkPod = { + // extract all the environment variables from the pod that relies on some variables including + // itself, such as PATH=$PATH:/usr/bin, or LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$JAVA_HOME/lib + val envWithDepsMappings = pod.container.getEnv.asScala + .flatMap { env => + // get all variables in the value of the environment variable + val variables = Option(env.getValue).map { envValue => + ENV_VARIABLE_NAME_REGEX.findAllIn(envValue) + .map(_.stripPrefix("$").stripPrefix("{").stripSuffix("}")).toSet + }.getOrElse(Set.empty) + if (variables.nonEmpty) { + // this environment variable relies on some other variables + Some(env.getName -> variables) + } else { + // this is a simple environment variable, which could be set directly by K8S. 
+ None + } + }.toMap + + if (envWithDepsMappings.nonEmpty) { + val (envWithDeps, envWithoutDeps) = pod.container.getEnv.asScala + .partition(env => envWithDepsMappings.contains(env.getName)) + val containerWithNewEnvAndVolumes = new ContainerBuilder(pod.container) + .withEnv(envWithoutDeps.asJava) + .addNewVolumeMount() + .withName(POD_SHELL_PROFILE_VOLUME) + .withMountPath(POD_SHELL_PROFILE_MOUNTPATH) + .endVolumeMount() + .build() + val podWithVolume = new PodBuilder(pod.pod) + .editSpec() + .addNewVolume() + .withName(POD_SHELL_PROFILE_VOLUME) + .withNewConfigMap() + .withName(configMapName) + .addNewItem() + .withKey(POD_SHELL_PROFILE_KEY) + .withPath(POD_SHELL_PROFILE_FILE_NAME) + .endItem() + .endConfigMap() + .endVolume() + .endSpec() + .build() + // add the config map as additional resource + val content = generateProfileContent(envWithDeps, envWithDepsMappings) + additionalResources += new ConfigMapBuilder() + .withNewMetadata() + .withName(configMapName) + .endMetadata() + .withImmutable(true) + .addToData(POD_SHELL_PROFILE_KEY, content) + .build() + new SparkPod(podWithVolume, containerWithNewEnvAndVolumes) + } else { + // no environment variable relies on others, no need to reconstruct + pod + } + } + + override def getAdditionalKubernetesResources(): Seq[HasMetadata] = { + additionalResources + } +} + +private[spark] object ReconstructEnvFeatureStep { + // according to opengroup: + // https://pubs.opengroup.org/onlinepubs/9699919799/basedefs/V1_chap03.html#tag_03_235 + // environment variable name can only contain underscores, digits, and alphabetics from the + // portable character set. And the first character of a name is not a digit. 
+ private val ENV_VARIABLE_NAME_REGEX = + "\\$([A-Za-z_]+[A-Za-z0-9_]*|\\{[A-Za-z_]+[A-Za-z0-9_]*\\})".r +} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala index 3b69754b9cdf1..642c1fb37f671 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala @@ -73,7 +73,9 @@ private[spark] class KubernetesDriverBuilder { new HadoopConfDriverFeatureStep(conf), new KerberosConfDriverFeatureStep(conf), new PodTemplateConfigMapStep(conf), - new LocalDirsFeatureStep(conf)) ++ userFeatures + new LocalDirsFeatureStep(conf), + new ReconstructEnvFeatureStep(conf) + ) ++ userFeatures val spec = KubernetesDriverSpec( initialPod, diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala index 67aad00f98543..0943f061d1265 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala @@ -71,7 +71,9 @@ private[spark] class KubernetesExecutorBuilder { new MountSecretsFeatureStep(conf), new EnvSecretsFeatureStep(conf), new MountVolumesFeatureStep(conf), - new LocalDirsFeatureStep(conf)) ++ userFeatures + new LocalDirsFeatureStep(conf), + new ReconstructEnvFeatureStep(conf) + ) ++ userFeatures val spec = KubernetesExecutorSpec( initialPod, diff --git 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala index 32897014931cf..8b3c64eb9988c 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala @@ -524,6 +524,20 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { assert(podConfigured1.container.getPorts.contains(ports)) } + test("SPARK-43505: extraLibraryPath should be translated into environment variable") { + baseConf.set(config.EXECUTOR_LIBRARY_PATH, "/opt/spark/python/lib") + initDefaultProfile(baseConf) + val kconf = newExecutorConf(environment = Map("qux" -> "quux")) + val step = new BasicExecutorFeatureStep(kconf, new SecurityManager(baseConf), + defaultProfile) + val executor = step.configurePod(SparkPod.initialPod()) + + checkEnv(executor, baseConf, + Map(Utils.libraryPathEnvName -> Utils.libraryPathEnvValue(Seq("/opt/spark/python/lib")), + "qux" -> "quux")) + checkOwnerReferences(executor.pod, DRIVER_POD_UID) + } + // There is always exactly one controller reference, and it points to the driver pod. 
private def checkOwnerReferences(executor: Pod, driverPodUid: String): Unit = { assert(executor.getMetadata.getOwnerReferences.size() === 1) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/ReconstructEnvFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/ReconstructEnvFeatureStepSuite.scala new file mode 100644 index 0000000000000..3515004984cf6 --- /dev/null +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/ReconstructEnvFeatureStepSuite.scala @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.deploy.k8s.features + +import scala.collection.JavaConverters._ + +import io.fabric8.kubernetes.api.model.{ConfigMap, ContainerBuilder, EnvVarBuilder} + +import org.apache.spark.SparkFunSuite +import org.apache.spark.deploy.k8s.{KubernetesTestConf, SparkPod} +import org.apache.spark.deploy.k8s.Constants._ + +class ReconstructEnvFeatureStepSuite extends SparkFunSuite { + + test("SPARK-43505: Do nothing when no special environment variables are specified") { + val conf = KubernetesTestConf.createDriverConf() + val step = new ReconstructEnvFeatureStep(conf) + + val initialPod = SparkPod.initialPod() + val configuredPod = step.configurePod(initialPod) + assert(configuredPod === initialPod) + + assert(step.getAdditionalKubernetesResources().isEmpty) + } + + test("SPARK-43505: Special env variables are reconstructed for both driver and executor") { + val confs = Seq(KubernetesTestConf.createDriverConf(), KubernetesTestConf.createExecutorConf()) + for (conf <- confs) { + val env = Map( + "PATH" -> "\"/usr/bin:${JAVA_HOME}/bin:$PATH\"", + "LD_LIBRARY_PATH" -> "$JAVA_HOME/lib/amd64/server:$HADOOP_HOME/native/lib:$LD_LIBRARY_PATH", + "JAVA_HOME" -> "/usr/lib/jvm/java-8-openjdk-amd64", + "HADOOP_HOME" -> "${SPARK_HOME}/../hadoop" + ) + val envVarList = env.map { case (k, v) => + new EnvVarBuilder().withName(k).withValue(v).build() + }.toSeq + val step = new ReconstructEnvFeatureStep(conf) + val initialPod = SparkPod.initialPod().transform { case SparkPod(pod, container) => + val containerWithEnv = new ContainerBuilder(container) + .withEnv(envVarList.asJava) + .build() + SparkPod(pod, containerWithEnv) + } + val configuredPod = step.configurePod(initialPod) + // after this step, JAVA_HOME has no dependency, and would be kept as it is + // PATH, HADOOP_HOME, LD_LIBRARY_PATH variables should be expanded, and would be + // mounted as a .profile file in config map. 
+ val expectedContainerEnv = Map( + "JAVA_HOME" -> "/usr/lib/jvm/java-8-openjdk-amd64" + ) + val got = configuredPod.container.getEnv.asScala.map { envVar => + envVar.getName -> envVar.getValue + }.toMap + assert(expectedContainerEnv === got) + + // check config map and volume maps + assert(configuredPod.pod.getSpec.getVolumes.size() === 1) + + val volume = configuredPod.pod.getSpec.getVolumes.get(0) + val generatedResourceName = s"${conf.resourceNamePrefix}-$POD_SHELL_PROFILE_CONFIGMAP" + assert(volume.getName === POD_SHELL_PROFILE_VOLUME) + assert(volume.getConfigMap.getName === generatedResourceName) + assert(volume.getConfigMap.getItems.size() === 1) + assert(volume.getConfigMap.getItems.get(0).getKey === POD_SHELL_PROFILE_KEY) + assert(volume.getConfigMap.getItems.get(0).getPath === POD_SHELL_PROFILE_FILE_NAME) + + assert(configuredPod.container.getVolumeMounts.size() === 1) + val volumeMount = configuredPod.container.getVolumeMounts.get(0) + assert(volumeMount.getMountPath === POD_SHELL_PROFILE_MOUNTPATH) + assert(volumeMount.getName === POD_SHELL_PROFILE_VOLUME) + + // check config map content + val additionalResources = step.getAdditionalKubernetesResources() + assert(additionalResources.length === 1) + assert(additionalResources.head.getMetadata.getName === generatedResourceName) + assert(additionalResources.head.isInstanceOf[ConfigMap]) + val configMap = additionalResources.head.asInstanceOf[ConfigMap] + assert(configMap.getData.size() === 1) + assert(configMap.getImmutable()) + assert(configMap.getData.containsKey(POD_SHELL_PROFILE_KEY)) + + // LD_LIBRARY_PATH depends on HADOOP_HOME, so HADOOP_HOME is placed earlier + val expectedData = + """ + |PATH="/usr/bin:${JAVA_HOME}/bin:$PATH" + |HADOOP_HOME="${SPARK_HOME}/../hadoop" + |LD_LIBRARY_PATH="$JAVA_HOME/lib/amd64/server:$HADOOP_HOME/native/lib:$LD_LIBRARY_PATH" + |""".stripMargin.trim + assert(configMap.getData.get(POD_SHELL_PROFILE_KEY) === expectedData) + } + + } +} diff --git 
a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh index 42f4df88f3da9..b4f87dbde780a 100755 --- a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh +++ b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh @@ -48,6 +48,11 @@ else SPARK_EXECUTOR_JAVA_OPTS=("${(@f)$(< java_opts.txt)}") fi +# export environment variables that need some variable expansion +if [ -f "${SPARK_HOME}/.profiles/.spark.profile" ]; then + . "${SPARK_HOME}/.profiles/.spark.profile" +fi + if [ -n "$SPARK_EXTRA_CLASSPATH" ]; then SPARK_CLASSPATH="$SPARK_CLASSPATH:$SPARK_EXTRA_CLASSPATH" fi diff --git a/resource-managers/kubernetes/integration-tests/dev/spark-rbac.yaml b/resource-managers/kubernetes/integration-tests/dev/spark-rbac.yaml index f6b8b10c87b15..85b2bfb7a0486 100644 --- a/resource-managers/kubernetes/integration-tests/dev/spark-rbac.yaml +++ b/resource-managers/kubernetes/integration-tests/dev/spark-rbac.yaml @@ -35,6 +35,8 @@ rules: - "" resources: - "pods" + - "configmaps" + - "secrets" verbs: - "*" --- diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/EnvTestsSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/EnvTestsSuite.scala new file mode 100644 index 0000000000000..f2772ab5bb1f3 --- /dev/null +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/EnvTestsSuite.scala @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.integrationtest + +import io.fabric8.kubernetes.api.model.Pod +import org.scalatest.concurrent.Eventually +import org.scalatest.matchers.should.Matchers._ + +import org.apache.spark.deploy.k8s.integrationtest.KubernetesSuite._ + +trait EnvTestsSuite { k8sSuite: KubernetesSuite => + import EnvTestsSuite._ + + private val expectedPathLine = + s""" + |PATH="/opt/spark/bin:$$PATH" + |""".stripMargin.trim + private val expectedLDLibPathLine = + s""" + |LD_LIBRARY_PATH="/opt/spark/lib:$$LD_LIBRARY_PATH" + |""".stripMargin.trim + + test("SPARK-43505: Run SparkPi with extraLibraryPath and Path", k8sTestTag) { + sparkAppConf + .set("spark.executor.extraLibraryPath", "/opt/spark/lib") + .set("spark.kubernetes.driverEnv.PATH", "/opt/spark/bin:$PATH") + .set("spark.executorEnv.PATH", "/opt/spark/bin:$PATH") + runSparkPiAndVerifyCompletion( + driverPodChecker = (driverPod: Pod) => { + doBasicDriverPodCheck(driverPod) + checkEnv(driverPod, true) + }, + executorPodChecker = (executorPod: Pod) => { + doBasicExecutorPodCheck(executorPod) + checkEnv(executorPod, false) + }, + appArgs = Array("1000") // give it enough time for all execs to be visible + ) + } + + private def checkEnv(pod: Pod, isDriver: Boolean): Unit = { + logDebug(s"Checking env for ${pod}") + // Wait for the pod to become ready & have secrets provisioned + implicit val podName: String = 
pod.getMetadata.getName + implicit val components: KubernetesTestComponents = kubernetesTestComponents + val env = Eventually.eventually(TIMEOUT, INTERVAL) { + logDebug(s"Checking env of ${pod.getMetadata().getName()} with entrypoint") + val env = Utils.executeCommand("/opt/spark/entrypoint.sh", "env") + assert(!env.isEmpty) + env + } + env should include("PATH=/opt/spark/bin:") + if (!isDriver) { + // when on executor, extra lib path should also be updated + env should include("LD_LIBRARY_PATH=/opt/spark/lib:") + } + + // Make sure the shell profile file is mounted correctly + val files = Utils.executeCommand("ls", s"${PROFILE_MOUNT_PATH}") + files should include(PROFILE_FILE) + + // check file content + val profile = Utils.executeCommand("cat", s"$PROFILE_MOUNT_PATH/$PROFILE_FILE") + profile should include(expectedPathLine) + if (!isDriver) { + profile should include(expectedLDLibPathLine) + } + } +} + +private[spark] object EnvTestsSuite { + val PROFILE_MOUNT_PATH = "/opt/spark/.profiles" + val PROFILE_FILE = ".spark.profile" +} diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala index 19d46b1e194a3..9a430e347bdfc 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala @@ -45,8 +45,8 @@ import org.apache.spark.internal.config._ class KubernetesSuite extends SparkFunSuite with BeforeAndAfterAll with BeforeAndAfter with BasicTestsSuite with SparkConfPropagateSuite with SecretsTestsSuite with PythonTestsSuite with ClientModeTestsSuite with PodTemplateSuite - with PVTestsSuite with DepsTestsSuite with DecommissionSuite with RTestsSuite with 
Logging - with Eventually with Matchers { + with PVTestsSuite with DepsTestsSuite with DecommissionSuite with RTestsSuite with EnvTestsSuite + with Logging with Eventually with Matchers { import KubernetesSuite._ From 79b267271301b0ddc7df87d608925f7de5c1c4ae Mon Sep 17 00:00:00 2001 From: Xianjin Date: Wed, 17 May 2023 16:26:38 +0800 Subject: [PATCH 2/4] fix: scala 2.13 compile --- .../spark/deploy/k8s/features/ReconstructEnvFeatureStep.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/ReconstructEnvFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/ReconstructEnvFeatureStep.scala index 139441f1c07e4..51694964a2c9f 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/ReconstructEnvFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/ReconstructEnvFeatureStep.scala @@ -122,7 +122,7 @@ private[spark] class ReconstructEnvFeatureStep(conf: KubernetesConf) .endSpec() .build() // add the config map as additional resource - val content = generateProfileContent(envWithDeps, envWithDepsMappings) + val content = generateProfileContent(envWithDeps.toSeq, envWithDepsMappings) additionalResources += new ConfigMapBuilder() .withNewMetadata() .withName(configMapName) @@ -138,7 +138,7 @@ private[spark] class ReconstructEnvFeatureStep(conf: KubernetesConf) } override def getAdditionalKubernetesResources(): Seq[HasMetadata] = { - additionalResources + additionalResources.toSeq } } From 35f2d2872adde42048d338d9d55d305da24f35b4 Mon Sep 17 00:00:00 2001 From: Xianjin Date: Wed, 17 May 2023 18:51:35 +0800 Subject: [PATCH 3/4] fix typo in `/opt/entrypoint.sh` path --- .../apache/spark/deploy/k8s/integrationtest/EnvTestsSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git 
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/EnvTestsSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/EnvTestsSuite.scala index f2772ab5bb1f3..161268f9ce02f 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/EnvTestsSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/EnvTestsSuite.scala @@ -59,7 +59,7 @@ trait EnvTestsSuite { k8sSuite: KubernetesSuite => implicit val components: KubernetesTestComponents = kubernetesTestComponents val env = Eventually.eventually(TIMEOUT, INTERVAL) { logDebug(s"Checking env of ${pod.getMetadata().getName()} with entrypoint") - val env = Utils.executeCommand("/opt/spark/entrypoint.sh", "env") + val env = Utils.executeCommand("/opt/entrypoint.sh", "env") assert(!env.isEmpty) env } From c84ad79b9aba420da01a3db4f018e7260b8e72fc Mon Sep 17 00:00:00 2001 From: Xianjin Date: Wed, 17 May 2023 19:00:49 +0800 Subject: [PATCH 4/4] run scalafmt --- .../features/ReconstructEnvFeatureStep.scala | 60 ++++++++++--------- .../ReconstructEnvFeatureStepSuite.scala | 7 +-- 2 files changed, 35 insertions(+), 32 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/ReconstructEnvFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/ReconstructEnvFeatureStep.scala index 51694964a2c9f..ff9c9b6151f17 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/ReconstructEnvFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/ReconstructEnvFeatureStep.scala @@ -27,7 +27,7 @@ import org.apache.spark.deploy.k8s.Config.KUBERNETES_DNS_SUBDOMAIN_NAME_MAX_LENG import 
org.apache.spark.deploy.k8s.Constants._ private[spark] class ReconstructEnvFeatureStep(conf: KubernetesConf) - extends KubernetesFeatureConfigStep { + extends KubernetesFeatureConfigStep { import ReconstructEnvFeatureStep._ private val additionalResources = ArrayBuffer.empty[HasMetadata] @@ -38,8 +38,9 @@ private[spark] class ReconstructEnvFeatureStep(conf: KubernetesConf) s"${prefix.take(KUBERNETES_DNS_SUBDOMAIN_NAME_MAX_LENGTH - suffix.length)}$suffix" } - private def generateProfileContent(envWithDeps: Seq[EnvVar], - mappings: Map[String, Set[String]]): String = { + private def generateProfileContent( + envWithDeps: Seq[EnvVar], + mappings: Map[String, Set[String]]): String = { // preprocess, build a map of env name to EnvVar val envWithDepsMap = envWithDeps.map { env => env.getName -> env @@ -67,35 +68,40 @@ private[spark] class ReconstructEnvFeatureStep(conf: KubernetesConf) // try to reorder all the env variables envWithDeps.foreach(dfsVisit) // returns k=v pairs - reorderedEnvWithDeps.map { env => - val quotedValue = if (env.getValue.startsWith("\"")) { - // assumes env value is already quoted - env.getValue - } else { - "\"" + env.getValue + "\"" + reorderedEnvWithDeps + .map { env => + val quotedValue = if (env.getValue.startsWith("\"")) { + // assumes env value is already quoted + env.getValue + } else { + "\"" + env.getValue + "\"" + } + s"${env.getName}=$quotedValue" } - s"${env.getName}=$quotedValue" - }.mkString("\n") + .mkString("\n") } override def configurePod(pod: SparkPod): SparkPod = { // extract all the environment variables from the pod that relies on some variables including // itself, such as PATH=$PATH:/usr/bin, or LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$JAVA_HOME/lib - val envWithDepsMappings = pod.container.getEnv.asScala - .flatMap { env => - // get all variables in the value of the environment variable - val variables = Option(env.getValue).map { envValue => - ENV_VARIABLE_NAME_REGEX.findAllIn(envValue) - 
.map(_.stripPrefix("$").stripPrefix("{").stripSuffix("}")).toSet - }.getOrElse(Set.empty) - if (variables.nonEmpty) { - // this environment variable relies on some other variables - Some(env.getName -> variables) - } else { - // this is a simple environment variable, which could be set directly by K8S. - None + val envWithDepsMappings = pod.container.getEnv.asScala.flatMap { env => + // get all variables in the value of the environment variable + val variables = Option(env.getValue) + .map { envValue => + ENV_VARIABLE_NAME_REGEX + .findAllIn(envValue) + .map(_.stripPrefix("$").stripPrefix("{").stripSuffix("}")) + .toSet } - }.toMap + .getOrElse(Set.empty) + if (variables.nonEmpty) { + // this environment variable relies on some other variables + Some(env.getName -> variables) + } else { + // this is a simple environment variable, which could be set directly by K8S. + None + } + }.toMap if (envWithDepsMappings.nonEmpty) { val (envWithDeps, envWithoutDeps) = pod.container.getEnv.asScala @@ -103,8 +109,8 @@ private[spark] class ReconstructEnvFeatureStep(conf: KubernetesConf) val containerWithNewEnvAndVolumes = new ContainerBuilder(pod.container) .withEnv(envWithoutDeps.asJava) .addNewVolumeMount() - .withName(POD_SHELL_PROFILE_VOLUME) - .withMountPath(POD_SHELL_PROFILE_MOUNTPATH) + .withName(POD_SHELL_PROFILE_VOLUME) + .withMountPath(POD_SHELL_PROFILE_MOUNTPATH) .endVolumeMount() .build() val podWithVolume = new PodBuilder(pod.pod) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/ReconstructEnvFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/ReconstructEnvFeatureStepSuite.scala index 3515004984cf6..b20a83668c855 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/ReconstructEnvFeatureStepSuite.scala +++ 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/ReconstructEnvFeatureStepSuite.scala @@ -44,8 +44,7 @@ class ReconstructEnvFeatureStepSuite extends SparkFunSuite { "PATH" -> "\"/usr/bin:${JAVA_HOME}/bin:$PATH\"", "LD_LIBRARY_PATH" -> "$JAVA_HOME/lib/amd64/server:$HADOOP_HOME/native/lib:$LD_LIBRARY_PATH", "JAVA_HOME" -> "/usr/lib/jvm/java-8-openjdk-amd64", - "HADOOP_HOME" -> "${SPARK_HOME}/../hadoop" - ) + "HADOOP_HOME" -> "${SPARK_HOME}/../hadoop") val envVarList = env.map { case (k, v) => new EnvVarBuilder().withName(k).withValue(v).build() }.toSeq @@ -60,9 +59,7 @@ class ReconstructEnvFeatureStepSuite extends SparkFunSuite { // after this step, JAVA_HOME has no dependency, and would be kept as it is // PATH, HADOOP_HOME, LD_LIBRARY_PATH variables should be expanded, and would be // mounted as a .profile file in config map. - val expectedContainerEnv = Map( - "JAVA_HOME" -> "/usr/lib/jvm/java-8-openjdk-amd64" - ) + val expectedContainerEnv = Map("JAVA_HOME" -> "/usr/lib/jvm/java-8-openjdk-amd64") val got = configuredPod.container.getEnv.asScala.map { envVar => envVar.getName -> envVar.getValue }.toMap