Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 15 additions & 5 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2464,11 +2464,11 @@ private[spark] object Utils extends Logging with SparkClassUtils {
}

/**
* Return the prefix of a command that appends the given library paths to the
* system-specific library path environment variable. On Unix, for instance,
* this returns the string LD_LIBRARY_PATH="path1:path2:$LD_LIBRARY_PATH".
* Return the library path value that prepends the given paths to the existing
* system-specific library path environment variable value. On Unix, for instance,
* this returns "path1:path2:$LD_LIBRARY_PATH".
*/
def libraryPathEnvPrefix(libraryPaths: Seq[String]): String = {
def libraryPathEnvValue(libraryPaths: Seq[String]): String = {
val libraryPathScriptVar = if (isWindows) {
s"%${libraryPathEnvName}%"
} else {
Expand All @@ -2481,7 +2481,17 @@ private[spark] object Utils extends Logging with SparkClassUtils {
} else {
""
}
s"$libraryPathEnvName=$libraryPath$ampersand"
s"$libraryPath$ampersand"
}

/**
* Return the prefix of a command that appends the given library paths to the
* system-specific library path environment variable. On Unix, for instance,
* this returns the string LD_LIBRARY_PATH="path1:path2:$LD_LIBRARY_PATH".
*/
def libraryPathEnvPrefix(libraryPaths: Seq[String]): String = {
val libraryPath = libraryPathEnvValue(libraryPaths)
s"$libraryPathEnvName=$libraryPath"
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,12 @@ private[spark] object Constants {
val SPARK_CONF_FILE_NAME = "spark.properties"
val SPARK_CONF_PATH = s"$SPARK_CONF_DIR_INTERNAL/$SPARK_CONF_FILE_NAME"
val ENV_HADOOP_TOKEN_FILE_LOCATION = "HADOOP_TOKEN_FILE_LOCATION"
// Expandable variables which should be exported at entrypoint.sh level
val POD_SHELL_PROFILE_VOLUME = "pod-shell-profile-volume"
val POD_SHELL_PROFILE_CONFIGMAP = "pod-shell-profile-conf-map"
val POD_SHELL_PROFILE_MOUNTPATH = "/opt/spark/.profiles"
val POD_SHELL_PROFILE_FILE_NAME = ".spark.profile"
val POD_SHELL_PROFILE_KEY = "spark-shell-profile-key"

// BINDINGS
val ENV_PYSPARK_PYTHON = "PYSPARK_PYTHON"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,14 @@ private[spark] class BasicExecutorFeatureStep(
(s"$ENV_JAVA_OPT_PREFIX$index", opt)
}.toMap

val extraLibraryPath: Map[String, String] = kubernetesConf.sparkConf
.get(EXECUTOR_LIBRARY_PATH)
.map { libPath =>
val libPathName = Utils.libraryPathEnvName
// LD_LIBRARY_PATH=user_set_lib:${LD_LIBRARY_PATH}
libPathName -> Utils.libraryPathEnvValue(Seq(libPath))
}.toMap

KubernetesUtils.buildEnvVars(
Seq(
ENV_DRIVER_URL -> driverUrl,
Expand All @@ -154,6 +162,7 @@ private[spark] class BasicExecutorFeatureStep(
++ kubernetesConf.environment
++ sparkAuthSecret
++ Seq(ENV_CLASSPATH -> kubernetesConf.get(EXECUTOR_CLASS_PATH).orNull)
++ extraLibraryPath
++ allOpts) ++
KubernetesUtils.buildEnvVarsWithFieldRef(
Seq(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s.features

import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

import io.fabric8.kubernetes.api.model.{ConfigMapBuilder, ContainerBuilder, EnvVar, HasMetadata, PodBuilder}

import org.apache.spark.deploy.k8s.{KubernetesConf, SparkPod}
import org.apache.spark.deploy.k8s.Config.KUBERNETES_DNS_SUBDOMAIN_NAME_MAX_LENGTH
import org.apache.spark.deploy.k8s.Constants._

private[spark] class ReconstructEnvFeatureStep(conf: KubernetesConf)
extends KubernetesFeatureConfigStep {
import ReconstructEnvFeatureStep._

private val additionalResources = ArrayBuffer.empty[HasMetadata]

private def configMapName = {
val suffix = "-" + POD_SHELL_PROFILE_CONFIGMAP
val prefix = conf.resourceNamePrefix
s"${prefix.take(KUBERNETES_DNS_SUBDOMAIN_NAME_MAX_LENGTH - suffix.length)}$suffix"
}

private def generateProfileContent(
envWithDeps: Seq[EnvVar],
mappings: Map[String, Set[String]]): String = {
// preprocess, build a map of env name to EnvVar
val envWithDepsMap = envWithDeps.map { env =>
env.getName -> env
}.toMap

// init
val reorderedEnvWithDeps = new ArrayBuffer[EnvVar]()
val visited = new mutable.HashSet[String]()

// reorder envWithDeps so that env variables that rely on other variables are placed later.
def dfsVisit(env: EnvVar): Unit = {
if (!visited.contains(env.getName)) {
visited += env.getName // added to visited earlier to avoid circle reference here
mappings.get(env.getName).foreach { deps =>
// filter itself, so PATH=$PATH:/usr/bin would be considered independent.
deps.filter(_ != env.getName).foreach { dep =>
envWithDepsMap.get(dep).foreach { depEnv =>
dfsVisit(depEnv)
}
}
}
reorderedEnvWithDeps += env
}
}
// try to reorder all the env variables
envWithDeps.foreach(dfsVisit)
// returns k=v pairs
reorderedEnvWithDeps
.map { env =>
val quotedValue = if (env.getValue.startsWith("\"")) {
// assumes env value is already quoted
env.getValue
} else {
"\"" + env.getValue + "\""
}
s"${env.getName}=$quotedValue"
}
.mkString("\n")
}

override def configurePod(pod: SparkPod): SparkPod = {
// extract all the environment variables from the pod that relies on some variables including
// itself, such as PATH=$PATH:/usr/bin, or LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$JAVA_HOME/lib
val envWithDepsMappings = pod.container.getEnv.asScala.flatMap { env =>
// get all variables in the value of the environment variable
val variables = Option(env.getValue)
.map { envValue =>
ENV_VARIABLE_NAME_REGEX
.findAllIn(envValue)
.map(_.stripPrefix("$").stripPrefix("{").stripSuffix("}"))
.toSet
}
.getOrElse(Set.empty)
if (variables.nonEmpty) {
// this environment variable relies on some other variables
Some(env.getName -> variables)
} else {
// this is a simple environment variable, which could be set directly by K8S.
None
}
}.toMap

if (envWithDepsMappings.nonEmpty) {
val (envWithDeps, envWithoutDeps) = pod.container.getEnv.asScala
.partition(env => envWithDepsMappings.contains(env.getName))
val containerWithNewEnvAndVolumes = new ContainerBuilder(pod.container)
.withEnv(envWithoutDeps.asJava)
.addNewVolumeMount()
.withName(POD_SHELL_PROFILE_VOLUME)
.withMountPath(POD_SHELL_PROFILE_MOUNTPATH)
.endVolumeMount()
.build()
val podWithVolume = new PodBuilder(pod.pod)
.editSpec()
.addNewVolume()
.withName(POD_SHELL_PROFILE_VOLUME)
.withNewConfigMap()
.withName(configMapName)
.addNewItem()
.withKey(POD_SHELL_PROFILE_KEY)
.withPath(POD_SHELL_PROFILE_FILE_NAME)
.endItem()
.endConfigMap()
.endVolume()
.endSpec()
.build()
// add the config map as additional resource
val content = generateProfileContent(envWithDeps.toSeq, envWithDepsMappings)
additionalResources += new ConfigMapBuilder()
.withNewMetadata()
.withName(configMapName)
.endMetadata()
.withImmutable(true)
.addToData(POD_SHELL_PROFILE_KEY, content)
.build()
new SparkPod(podWithVolume, containerWithNewEnvAndVolumes)
} else {
// no environment variable relies on others, no need to reconstruct
pod
}
}

override def getAdditionalKubernetesResources(): Seq[HasMetadata] = {
additionalResources.toSeq
}
}

private[spark] object ReconstructEnvFeatureStep {
// according to opengroup:
// https://pubs.opengroup.org/onlinepubs/9699919799/basedefs/V1_chap03.html#tag_03_235
// environment variable name can only contain underscores, digits, and alphabetics from the
// portable character set. And the first character of a name is not a digit.
private val ENV_VARIABLE_NAME_REGEX =
"\\$([A-Za-z_]+[A-Za-z0-9_]*|\\{[A-Za-z_]+[A-Za-z0-9_]*\\})".r
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,9 @@ private[spark] class KubernetesDriverBuilder {
new HadoopConfDriverFeatureStep(conf),
new KerberosConfDriverFeatureStep(conf),
new PodTemplateConfigMapStep(conf),
new LocalDirsFeatureStep(conf)) ++ userFeatures
new LocalDirsFeatureStep(conf),
new ReconstructEnvFeatureStep(conf)
) ++ userFeatures

val spec = KubernetesDriverSpec(
initialPod,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,9 @@ private[spark] class KubernetesExecutorBuilder {
new MountSecretsFeatureStep(conf),
new EnvSecretsFeatureStep(conf),
new MountVolumesFeatureStep(conf),
new LocalDirsFeatureStep(conf)) ++ userFeatures
new LocalDirsFeatureStep(conf),
new ReconstructEnvFeatureStep(conf)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this doesn't work as expected as spark wouldn't create other resource when building executor pod.

Will fix and propose another solution.

) ++ userFeatures

val spec = KubernetesExecutorSpec(
initialPod,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -524,6 +524,20 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
assert(podConfigured1.container.getPorts.contains(ports))
}

test("SPARK-43505: extraLibraryPath should be translated into environment variable") {
baseConf.set(config.EXECUTOR_LIBRARY_PATH, "/opt/spark/python/lib")
initDefaultProfile(baseConf)
val kconf = newExecutorConf(environment = Map("qux" -> "quux"))
val step = new BasicExecutorFeatureStep(kconf, new SecurityManager(baseConf),
defaultProfile)
val executor = step.configurePod(SparkPod.initialPod())

checkEnv(executor, baseConf,
Map(Utils.libraryPathEnvName -> Utils.libraryPathEnvValue(Seq("/opt/spark/python/lib")),
"qux" -> "quux"))
checkOwnerReferences(executor.pod, DRIVER_POD_UID)
}

// There is always exactly one controller reference, and it points to the driver pod.
private def checkOwnerReferences(executor: Pod, driverPodUid: String): Unit = {
assert(executor.getMetadata.getOwnerReferences.size() === 1)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s.features

import scala.collection.JavaConverters._

import io.fabric8.kubernetes.api.model.{ConfigMap, ContainerBuilder, EnvVarBuilder}

import org.apache.spark.SparkFunSuite
import org.apache.spark.deploy.k8s.{KubernetesTestConf, SparkPod}
import org.apache.spark.deploy.k8s.Constants._

class ReconstructEnvFeatureStepSuite extends SparkFunSuite {

test("SPARK-43505: Do nothing when no special environment variables are specified") {
val conf = KubernetesTestConf.createDriverConf()
val step = new ReconstructEnvFeatureStep(conf)

val initialPod = SparkPod.initialPod()
val configuredPod = step.configurePod(initialPod)
assert(configuredPod === initialPod)

assert(step.getAdditionalKubernetesResources().isEmpty)
}

test("SPARK-43505: Special env variables are reconstructed for both driver and executor") {
val confs = Seq(KubernetesTestConf.createDriverConf(), KubernetesTestConf.createExecutorConf())
for (conf <- confs) {
val env = Map(
"PATH" -> "\"/usr/bin:${JAVA_HOME}/bin:$PATH\"",
"LD_LIBRARY_PATH" -> "$JAVA_HOME/lib/amd64/server:$HADOOP_HOME/native/lib:$LD_LIBRARY_PATH",
"JAVA_HOME" -> "/usr/lib/jvm/java-8-openjdk-amd64",
"HADOOP_HOME" -> "${SPARK_HOME}/../hadoop")
val envVarList = env.map { case (k, v) =>
new EnvVarBuilder().withName(k).withValue(v).build()
}.toSeq
val step = new ReconstructEnvFeatureStep(conf)
val initialPod = SparkPod.initialPod().transform { case SparkPod(pod, container) =>
val containerWithEnv = new ContainerBuilder(container)
.withEnv(envVarList.asJava)
.build()
SparkPod(pod, containerWithEnv)
}
val configuredPod = step.configurePod(initialPod)
// after this step, JAVA_HOME has no dependency, and would be kept as it is
// PATH, HADOOP_HOME, LD_LIBRARY_PATH variables should be expanded, and would be
// mounted as a .profile file in config map.
val expectedContainerEnv = Map("JAVA_HOME" -> "/usr/lib/jvm/java-8-openjdk-amd64")
val got = configuredPod.container.getEnv.asScala.map { envVar =>
envVar.getName -> envVar.getValue
}.toMap
assert(expectedContainerEnv === got)

// check config map and volume maps
assert(configuredPod.pod.getSpec.getVolumes.size() === 1)

val volume = configuredPod.pod.getSpec.getVolumes.get(0)
val generatedResourceName = s"${conf.resourceNamePrefix}-$POD_SHELL_PROFILE_CONFIGMAP"
assert(volume.getName === POD_SHELL_PROFILE_VOLUME)
assert(volume.getConfigMap.getName === generatedResourceName)
assert(volume.getConfigMap.getItems.size() === 1)
assert(volume.getConfigMap.getItems.get(0).getKey === POD_SHELL_PROFILE_KEY)
assert(volume.getConfigMap.getItems.get(0).getPath === POD_SHELL_PROFILE_FILE_NAME)

assert(configuredPod.container.getVolumeMounts.size() === 1)
val volumeMount = configuredPod.container.getVolumeMounts.get(0)
assert(volumeMount.getMountPath === POD_SHELL_PROFILE_MOUNTPATH)
assert(volumeMount.getName === POD_SHELL_PROFILE_VOLUME)

// check config map content
val additionalResources = step.getAdditionalKubernetesResources()
assert(additionalResources.length === 1)
assert(additionalResources.head.getMetadata.getName === generatedResourceName)
assert(additionalResources.head.isInstanceOf[ConfigMap])
val configMap = additionalResources.head.asInstanceOf[ConfigMap]
assert(configMap.getData.size() === 1)
assert(configMap.getImmutable())
assert(configMap.getData.containsKey(POD_SHELL_PROFILE_KEY))

// LD_LIBRARY_PATH depends on HADOOP_HOME, so HADOOP_HOME is placed earlier
val expectedData =
"""
|PATH="/usr/bin:${JAVA_HOME}/bin:$PATH"
|HADOOP_HOME="${SPARK_HOME}/../hadoop"
|LD_LIBRARY_PATH="$JAVA_HOME/lib/amd64/server:$HADOOP_HOME/native/lib:$LD_LIBRARY_PATH"
|""".stripMargin.trim
assert(configMap.getData.get(POD_SHELL_PROFILE_KEY) === expectedData)
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ else
SPARK_EXECUTOR_JAVA_OPTS=("${(@f)$(< java_opts.txt)}")
fi

# export environment variables that needs some variable expansion
if [ -f "${SPARK_HOME}/.profiles/.spark.profile" ]; then
. "${SPARK_HOME}/.profiles/.spark.profile"
fi

if [ -n "$SPARK_EXTRA_CLASSPATH" ]; then
SPARK_CLASSPATH="$SPARK_CLASSPATH:$SPARK_EXTRA_CLASSPATH"
fi
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ rules:
- ""
resources:
- "pods"
- "configmaps"
- "secrets"
verbs:
- "*"
---
Expand Down
Loading