diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopConfDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopConfDriverFeatureStep.scala index 069a57d3dc47d..45a5b8d7dae93 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopConfDriverFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopConfDriverFeatureStep.scala @@ -104,6 +104,14 @@ private[spark] class HadoopConfDriverFeatureStep(conf: KubernetesConf) } } + /** Publishes the Hadoop conf ConfigMap name (existingConfMap, else newConfigMapName) under HADOOP_CONFIG_MAP_NAME when hasHadoopConf holds; returns an empty map otherwise. */ override def getAdditionalPodSystemProperties(): Map[String, String] = { + if (hasHadoopConf) { + Map(HADOOP_CONFIG_MAP_NAME -> existingConfMap.getOrElse(newConfigMapName)) + } else { + Map.empty + } + } + override def getAdditionalKubernetesResources(): Seq[HasMetadata] = { if (confDir.isDefined) { val fileMap = confFiles.map { file => diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopConfExecutorFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopConfExecutorFeatureStep.scala new file mode 100644 index 0000000000000..8a2773c1ac31f --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopConfExecutorFeatureStep.scala @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.k8s.features + +import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder, VolumeBuilder} + +import org.apache.spark.deploy.k8s.{KubernetesConf, SparkPod} +import org.apache.spark.deploy.k8s.Constants._ + +/** + * Mounts the Hadoop conf ConfigMap named by HADOOP_CONFIG_MAP_NAME on the executor pod. + */ +private[spark] class HadoopConfExecutorFeatureStep(conf: KubernetesConf) + extends KubernetesFeatureConfigStep { + + /* ConfigMap name looked up in conf under HADOOP_CONFIG_MAP_NAME; configurePod only acts when it is defined. */ private val hadoopConfigMapName = conf.getOption(HADOOP_CONFIG_MAP_NAME) + + /** When a ConfigMap name is defined, adds a HADOOP_CONF_VOLUME volume backed by it, mounts it at HADOOP_CONF_DIR_PATH and sets ENV_HADOOP_CONF_DIR to that path. */ override def configurePod(original: SparkPod): SparkPod = { + original.transform { case pod if hadoopConfigMapName.isDefined => + val confVolume = new VolumeBuilder() + .withName(HADOOP_CONF_VOLUME) + .withNewConfigMap() + .withName(hadoopConfigMapName.get) /* guarded by the isDefined check in the case pattern */ + .endConfigMap() + .build() + + val podWithConf = new PodBuilder(pod.pod) + .editSpec() + .addNewVolumeLike(confVolume) + .endVolume() + .endSpec() + .build() + + val containerWithMount = new ContainerBuilder(pod.container) + .addNewVolumeMount() + .withName(HADOOP_CONF_VOLUME) + .withMountPath(HADOOP_CONF_DIR_PATH) + .endVolumeMount() + .addNewEnv() + .withName(ENV_HADOOP_CONF_DIR) + .withValue(HADOOP_CONF_DIR_PATH) + .endEnv() + .build() + + SparkPod(podWithConf, containerWithMount) + } + } +} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala index 67aad00f98543..a85e42662b890 100644 --- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala @@ -71,6 +71,7 @@ private[spark] class KubernetesExecutorBuilder { new MountSecretsFeatureStep(conf), new EnvSecretsFeatureStep(conf), new MountVolumesFeatureStep(conf), + /* mounts the Hadoop conf ConfigMap on executor pods */ new HadoopConfExecutorFeatureStep(conf), new LocalDirsFeatureStep(conf)) ++ userFeatures val spec = KubernetesExecutorSpec( diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/HadoopConfExecutorFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/HadoopConfExecutorFeatureStepSuite.scala new file mode 100644 index 0000000000000..a60227814eb13 --- /dev/null +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/HadoopConfExecutorFeatureStepSuite.scala @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.k8s.features + +import java.io.File +import java.nio.charset.StandardCharsets.UTF_8 + +import com.google.common.io.Files + +import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.deploy.k8s.{Constants, KubernetesTestConf, SecretVolumeUtils, SparkPod} +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.deploy.k8s.features.KubernetesFeaturesTestUtils.containerHasEnvVar +import org.apache.spark.util.{SparkConfWithEnv, Utils} + +/** Checks that the properties returned by HadoopConfDriverFeatureStep decide what HadoopConfExecutorFeatureStep adds to the executor pod. */ class HadoopConfExecutorFeatureStepSuite extends SparkFunSuite { + import SecretVolumeUtils._ + + test("SPARK-43504: mounts the hadoop config map on the executor pod") { + val confDir = Utils.createTempDir() + val confFiles = Set("core-site.xml", "hdfs-site.xml") + + confFiles.foreach { f => + Files.write("some data", new File(confDir, f), UTF_8) + } + + /* Two scenarios: an env with ENV_HADOOP_CONF_DIR pointing at confDir, and an empty env. */ Seq( + Map(ENV_HADOOP_CONF_DIR -> confDir.getAbsolutePath()), + Map.empty[String, String]).foreach { env => + val hasHadoopConf = env.contains(ENV_HADOOP_CONF_DIR) + + val driverSparkConf = new SparkConfWithEnv(env) + val executorSparkConf = new SparkConf(false) + + val driverConf = KubernetesTestConf.createDriverConf(sparkConf = driverSparkConf) + val driverStep = new HadoopConfDriverFeatureStep(driverConf) + + val additionalPodSystemProperties = driverStep.getAdditionalPodSystemProperties() + if (hasHadoopConf) { + assert(additionalPodSystemProperties.contains(Constants.HADOOP_CONFIG_MAP_NAME)) + /* copy the driver-published properties into the executor conf */ additionalPodSystemProperties.foreach { case (key, value) => + executorSparkConf.set(key, value) + } + } else { + assert(additionalPodSystemProperties.isEmpty) + } + + val executorConf = KubernetesTestConf.createExecutorConf(sparkConf = executorSparkConf) + val executorStep = new HadoopConfExecutorFeatureStep(executorConf) + val executorPod = executorStep.configurePod(SparkPod.initialPod()) + + checkPod(executorPod, hasHadoopConf) + } + } + + /** Asserts that the volume, the volume mount and the env var are all present when hasHadoopConf is true, and all absent otherwise. */ private def checkPod(pod: SparkPod, hasHadoopConf: Boolean): Unit = { + if 
(hasHadoopConf) { + assert(podHasVolume(pod.pod, HADOOP_CONF_VOLUME)) + assert(containerHasVolume(pod.container, HADOOP_CONF_VOLUME, HADOOP_CONF_DIR_PATH)) + assert(containerHasEnvVar(pod.container, ENV_HADOOP_CONF_DIR)) + } else { + assert(!podHasVolume(pod.pod, HADOOP_CONF_VOLUME)) + assert(!containerHasVolume(pod.container, HADOOP_CONF_VOLUME, HADOOP_CONF_DIR_PATH)) + assert(!containerHasEnvVar(pod.container, ENV_HADOOP_CONF_DIR)) + } + } +}