diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md index 53b883b6bbf30..28999c7879407 100644 --- a/docs/running-on-kubernetes.md +++ b/docs/running-on-kubernetes.md @@ -344,7 +344,26 @@ $ kubectl -n= logs -f The same logs can also be accessed through the [Kubernetes dashboard](https://kubernetes.io/docs/tasks/access-application-cluster/web-ui-dashboard/) if installed on the cluster. +### Setting up custom logging configuration +Spark uses `log4j.properties` under the `SPARK_HOME/conf` dir as the standard way of setting up a logger configuration. + +1) Kubernetes lets us define a + [configMap](https://kubernetes.io/docs/concepts/storage/volumes/#configmap) and can be sometimes convenient + to provide custom logging configuration using a configMap. For example, configMap can be updated irrespective + of point of submission of spark job. This is particularly useful in automated deployment and building + spark as a service. + Once the user defined config map is created inside the kubernetes cluster, we can mount it and configure + for spark to use it, by setting the property: `spark.kubernetes.loggingConf.configMapName` and + `spark.kubernetes.loggingConf.fileName`. `spark.kubernetes.loggingConf.configMapName` is the name of config map + in kubernetes and `spark.kubernetes.loggingConf.fileName` is the name of the file used to create the + configMap or it is also referred to as the key of the configMap. + +2) For spark "cluster" mode of deployment, the configuration file present inside the `conf/` dir is + by default auto configured as a kubernetes configMap and setup for all executors and driver as logging + configuration. The name of the logging configuration file is by default `log4j.properties` and can be + altered using the property: `spark.kubernetes.loggingConf.fileName`. +Setting up a `configMapName` takes precedence over the other methods of setting logger configuration. ### Accessing Driver UI The UI associated with any application can be accessed locally using @@ -574,6 +593,24 @@ See the [configuration page](configuration.html) for information on Spark config excessive CPU usage on the spark driver. + + spark.kubernetes.loggingConf.configMapName + (none) + + Specify the name of the k8s ConfigMap, containing the logging configuration file to be mounted + on the driver and executors for custom logger configuration. If the key is not the default value + i.e. `log4j.properties`, then it should be specified via the property + spark.kubernetes.loggingConf.fileName. + + + + spark.kubernetes.loggingConf.fileName + log4j.properties + + Specify the name of the file, containing the logging configuration, to be mounted on the driver + and executors for custom logger configuration. + + spark.kubernetes.authenticate.submission.caCertFile (none) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala index 46fd8e7be2f1d..63296e96daca0 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala @@ -283,6 +283,22 @@ private[spark] object Config extends Logging { .stringConf .createOptional + val KUBERNETES_LOGGING_CONF_CONFIG_MAP = + ConfigBuilder("spark.kubernetes.loggingConf.configMapName") + .doc("Specify the name of the k8s ConfigMap, containing the logging configuration file, " + + "to be mounted on the driver and executors for custom logger configuration. If the key " + + " is not the default value i.e. `log4j.properties` then please specify it via property" + + "`spark.kubernetes.loggingConf.fileName`.") + .stringConf + .createOptional + + val KUBERNETES_LOGGING_CONF_FILE_NAME = + ConfigBuilder("spark.kubernetes.loggingConf.fileName") + .doc("Specify the name of the file, containing the logging configuration, " + + "to be mounted on the driver and executors for custom logger configuration.") + .stringConf + .createWithDefault("log4j.properties") + val KUBERNETES_KERBEROS_DT_SECRET_NAME = ConfigBuilder("spark.kubernetes.kerberos.tokenSecret.name") .doc("Specify the name of the secret where your existing delegation tokens are stored. " + @@ -366,7 +382,7 @@ private[spark] object Config extends Logging { val KUBERNETES_FILE_UPLOAD_PATH = ConfigBuilder("spark.kubernetes.file.upload.path") .doc("Hadoop compatible file system path where files from the local file system " + - "will be uploded to in cluster mode.") + "will be uploaded to in cluster mode.") .stringConf .createOptional diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala index a3c74ff7b2885..72e1bfca9422a 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala @@ -67,7 +67,7 @@ private[spark] object Constants { val SPARK_CONF_FILE_NAME = "spark.properties" val SPARK_CONF_PATH = s"$SPARK_CONF_DIR_INTERNAL/$SPARK_CONF_FILE_NAME" val ENV_HADOOP_TOKEN_FILE_LOCATION = "HADOOP_TOKEN_FILE_LOCATION" - + val LOGGING_MOUNT_DIR = "/opt/spark/log" // BINDINGS val ENV_PYSPARK_MAJOR_PYTHON_VERSION = "PYSPARK_MAJOR_PYTHON_VERSION" diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkPod.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkPod.scala index fd1196368a7ff..d32b83889df73 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkPod.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkPod.scala @@ -25,11 +25,11 @@ private[spark] case class SparkPod(pod: Pod, container: Container) { * * Use it like: * - * original.modify { case pod => + * original.transform { case pod => * // update pod and return new one - * }.modify { case pod => + * }.transform { case pod => * // more changes that create a new pod - * }.modify { + * }.transform { * case pod if someCondition => // new pod * } * diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountLogConfFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountLogConfFeatureStep.scala new file mode 100644 index 0000000000000..61e9aae0cda47 --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountLogConfFeatureStep.scala @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.k8s.features + +import java.net.URL +import java.util.UUID + +import scala.io.{Codec, Source} + +import io.fabric8.kubernetes.api.model.{ConfigMap, ConfigMapBuilder, ContainerBuilder, HasMetadata, KeyToPath, PodBuilder} + +import org.apache.spark.deploy.k8s.{Config, KubernetesConf, SparkPod} +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.internal.Logging +import org.apache.spark.launcher.SparkLauncher +import org.apache.spark.launcher.SparkLauncher.{DRIVER_EXTRA_JAVA_OPTIONS, EXECUTOR_EXTRA_JAVA_OPTIONS} + +/** + * Mounts the logger configuration from local configuration directory - + * on the spark job submitter's end or from a pre-defined config map. + */ +class MountLogConfFeatureStep(conf: KubernetesConf) + extends KubernetesFeatureConfigStep with Logging { + + private val useExistingConfigMap = conf.get(Config.KUBERNETES_LOGGING_CONF_CONFIG_MAP).isDefined + private val configMapName: String = conf.get(Config.KUBERNETES_LOGGING_CONF_CONFIG_MAP) + .getOrElse(s"config-map-logger-${UUID.randomUUID().toString.take(3)}") + private val loggingConfigFileName: String = conf.get(Config.KUBERNETES_LOGGING_CONF_FILE_NAME) + private val loggingConfURL: URL = this.getClass.getClassLoader.getResource(loggingConfigFileName) + private val loggerJVMProp = + s"${MountLogConfFeatureStep.JAVA_OPT_FOR_LOGGING}${loggingConfigFileName}" + + private val featureEnabled: Boolean = { + (loggingConfURL != null && + conf.getOption(SparkLauncher.DEPLOY_MODE).getOrElse("client") + .equalsIgnoreCase("cluster")) || + useExistingConfigMap + } + + override def configurePod(pod: SparkPod): SparkPod = { + val logConfVolume = s"log-conf-vol-${UUID.randomUUID().toString.take(3)}" + if (useExistingConfigMap) { + logInfo(s"Using an existing config map ${configMapName} for logging configuration.") + } + val keyToPath = new KeyToPath(loggingConfigFileName, 420, loggingConfigFileName) + + val podUpdated = if (featureEnabled) { + new PodBuilder(pod.pod) + .editSpec() + .addNewVolume() + .withName(logConfVolume) + .withNewConfigMap() + .withItems(keyToPath) + .withName(configMapName) + .endConfigMap() + .endVolume() + .endSpec() + .build() + } else { + logDebug(s"Logging configuration mount not performed.") + pod.pod + } + + val containerUpdated = + if (featureEnabled) { + new ContainerBuilder(pod.container) + .addNewVolumeMount() + .withName(logConfVolume) + .withMountPath(LOGGING_MOUNT_DIR) + .endVolumeMount() + .build() + } else { + pod.container + } + SparkPod(podUpdated, containerUpdated) + } + + override def getAdditionalPodSystemProperties(): Map[String, String] = { + if (featureEnabled) { + val executorJavaOpts = + s"$loggerJVMProp ${conf.get(EXECUTOR_EXTRA_JAVA_OPTIONS, "")}" + val driverJavaOpts = + s"$loggerJVMProp ${conf.get(DRIVER_EXTRA_JAVA_OPTIONS, "")}" + Map((EXECUTOR_EXTRA_JAVA_OPTIONS -> executorJavaOpts), + (DRIVER_EXTRA_JAVA_OPTIONS -> driverJavaOpts), + (Config.KUBERNETES_LOGGING_CONF_CONFIG_MAP.key -> configMapName)) + } else { + Map.empty[String, String] + } + } + + override def getAdditionalKubernetesResources(): Seq[HasMetadata] = { + if (featureEnabled && !useExistingConfigMap) { + Seq(MountLogConfFeatureStep + .buildConfigMap(loggingConfURL, loggingConfigFileName, configMapName)) + } else { + Seq[HasMetadata]() + } + } +} + +private[k8s] object MountLogConfFeatureStep extends Logging { + // Logging configuration for containers. + val JAVA_OPT_FOR_LOGGING = s"-Dlog4j.configuration=file://$LOGGING_MOUNT_DIR/" + + private[k8s] def buildConfigMap( + loggingConfUrl: URL, loggingConfigFileName: String, configMapName: String): ConfigMap = { + logInfo(s"Logging configuration is picked up from: $loggingConfigFileName") + val loggerConfStream = loggingConfUrl.openStream() + val loggerConfString = Source.createBufferedSource(loggerConfStream)(Codec.UTF8) + .getLines().mkString("\n") + new ConfigMapBuilder() + .withNewMetadata() + .withName(configMapName) + .endMetadata() + .addToData(loggingConfigFileName, loggerConfString) + .build() + } +} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala index 43639a3b7dc1b..54856d4390a7e 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala @@ -48,7 +48,8 @@ private[spark] class KubernetesDriverBuilder { new HadoopConfDriverFeatureStep(conf), new KerberosConfDriverFeatureStep(conf), new PodTemplateConfigMapStep(conf), - new LocalDirsFeatureStep(conf)) + new LocalDirsFeatureStep(conf), + new MountLogConfFeatureStep(conf)) val spec = KubernetesDriverSpec( initialPod, diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala index 22bff2c807330..a945ccd4ccb89 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala @@ -45,7 +45,8 @@ private[spark] class KubernetesExecutorBuilder { new MountSecretsFeatureStep(conf), new EnvSecretsFeatureStep(conf), new MountVolumesFeatureStep(conf), - new LocalDirsFeatureStep(conf)) + new LocalDirsFeatureStep(conf), + new MountLogConfFeatureStep(conf)) features.foldLeft(initialPod) { case (pod, feature) => feature.configurePod(pod) } } diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountLogConfFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountLogConfFeatureStepSuite.scala new file mode 100644 index 0000000000000..074fc8d0c68f9 --- /dev/null +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountLogConfFeatureStepSuite.scala @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.k8s.features + +import scala.collection.JavaConverters._ + +import io.fabric8.kubernetes.api.model.Pod + +import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.deploy.k8s.{Config, KubernetesTestConf, SparkPod} +import org.apache.spark.launcher.SparkLauncher + +class MountLogConfFeatureStepSuite extends SparkFunSuite { + + test("Do not enable, mount logger configuration feature," + + " if neither of logging configuration file or user defined config map is configured.") { + val sparkConf = new SparkConf(false) + .set(SparkLauncher.DEPLOY_MODE, "cluster") + .set(Config.KUBERNETES_LOGGING_CONF_FILE_NAME, "some-non-existent-file-name.none") + val conf = KubernetesTestConf.createDriverConf(sparkConf = sparkConf) + val step = new MountLogConfFeatureStep(conf) + val initPod = SparkPod.initialPod() + val configuredPod = step.configurePod(initPod) + assert(configuredPod == initPod, "Pod should be unchanged.") + } + + test("Do not mount logger config for client mode," + + " unless an explicit config map is configured") { + + val sparkConf = new SparkConf(false) + .set(SparkLauncher.DEPLOY_MODE, "client") + .set(Config.KUBERNETES_LOGGING_CONF_FILE_NAME, "log4j.properties") + val conf = KubernetesTestConf.createDriverConf(sparkConf = sparkConf) + val step = new MountLogConfFeatureStep(conf) + val initPod = SparkPod.initialPod() + val configuredPod = step.configurePod(initPod) + assert(configuredPod == initPod, "Pod should be unchanged.") + val configMapName = "config-map-logging" + sparkConf.set(Config.KUBERNETES_LOGGING_CONF_CONFIG_MAP, configMapName) + val conf2 = KubernetesTestConf.createDriverConf(sparkConf = sparkConf) + val step2 = new MountLogConfFeatureStep(conf2) + assert(step2.getAdditionalPodSystemProperties().nonEmpty) + val configuredPod2 = step2.configurePod(initPod) + assert(configuredPod2.pod.getSpec.getVolumes.get(0).getConfigMap.getName == configMapName) + } + + test("Do not auto-create a ConfigMap if user provided ConfigMap is configured.") { + val sparkConf = new SparkConf(false) + .set(SparkLauncher.DEPLOY_MODE, "cluster") + .set(Config.KUBERNETES_LOGGING_CONF_CONFIG_MAP, "user-created-config-map-name") + val conf = KubernetesTestConf.createDriverConf(sparkConf = sparkConf) + val step = new MountLogConfFeatureStep(conf) + assert(step.getAdditionalKubernetesResources() == Seq.empty) + } + + test("User provided ConfigMap takes precedence and is configured properly.") { + val configMapName = "user-created-config-map-name" + val sparkConf = new SparkConf(false) + .set(SparkLauncher.DEPLOY_MODE, "cluster") + .set(Config.KUBERNETES_LOGGING_CONF_CONFIG_MAP, configMapName) + val conf = KubernetesTestConf.createDriverConf(sparkConf = sparkConf) + val step = new MountLogConfFeatureStep(conf) + val configuredPod = step.configurePod(SparkPod.initialPod()) + assert(hasConfigMap(configuredPod.pod, configMapName)) + } + + private def hasConfigMap(pod: Pod, configMapName: String): Boolean = { + pod.getSpec.getVolumes.asScala.exists { volume => + volume.getConfigMap.getName == configMapName + } + } +} diff --git a/resource-managers/kubernetes/integration-tests/src/test/resources/log-config-test-log4j.properties b/resource-managers/kubernetes/integration-tests/src/test/resources/log-config-test-log4j.properties new file mode 100644 index 0000000000000..f9f3ce6350dc0 --- /dev/null +++ b/resource-managers/kubernetes/integration-tests/src/test/resources/log-config-test-log4j.properties @@ -0,0 +1,23 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# This log4j config file is for integration test MountLoggerConfigMapSuite. +log4j.rootCategory=DEBUG, console +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.target=System.err +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c: %m%n diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala index 61e1f27b55462..40fef7839fa9a 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala @@ -42,9 +42,8 @@ import org.apache.spark.internal.config._ class KubernetesSuite extends SparkFunSuite with BeforeAndAfterAll with BeforeAndAfter with BasicTestsSuite with SecretsTestsSuite with PythonTestsSuite with ClientModeTestsSuite with PodTemplateSuite with PVTestsSuite - with DepsTestsSuite with DecommissionSuite with RTestsSuite with Logging with Eventually - with Matchers { - + with MountLoggerConfigMapSuite with DepsTestsSuite with DecommissionSuite with RTestsSuite + with Logging with Eventually with Matchers { import KubernetesSuite._ diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/MountLoggerConfigMapSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/MountLoggerConfigMapSuite.scala new file mode 100644 index 0000000000000..fd756c7941a46 --- /dev/null +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/MountLoggerConfigMapSuite.scala @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.integrationtest + +import java.io.{BufferedWriter, File, FileWriter} +import java.net.URL + +import io.fabric8.kubernetes.api.model.{ConfigMap, ConfigMapBuilder} +import scala.io.{BufferedSource, Source} + +private[spark] trait MountLoggerConfigMapSuite { k8sSuite: KubernetesSuite => + import KubernetesSuite._ + + private var configMap: ConfigMap = _ + + private def buildConfigMap( + loggingConfUrl: URL, loggingConfigFileName: String, configMapName: String): ConfigMap = { + logInfo(s"Logging configuration is picked up from: $loggingConfigFileName") + val loggerConfStream = loggingConfUrl.openStream() + val loggerConfString = Source.createBufferedSource(loggerConfStream).getLines().mkString("\n") + new ConfigMapBuilder() + .withNewMetadata() + .withName(configMapName) + .endMetadata() + .addToData(loggingConfigFileName, loggerConfString) + .build() + } + + test("Run with logging configuration provided as a k8s config map", k8sTestTag) { + val loggingConfigFileName = "log-config-test-log4j.properties" + val configMapName = "test-logging-config-map" + val loggingConfURL: URL = this.getClass.getClassLoader.getResource(loggingConfigFileName) + assert(loggingConfURL != null, "Logging configuration file not available.") + try { + configMap = buildConfigMap(loggingConfURL, loggingConfigFileName, configMapName) + + kubernetesTestComponents.kubernetesClient.configMaps().createOrReplace(configMap) + + sparkAppConf + .set("spark.kubernetes.loggingConf.configMapName", configMapName) + .set("spark.kubernetes.loggingConf.fileName", loggingConfigFileName) + runSparkApplicationAndVerifyCompletion( + appResource = containerLocalSparkDistroExamplesJar, + mainClass = SPARK_PI_MAIN_CLASS, + expectedLogOnCompletion = (Seq("DEBUG", + s"Using an existing config map ${configMapName}", "Pi is roughly 3")), + appArgs = Array.empty[String], + driverPodChecker = doBasicDriverPodCheck, + executorPodChecker = doBasicExecutorPodCheck, + appLocator = appLocator, + isJVM = true) + } finally { + kubernetesTestComponents.kubernetesClient.configMaps().delete(configMap) + } + } + + test("Run with logging configuration provided from log4j properties file", k8sTestTag) { + val loggingConfigFileName = "log-config-test-log4j.properties" + val loggingConfURL: URL = this.getClass.getClassLoader.getResource(loggingConfigFileName) + assert(loggingConfURL != null, "Logging configuration file not available.") + + val content = Source.createBufferedSource(loggingConfURL.openStream()).getLines().mkString("\n") + val logConfFilePath = s"${sparkHomeDir.toFile}/conf/$loggingConfigFileName" + + try { + val writer = new BufferedWriter(new FileWriter(logConfFilePath)) + writer.write(content) + writer.close() + + sparkAppConf + .set("spark.kubernetes.loggingConf.fileName", loggingConfigFileName) + .set("spark.driver.extraJavaOptions", "-Dlog4j.debug") + + runSparkApplicationAndVerifyCompletion( + appResource = containerLocalSparkDistroExamplesJar, + mainClass = SPARK_PI_MAIN_CLASS, + expectedLogOnCompletion = (Seq("DEBUG", + s"log4j: Reading configuration from URL file:/opt/spark/log/$loggingConfigFileName", + "Pi is roughly 3")), + appArgs = Array.empty[String], + driverPodChecker = doBasicDriverPodCheck, + executorPodChecker = doBasicExecutorPodCheck, + appLocator = appLocator, + isJVM = true) + } finally { + new File(logConfFilePath).delete() + } + } +}