diff --git a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala index 66f3b803c0d47..1d8161282c5b6 100644 --- a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala +++ b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala @@ -21,6 +21,7 @@ package org.apache.spark.internal * All structured logging keys should be defined here for standardization. */ object LogKey extends Enumeration { + val ACCUMULATOR_ID = Value val APP_DESC = Value val APP_ID = Value val APP_STATE = Value @@ -33,40 +34,56 @@ object LogKey extends Enumeration { val CLASS_NAME = Value val COMMAND = Value val COMMAND_OUTPUT = Value + val COMPONENT = Value val CONFIG = Value val CONFIG2 = Value val CONTAINER_ID = Value val COUNT = Value val DRIVER_ID = Value + val END_POINT = Value val ERROR = Value + val EVENT_LOOP = Value val EVENT_QUEUE = Value val EXECUTOR_ID = Value - val EXECUTOR_STATE_CHANGED = Value + val EXECUTOR_STATE = Value val EXIT_CODE = Value + val FAILURES = Value val HOST = Value val JOB_ID = Value val LEARNING_RATE = Value val LINE = Value val LINE_NUM = Value + val LISTENER = Value + val LOG_TYPE = Value val MASTER_URL = Value val MAX_ATTEMPTS = Value val MAX_CATEGORIES = Value val MAX_EXECUTOR_FAILURES = Value val MAX_SIZE = Value + val MERGE_DIR_NAME = Value + val METHOD_NAME = Value val MIN_SIZE = Value val NUM_ITERATIONS = Value + val OBJECT_ID = Value val OLD_BLOCK_MANAGER_ID = Value val OPTIMIZER_CLASS_NAME = Value val PARTITION_ID = Value val PATH = Value + val PATHS = Value val POD_ID = Value + val PORT = Value val RANGE = Value + val RDD_ID = Value val REASON = Value + val REDUCE_ID = Value val REMOTE_ADDRESS = Value val RETRY_COUNT = Value val RPC_ADDRESS = Value + val SHUFFLE_BLOCK_INFO = Value val SHUFFLE_ID = Value + val SHUFFLE_MERGE_ID = Value val SIZE = Value + val SLEEP_TIME_SECONDS = Value val STAGE_ID = Value val SUBMISSION_ID = Value val SUBSAMPLING_RATE = Value @@ -75,8 +92,12 @@ object LogKey extends Enumeration { val TASK_NAME = Value val TASK_SET_NAME = Value val TASK_STATE = Value + val THREAD = Value + val THREAD_NAME = Value val TID = Value val TIMEOUT = Value + val URI = Value + val USER_NAME = Value val WORKER_URL = Value type LogKey = Value diff --git a/core/src/main/scala/org/apache/spark/ContextCleaner.scala b/core/src/main/scala/org/apache/spark/ContextCleaner.scala index a1871cb231cfb..c16a84c13187b 100644 --- a/core/src/main/scala/org/apache/spark/ContextCleaner.scala +++ b/core/src/main/scala/org/apache/spark/ContextCleaner.scala @@ -24,7 +24,8 @@ import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue, Scheduled import scala.jdk.CollectionConverters._ import org.apache.spark.broadcast.Broadcast -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.{ACCUMULATOR_ID, BROADCAST_ID, LISTENER, RDD_ID, SHUFFLE_ID} import org.apache.spark.internal.config._ import org.apache.spark.rdd.{RDD, ReliableRDDCheckpointData} import org.apache.spark.scheduler.SparkListener @@ -226,7 +227,7 @@ private[spark] class ContextCleaner( listeners.asScala.foreach(_.rddCleaned(rddId)) logDebug("Cleaned RDD " + rddId) } catch { - case e: Exception => logError("Error cleaning RDD " + rddId, e) + case e: Exception => logError(log"Error cleaning RDD ${MDC(RDD_ID, rddId)}", e) } } @@ -245,7 +246,7 @@ private[spark] class ContextCleaner( logDebug("Asked to cleanup non-existent shuffle (maybe it was already removed)") } } catch { - case e: Exception => logError("Error cleaning shuffle " + shuffleId, e) + case e: Exception => logError(log"Error cleaning shuffle ${MDC(SHUFFLE_ID, shuffleId)}", e) } } @@ -257,7 +258,8 @@ private[spark] class ContextCleaner( listeners.asScala.foreach(_.broadcastCleaned(broadcastId)) logDebug(s"Cleaned broadcast $broadcastId") } catch { - case e: Exception => logError("Error cleaning broadcast " + broadcastId, e) + case e: Exception => + logError(log"Error cleaning broadcast ${MDC(BROADCAST_ID, broadcastId)}", e) } } @@ -269,7 +271,8 @@ private[spark] class ContextCleaner( listeners.asScala.foreach(_.accumCleaned(accId)) logDebug("Cleaned accumulator " + accId) } catch { - case e: Exception => logError("Error cleaning accumulator " + accId, e) + case e: Exception => + logError(log"Error cleaning accumulator ${MDC(ACCUMULATOR_ID, accId)}", e) } } @@ -285,7 +288,8 @@ private[spark] class ContextCleaner( logDebug("Cleaned rdd checkpoint data " + rddId) } catch { - case e: Exception => logError("Error cleaning rdd checkpoint data " + rddId, e) + case e: Exception => + logError(log"Error cleaning rdd checkpoint data ${MDC(RDD_ID, rddId)}", e) } } @@ -295,7 +299,8 @@ private[spark] class ContextCleaner( sc.listenerBus.removeListener(listener) logDebug(s"Cleaned Spark listener $listener") } catch { - case e: Exception => logError(s"Error cleaning Spark listener $listener", e) + case e: Exception => + logError(log"Error cleaning Spark listener ${MDC(LISTENER, listener)}", e) } } diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index faa8504df3651..48569eb713793 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -781,7 +781,7 @@ private[spark] class MapOutputTrackerMaster( .getOrElse(Seq.empty[BlockManagerId])) } } catch { - case NonFatal(e) => logError(e.getMessage, e) + case NonFatal(e) => logError(log"${MDC(ERROR, e.getMessage)}", e) } } } catch { diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 7595488cecee2..9d908cd8713ce 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -46,7 +46,8 @@ import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil} import org.apache.spark.errors.SparkCoreErrors import org.apache.spark.executor.{Executor, ExecutorMetrics, ExecutorMetricsSource} import org.apache.spark.input.{FixedLengthBinaryInputFormat, PortableDataStream, StreamInputFormat, WholeTextFileInputFormat} -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey import org.apache.spark.internal.config._ import org.apache.spark.internal.config.Tests._ import org.apache.spark.internal.config.UI._ @@ -748,7 +749,9 @@ class SparkContext(config: SparkConf) extends Logging { } } catch { case e: Exception => - logError(s"Exception getting thread dump from executor $executorId", e) + logError( + log"Exception getting thread dump from executor ${MDC(LogKey.EXECUTOR_ID, executorId)}", + e) None } } @@ -778,7 +781,9 @@ class SparkContext(config: SparkConf) extends Logging { } } catch { case e: Exception => - logError(s"Exception getting heap histogram from executor $executorId", e) + logError( + log"Exception getting heap histogram from " + + log"executor ${MDC(LogKey.EXECUTOR_ID, executorId)}", e) None } } @@ -2140,7 +2145,7 @@ class SparkContext(config: SparkConf) extends Logging { Seq(env.rpcEnv.fileServer.addJar(file)) } catch { case NonFatal(e) => - logError(s"Failed to add $path to Spark environment", e) + logError(log"Failed to add ${MDC(LogKey.PATH, path)} to Spark environment", e) Nil } } @@ -2161,7 +2166,7 @@ class SparkContext(config: SparkConf) extends Logging { Seq(path) } catch { case NonFatal(e) => - logError(s"Failed to add $path to Spark environment", e) + logError(log"Failed to add ${MDC(LogKey.PATH, path)} to Spark environment", e) Nil } } else { diff --git a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala index a3c36de151554..e433cc10ae731 100644 --- a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala +++ b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala @@ -24,7 +24,8 @@ import scala.collection.mutable.ArrayBuffer import scala.jdk.CollectionConverters._ import org.apache.spark.executor.TaskMetrics -import org.apache.spark.internal.{config, Logging} +import org.apache.spark.internal.{config, Logging, MDC} +import org.apache.spark.internal.LogKey.LISTENER import org.apache.spark.memory.TaskMemoryManager import org.apache.spark.metrics.MetricsSystem import org.apache.spark.metrics.source.Source @@ -246,7 +247,7 @@ private[spark] class TaskContextImpl( } } listenerExceptions += e - logError(s"Error in $name", e) + logError(log"Error in ${MDC(LISTENER, name)}", e) } } if (listenerExceptions.nonEmpty) { diff --git a/core/src/main/scala/org/apache/spark/api/r/RBackendHandler.scala b/core/src/main/scala/org/apache/spark/api/r/RBackendHandler.scala index 3f7a3ea70a7e7..1a05c8f35b7fb 100644 --- a/core/src/main/scala/org/apache/spark/api/r/RBackendHandler.scala +++ b/core/src/main/scala/org/apache/spark/api/r/RBackendHandler.scala @@ -26,7 +26,8 @@ import io.netty.handler.timeout.ReadTimeoutException import org.apache.spark.{SparkConf, SparkEnv} import org.apache.spark.api.r.SerDe._ -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.{METHOD_NAME, OBJECT_ID} import org.apache.spark.internal.config.R._ import org.apache.spark.util.{ThreadUtils, Utils} import org.apache.spark.util.ArrayImplicits._ @@ -76,7 +77,7 @@ private[r] class RBackendHandler(server: RBackend) writeObject(dos, null, server.jvmObjectTracker) } catch { case e: Exception => - logError(s"Removing $objId failed", e) + logError(log"Removing ${MDC(OBJECT_ID, objId)} failed", e) writeInt(dos, -1) writeString(dos, s"Removing $objId failed: ${e.getMessage}") } @@ -192,7 +193,7 @@ private[r] class RBackendHandler(server: RBackend) } } catch { case e: Exception => - logError(s"$methodName on $objId failed", e) + logError(log"${MDC(METHOD_NAME, methodName)} on ${MDC(OBJECT_ID, objId)} failed", e) writeInt(dos, -1) // Writing the error message of the cause for the exception. This will be returned // to user in the R process. diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala index 1eec3e82f1b79..6cf240f12a1ca 100644 --- a/core/src/main/scala/org/apache/spark/deploy/Client.scala +++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala @@ -32,7 +32,7 @@ import org.apache.spark.deploy.DeployMessages._ import org.apache.spark.deploy.master.{DriverState, Master} import org.apache.spark.deploy.master.DriverState.DriverState import org.apache.spark.internal.{config, Logging, MDC} -import org.apache.spark.internal.LogKey.{DRIVER_ID, RPC_ADDRESS} +import org.apache.spark.internal.LogKey.{DRIVER_ID, ERROR, RPC_ADDRESS} import org.apache.spark.internal.config.Network.RPC_ASK_TIMEOUT import org.apache.spark.resource.ResourceUtils import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv, ThreadSafeRpcEndpoint} @@ -61,7 +61,7 @@ private class ClientEndpoint( t => t match { case ie: InterruptedException => // Exit normally case e: Throwable => - logError(e.getMessage, e) + logError(log"${MDC(ERROR, e.getMessage)}", e) System.exit(SparkExitCode.UNCAUGHT_EXCEPTION) }) diff --git a/core/src/main/scala/org/apache/spark/deploy/StandaloneResourceUtils.scala b/core/src/main/scala/org/apache/spark/deploy/StandaloneResourceUtils.scala index 509049550ad4f..d317d6449f293 100644 --- a/core/src/main/scala/org/apache/spark/deploy/StandaloneResourceUtils.scala +++ b/core/src/main/scala/org/apache/spark/deploy/StandaloneResourceUtils.scala @@ -27,7 +27,8 @@ import org.json4s.{DefaultFormats, Extraction, Formats} import org.json4s.jackson.JsonMethods.{compact, render} import org.apache.spark.SparkException -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.COMPONENT import org.apache.spark.resource.{ResourceAllocation, ResourceID, ResourceInformation, ResourceRequirement} import org.apache.spark.util.ArrayImplicits._ import org.apache.spark.util.Utils @@ -103,9 +104,10 @@ private[spark] object StandaloneResourceUtils extends Logging { writeResourceAllocationJson(allocations, tmpFile) } catch { case NonFatal(e) => - val errMsg = s"Exception threw while preparing resource file for $compShortName" + val errMsg = + log"Exception threw while preparing resource file for ${MDC(COMPONENT, compShortName)}" logError(errMsg, e) - throw new SparkException(errMsg, e) + throw new SparkException(errMsg.message, e) } val resourcesFile = File.createTempFile(s"resource-$compShortName-", ".json", dir) tmpFile.renameTo(resourcesFile) diff --git a/core/src/main/scala/org/apache/spark/deploy/Utils.scala b/core/src/main/scala/org/apache/spark/deploy/Utils.scala index 9eb5a0042e515..4d2546cb808c0 100644 --- a/core/src/main/scala/org/apache/spark/deploy/Utils.scala +++ b/core/src/main/scala/org/apache/spark/deploy/Utils.scala @@ -22,7 +22,8 @@ import java.io.File import jakarta.servlet.http.HttpServletRequest import org.apache.spark.SparkConf -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.{LOG_TYPE, PATH} import org.apache.spark.ui.JettyUtils.createServletHandler import org.apache.spark.ui.WebUI import org.apache.spark.util.Utils.{getFileLength, offsetBytes} @@ -95,7 +96,8 @@ private[deploy] object Utils extends Logging { (logText, startIndex, endIndex, totalLength) } catch { case e: Exception => - logError(s"Error getting $logType logs from directory $logDirectory", e) + logError(log"Error getting ${MDC(LOG_TYPE, logType)} logs from " + + log"directory ${MDC(PATH, logDirectory)}", e) ("Error getting logs due to exception: " + e.getMessage, 0, 0, 0) } } diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index e0128e35b761a..98cbd7b3eba82 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -37,7 +37,8 @@ import org.apache.hadoop.security.AccessControlException import org.apache.spark.{SecurityManager, SparkConf, SparkException} import org.apache.spark.deploy.SparkHadoopUtil -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.PATH import org.apache.spark.internal.config._ import org.apache.spark.internal.config.History._ import org.apache.spark.internal.config.Status._ @@ -920,7 +921,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) case e: AccessControlException => logWarning(s"Insufficient permission while compacting log for $rootPath", e) case e: Exception => - logError(s"Exception while compacting log for $rootPath", e) + logError(log"Exception while compacting log for ${MDC(PATH, rootPath)}", e) } finally { endProcessing(rootPath) } @@ -1402,7 +1403,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) case _: AccessControlException => logInfo(s"No permission to delete $log, ignoring.") case ioe: IOException => - logError(s"IOException in cleaning $log", ioe) + logError(log"IOException in cleaning ${MDC(PATH, log)}", ioe) } } deleted diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index e2ba221fb00cd..0659c26fd15b6 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -38,7 +38,7 @@ import org.apache.spark.deploy.StandaloneResourceUtils._ import org.apache.spark.deploy.master.{DriverState, Master} import org.apache.spark.deploy.worker.ui.WorkerWebUI import org.apache.spark.internal.{config, Logging, MDC} -import org.apache.spark.internal.LogKey.{DRIVER_ID, ERROR, EXECUTOR_STATE_CHANGED, MASTER_URL, MAX_ATTEMPTS} +import org.apache.spark.internal.LogKey._ import org.apache.spark.internal.config.Tests.IS_TESTING import org.apache.spark.internal.config.UI._ import org.apache.spark.internal.config.Worker._ @@ -549,7 +549,7 @@ private[deploy] class Worker( }(cleanupThreadExecutor) cleanupFuture.failed.foreach(e => - logError("App dir cleanup failed: " + e.getMessage, e) + logError(log"App dir cleanup failed: ${MDC(ERROR, e.getMessage)}", e) )(cleanupThreadExecutor) } catch { case _: RejectedExecutionException if cleanupThreadExecutor.isShutdown => @@ -638,7 +638,9 @@ private[deploy] class Worker( addResourcesUsed(resources_) } catch { case e: Exception => - logError(s"Failed to launch executor $appId/$execId for ${appDesc.name}.", e) + logError( + log"Failed to launch executor ${MDC(APP_ID, appId)}/${MDC(EXECUTOR_ID, execId)} " + + log"for ${MDC(APP_DESC, appDesc.name)}.", e) if (executors.contains(appId + "/" + execId)) { executors(appId + "/" + execId).kill() executors -= appId + "/" + execId @@ -749,7 +751,7 @@ private[deploy] class Worker( Utils.deleteRecursively(new File(dir)) } }(cleanupThreadExecutor).failed.foreach(e => - logError(s"Clean up app dir $dirList failed: ${e.getMessage}", e) + logError(log"Clean up app dir ${MDC(PATHS, dirList)} failed", e) )(cleanupThreadExecutor) } } catch { @@ -794,8 +796,10 @@ private[deploy] class Worker( case Failure(t) => val failures = executorStateSyncFailureAttempts.getOrElse(fullId, 0) + 1 if (failures < executorStateSyncMaxAttempts) { - logError(s"Failed to send $newState to Master $masterRef, " + - s"will retry ($failures/$executorStateSyncMaxAttempts).", t) + logError(log"Failed to send ${MDC(EXECUTOR_STATE, newState)}" + + log" to Master ${MDC(MASTER_URL, masterRef)}, will retry " + + log"(${MDC(FAILURES, failures)}/" + + log"${MDC(MAX_ATTEMPTS, executorStateSyncMaxAttempts)}).", t) executorStateSyncFailureAttempts(fullId) = failures // If the failure is not caused by TimeoutException, wait for a while before retry in // case the connection is temporarily unavailable. @@ -808,7 +812,7 @@ private[deploy] class Worker( } self.send(newState) } else { - logError(log"Failed to send ${MDC(EXECUTOR_STATE_CHANGED, newState)} " + + logError(log"Failed to send ${MDC(EXECUTOR_STATE, newState)} " + log"to Master ${MDC(MASTER_URL, masterRef)} for " + log"${MDC(MAX_ATTEMPTS, executorStateSyncMaxAttempts)} times. Giving up.") System.exit(1) diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala index c4a15095ec40c..006a388e98b5b 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala @@ -23,7 +23,8 @@ import scala.xml.{Node, Unparsed} import jakarta.servlet.http.HttpServletRequest -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.{LOG_TYPE, PATH} import org.apache.spark.ui.{UIUtils, WebUIPage} import org.apache.spark.util.Utils import org.apache.spark.util.logging.RollingFileAppender @@ -174,7 +175,8 @@ private[ui] class LogPage(parent: WorkerWebUI) extends WebUIPage("logPage") with (logText, startIndex, endIndex, totalLength) } catch { case e: Exception => - logError(s"Error getting $logType logs from directory $logDirectory", e) + logError(log"Error getting ${MDC(LOG_TYPE, logType)} logs from " + + log"directory ${MDC(PATH, logDirectory)}", e) ("Error getting logs due to exception: " + e.getMessage, 0, 0, 0) } } diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 67d0c37c3eddf..a7657cd78cd9b 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -41,7 +41,7 @@ import org.slf4j.MDC import org.apache.spark._ import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.internal.{Logging, MDC => LogMDC} -import org.apache.spark.internal.LogKey.{ERROR, MAX_ATTEMPTS, TASK_ID, TASK_NAME, TIMEOUT} +import org.apache.spark.internal.LogKey.{CLASS_NAME, ERROR, MAX_ATTEMPTS, TASK_ID, TASK_NAME, TIMEOUT} import org.apache.spark.internal.config._ import org.apache.spark.internal.plugin.PluginContainer import org.apache.spark.memory.{SparkOutOfMemoryError, TaskMemoryManager} @@ -661,9 +661,10 @@ private[spark] class Executor( // uh-oh. it appears the user code has caught the fetch-failure without throwing any // other exceptions. Its *possible* this is what the user meant to do (though highly // unlikely). So we will log an error and keep going. - logError(s"$taskName completed successfully though internally it encountered " + - s"unrecoverable fetch failures! Most likely this means user code is incorrectly " + - s"swallowing Spark's internal ${classOf[FetchFailedException]}", fetchFailure) + logError(log"${LogMDC(TASK_NAME, taskName)} completed successfully though internally " + + log"it encountered unrecoverable fetch failures! Most likely this means user code " + + log"is incorrectly swallowing Spark's internal " + + log"${LogMDC(CLASS_NAME, classOf[FetchFailedException])}", fetchFailure) } val taskFinishNs = System.nanoTime() val taskFinishCpu = if (threadMXBean.isCurrentThreadCpuTimeSupported) { @@ -802,7 +803,7 @@ private[spark] class Executor( // Attempt to exit cleanly by informing the driver of our failure. // If anything goes wrong (or this was a fatal exception), we will delegate to // the default uncaught exception handler, which will terminate the Executor. - logError(s"Exception in $taskName", t) + logError(log"Exception in ${LogMDC(TASK_NAME, taskName)}", t) // SPARK-20904: Do not report failure to driver if if happened during shut down. Because // libraries may set up shutdown hooks that race with running tasks during shutdown, diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorClassLoader.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorClassLoader.scala index b9f4486b66fa6..c7047ddd278b2 100644 --- a/core/src/main/scala/org/apache/spark/executor/ExecutorClassLoader.scala +++ b/core/src/main/scala/org/apache/spark/executor/ExecutorClassLoader.scala @@ -30,7 +30,7 @@ import org.apache.xbean.asm9.Opcodes._ import org.apache.spark.{SparkConf, SparkEnv} import org.apache.spark.deploy.SparkHadoopUtil -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, LogKey, MDC} import org.apache.spark.util.ParentClassLoader /** @@ -183,7 +183,8 @@ class ExecutorClassLoader( None case e: Exception => // Something bad happened while checking if the class exists - logError(s"Failed to check existence of class $name on REPL class server at $uri", e) + logError(log"Failed to check existence of class ${MDC(LogKey.CLASS_NAME, name)} " + + log"on REPL class server at ${MDC(LogKey.URI, uri)}", e) if (userClassPathFirst) { // Allow to try to load from "parentLoader" None diff --git a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala index 20239980eee5c..95ea814042d35 100644 --- a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala +++ b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala @@ -33,7 +33,7 @@ import org.apache.hadoop.mapreduce.task.{TaskAttemptContextImpl => NewTaskAttemp import org.apache.spark.{SerializableWritable, SparkConf, SparkException, TaskContext} import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.internal.{Logging, MDC} -import org.apache.spark.internal.LogKey.TASK_ATTEMPT_ID +import org.apache.spark.internal.LogKey.{JOB_ID, TASK_ATTEMPT_ID} import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage import org.apache.spark.rdd.{HadoopRDD, RDD} import org.apache.spark.util.{SerializableConfiguration, SerializableJobConf, Utils} @@ -104,7 +104,7 @@ object SparkHadoopWriter extends Logging { logInfo(s"Write Job ${jobContext.getJobID} committed. Elapsed time: $duration ms.") } catch { case cause: Throwable => - logError(s"Aborting job ${jobContext.getJobID}.", cause) + logError(log"Aborting job ${MDC(JOB_ID, jobContext.getJobID)}.", cause) committer.abortJob(jobContext) throw new SparkException("Job aborted.", cause) } diff --git a/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala b/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala index b059e82df23b5..c68999f34079d 100644 --- a/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala +++ b/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala @@ -24,7 +24,8 @@ import org.apache.hadoop.mapreduce.{OutputCommitter => MapReduceOutputCommitter} import org.apache.spark.{SparkEnv, TaskContext} import org.apache.spark.executor.CommitDeniedException -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.TASK_ATTEMPT_ID import org.apache.spark.util.Utils object SparkHadoopMapRedUtil extends Logging { @@ -52,7 +53,9 @@ object SparkHadoopMapRedUtil extends Logging { logInfo(s"$mrTaskAttemptID: Committed. Elapsed time: $timeCost ms.") } catch { case cause: IOException => - logError(s"Error committing the output of task: $mrTaskAttemptID", cause) + logError( + log"Error committing the output of task: ${MDC(TASK_ATTEMPT_ID, mrTaskAttemptID)}", + cause) committer.abortTask(mrTaskContext) throw cause } diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala index 195c5b0f47f57..12df40c3476a0 100644 --- a/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala +++ b/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala @@ -25,7 +25,8 @@ import scala.jdk.CollectionConverters._ import scala.util.matching.Regex import org.apache.spark.SparkConf -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.PATH import org.apache.spark.internal.config.METRICS_CONF import org.apache.spark.util.Utils @@ -140,7 +141,7 @@ private[spark] class MetricsConfig(conf: SparkConf) extends Logging { } catch { case e: Exception => val file = path.getOrElse(DEFAULT_METRICS_CONF_FILENAME) - logError(s"Error loading configuration file $file", e) + logError(log"Error loading configuration file ${MDC(PATH, file)}", e) } finally { if (is != null) { is.close() diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala index 05e0b6b9c4ef0..555083bb65d24 100644 --- a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala +++ b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala @@ -188,7 +188,8 @@ private[spark] class MetricsSystem private ( val source = Utils.classForName[Source](classPath).getConstructor().newInstance() registerSource(source) } catch { - case e: Exception => logError("Source class " + classPath + " cannot be instantiated", e) + case e: Exception => + logError(log"Source class ${MDC(CLASS_NAME, classPath)} cannot be instantiated", e) } } } diff --git a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala index 086df62313249..127bdf6d91812 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala @@ -34,7 +34,7 @@ import scala.reflect.ClassTag import org.apache.spark.{Partition, TaskContext} import org.apache.spark.errors.SparkCoreErrors -import org.apache.spark.internal.LogKey.{COMMAND, ERROR} +import org.apache.spark.internal.LogKey.{COMMAND, ERROR, PATH} import org.apache.spark.internal.MDC import org.apache.spark.util.Utils @@ -107,8 +107,9 @@ private[spark] class PipedRDD[T: ClassTag]( pb.directory(taskDirFile) workInTaskDirectory = true } catch { - case e: Exception => logError("Unable to setup task working directory: " + e.getMessage + - " (" + taskDirectory + ")", e) + case e: Exception => + logError(log"Unable to setup task working directory: ${MDC(ERROR, e.getMessage)}" + + log" (${MDC(PATH, taskDirectory)})", e) } } diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/Inbox.scala b/core/src/main/scala/org/apache/spark/rpc/netty/Inbox.scala index 472401b23fe8e..b503c5a0f8089 100644 --- a/core/src/main/scala/org/apache/spark/rpc/netty/Inbox.scala +++ b/core/src/main/scala/org/apache/spark/rpc/netty/Inbox.scala @@ -22,7 +22,8 @@ import javax.annotation.concurrent.GuardedBy import scala.util.control.NonFatal import org.apache.spark.SparkException -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.END_POINT import org.apache.spark.rpc.{RpcAddress, RpcEndpoint, ThreadSafeRpcEndpoint} @@ -206,7 +207,8 @@ private[netty] class Inbox(val endpointName: String, val endpoint: RpcEndpoint) // Should reduce the number of active threads before throw the error. numActiveThreads -= 1 } - logError(s"An error happened while processing message in the inbox for $endpointName", fatal) + logError(log"An error happened while processing message in the inbox for" + + log" ${MDC(END_POINT, endpointName)}", fatal) throw fatal } diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/MessageLoop.scala b/core/src/main/scala/org/apache/spark/rpc/netty/MessageLoop.scala index df7cd0b44c900..2d94ed5d05e1c 100644 --- a/core/src/main/scala/org/apache/spark/rpc/netty/MessageLoop.scala +++ b/core/src/main/scala/org/apache/spark/rpc/netty/MessageLoop.scala @@ -22,7 +22,8 @@ import java.util.concurrent._ import scala.util.control.NonFatal import org.apache.spark.{SparkConf, SparkContext} -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.ERROR import org.apache.spark.internal.config.EXECUTOR_ID import org.apache.spark.internal.config.Network._ import org.apache.spark.rpc.{IsolatedRpcEndpoint, RpcEndpoint} @@ -74,7 +75,7 @@ private sealed abstract class MessageLoop(dispatcher: Dispatcher) extends Loggin } inbox.process(dispatcher) } catch { - case NonFatal(e) => logError(e.getMessage, e) + case NonFatal(e) => logError(log"${MDC(ERROR, e.getMessage)}", e) } } } catch { diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 7ee8dc7ec0c8a..41cbd795b7e5e 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -37,7 +37,7 @@ import org.apache.spark.broadcast.Broadcast import org.apache.spark.errors.SparkCoreErrors import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics} import org.apache.spark.internal.{config, Logging, MDC} -import org.apache.spark.internal.LogKey.{JOB_ID, STAGE_ID} +import org.apache.spark.internal.LogKey.{ACCUMULATOR_ID, CLASS_NAME, JOB_ID, PARTITION_ID, STAGE_ID, TASK_ID} import org.apache.spark.internal.config.{LEGACY_ABORT_STAGE_AFTER_KILL_TASKS, RDD_CACHE_VISIBILITY_TRACKING_ENABLED} import org.apache.spark.internal.config.Tests.TEST_NO_STAGE_RETRY import org.apache.spark.network.shuffle.{BlockStoreClient, MergeFinalizerListener} @@ -1749,8 +1749,8 @@ private[spark] class DAGScheduler( case None => "Unknown class" } logError( - s"Failed to update accumulator $id ($accumClassName) for task ${task.partitionId}", - e) + log"Failed to update accumulator ${MDC(ACCUMULATOR_ID, id)} (${MDC(CLASS_NAME, accumClassName)}) " + + log"for task ${MDC(PARTITION_ID, task.partitionId)}", e) } } } @@ -1763,7 +1763,9 @@ private[spark] class DAGScheduler( } catch { case NonFatal(e) => val taskId = event.taskInfo.taskId - logError(s"Error when attempting to reconstruct metrics for task $taskId", e) + logError( + log"Error when attempting to reconstruct metrics for task ${MDC(TASK_ID, taskId)}", + e) null } } else { diff --git a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala index a3b8f1206b9d4..24c25d2377948 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala @@ -25,7 +25,7 @@ import com.fasterxml.jackson.core.JsonParseException import com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException import org.apache.spark.internal.{Logging, MDC} -import org.apache.spark.internal.LogKey.{LINE, LINE_NUM} +import org.apache.spark.internal.LogKey.{LINE, LINE_NUM, PATH} import org.apache.spark.scheduler.ReplayListenerBus._ import org.apache.spark.util.JsonProtocol @@ -125,7 +125,7 @@ private[spark] class ReplayListenerBus extends SparkListenerBus with Logging { case ioe: IOException => throw ioe case e: Exception => - logError(s"Exception parsing Spark event log: $sourceName", e) + logError(log"Exception parsing Spark event log: ${MDC(PATH, sourceName)}", e) logError(log"Malformed line #${MDC(LINE_NUM, lineNumber)}: ${MDC(LINE, currentLine)}\n") false } diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala index a30744da9ee98..7e61dad3c141b 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala @@ -26,7 +26,8 @@ import scala.xml.{Node, XML} import org.apache.hadoop.fs.Path import org.apache.spark.SparkContext -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.PATH import org.apache.spark.internal.config.{SCHEDULER_ALLOCATION_FILE, SCHEDULER_MODE} import org.apache.spark.scheduler.SchedulingMode.SchedulingMode import org.apache.spark.util.Utils @@ -99,10 +100,13 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, sc: SparkContext fileData.foreach { case (is, fileName) => buildFairSchedulerPool(is, fileName) } } catch { case NonFatal(t) => - val defaultMessage = "Error while building the fair scheduler pools" - val message = fileData.map { case (is, fileName) => s"$defaultMessage from $fileName" } - .getOrElse(defaultMessage) - logError(message, t) + if (fileData.isDefined) { + val fileName = fileData.get._2 + logError(log"Error while building the fair scheduler pools from ${MDC(PATH, fileName)}", + t) + } else { + logError("Error while building the fair scheduler pools", t) + } throw t } finally { fileData.foreach { case (is, fileName) => is.close() } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index dc06567784558..1418901e3442c 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -534,9 +534,9 @@ private[spark] class TaskSetManager( // If the task cannot be serialized, then there's no point to re-attempt the task, // as it will always fail. So just abort the whole task-set. case NonFatal(e) => - val msg = s"Failed to serialize task $taskId, not attempting to retry it." + val msg = log"Failed to serialize task ${MDC(TASK_ID, taskId)}, not attempting to retry it." logError(msg, e) - abort(s"$msg Exception during serialization: $e") + abort(s"${msg.message} Exception during serialization: $e") throw SparkCoreErrors.failToSerializeTaskError(e) } if (serializedTask.limit() > TaskSetManager.TASK_SIZE_TO_WARN_KIB * 1024 && diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala index 613080813d8e4..5a0b2ba3735c5 100644 --- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala @@ -39,7 +39,8 @@ import org.roaringbitmap.RoaringBitmap import org.apache.spark._ import org.apache.spark.api.python.PythonBroadcast -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.CLASS_NAME import org.apache.spark.internal.config.Kryo._ import org.apache.spark.internal.io.FileCommitProtocol._ import org.apache.spark.network.util.ByteUnit @@ -739,7 +740,7 @@ private object JavaIterableWrapperSerializer extends Logging { private val underlyingMethodOpt = { try Some(wrapperClass.getDeclaredMethod("underlying")) catch { case e: Exception => - logError("Failed to find the underlying field in " + wrapperClass, e) + logError(log"Failed to find the underlying field in ${MDC(CLASS_NAME, wrapperClass)}", e) None } } diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala index d50b8f935d561..109a9a2e3eb94 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala @@ -25,7 +25,8 @@ import scala.collection.mutable.HashMap import scala.jdk.CollectionConverters._ import org.apache.spark.{JobExecutionStatus, SparkConf, SparkContext} -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.PATH import org.apache.spark.internal.config.Status.LIVE_UI_LOCAL_STORE_DIR import org.apache.spark.status.AppStatusUtils.getQuantilesValue import org.apache.spark.status.api.v1 @@ -864,7 +865,7 @@ private[spark] object AppStatusStore extends Logging { Some(localDir) } catch { case e: IOException => - logError(s"Failed to create spark ui store path in $rootDir.", e) + logError(log"Failed to create spark ui store path in ${MDC(PATH, rootDir)}.", e) None } } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index e68239f260d9d..9aa100d9ff36e 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -42,7 +42,7 @@ import org.apache.spark._ import org.apache.spark.errors.SparkCoreErrors import org.apache.spark.executor.DataReadMethod import org.apache.spark.internal.{config, Logging, MDC} -import org.apache.spark.internal.LogKey.BLOCK_ID +import org.apache.spark.internal.LogKey.{BLOCK_ID, COUNT, SLEEP_TIME_SECONDS} import org.apache.spark.internal.config.{Network, RDD_CACHE_VISIBILITY_TRACKING_ENABLED, Tests} import org.apache.spark.memory.{MemoryManager, MemoryMode} import org.apache.spark.metrics.source.Source @@ -626,8 +626,9 @@ private[spark] class BlockManager( return } catch { case e: Exception if i < MAX_ATTEMPTS => - logError(s"Failed to connect to external shuffle server, will retry ${MAX_ATTEMPTS - i}" - + s" more times after waiting $SLEEP_TIME_SECS seconds...", e) + logError(log"Failed to connect to external shuffle server, will retry " + + log"${MDC(COUNT, MAX_ATTEMPTS - i)} more times after waiting " + + log"${MDC(SLEEP_TIME_SECONDS, SLEEP_TIME_SECS)} seconds...", e) Thread.sleep(SLEEP_TIME_SECS * 1000L) case NonFatal(e) => throw SparkCoreErrors.unableToRegisterWithExternalShuffleServerError(e) } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala index 686003e2c51dc..5b4ecef233f8f 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala @@ -26,8 +26,8 @@ import scala.util.control.NonFatal import org.apache.spark._ import org.apache.spark.errors.SparkCoreErrors -import org.apache.spark.internal.Logging -import org.apache.spark.internal.config +import org.apache.spark.internal.{config, Logging, MDC} +import org.apache.spark.internal.LogKey.SHUFFLE_BLOCK_INFO import org.apache.spark.shuffle.ShuffleBlockInfo import org.apache.spark.storage.BlockManagerMessages.ReplicateBlock import org.apache.spark.util.{ThreadUtils, Utils} @@ -152,11 +152,13 @@ private[storage] class BlockManagerDecommissioner( isTargetDecommissioned = true keepRunning = false } else { - logError(s"Error occurred during migrating $shuffleBlockInfo", e) + logError(log"Error occurred during migrating " + + log"${MDC(SHUFFLE_BLOCK_INFO, shuffleBlockInfo)}", e) keepRunning = false } case e: Exception => - logError(s"Error occurred during migrating $shuffleBlockInfo", e) + logError(log"Error occurred during migrating " + + log"${MDC(SHUFFLE_BLOCK_INFO, shuffleBlockInfo)}", e) keepRunning = false } } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala index ac453d0f743c3..5bb4e096c029c 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala @@ -32,7 +32,7 @@ import com.google.common.cache.CacheBuilder import org.apache.spark.{MapOutputTrackerMaster, SparkConf, SparkContext, SparkEnv} import org.apache.spark.annotation.DeveloperApi import org.apache.spark.internal.{config, Logging, MDC} -import org.apache.spark.internal.LogKey.{BLOCK_MANAGER_ID, OLD_BLOCK_MANAGER_ID} +import org.apache.spark.internal.LogKey.{BLOCK_MANAGER_ID, EXECUTOR_ID, OLD_BLOCK_MANAGER_ID} import org.apache.spark.internal.config.RDD_CACHE_VISIBILITY_TRACKING_ENABLED import org.apache.spark.network.shuffle.{ExternalBlockStoreClient, RemoteBlockPushResolver} import org.apache.spark.rpc.{IsolatedThreadSafeRpcEndpoint, RpcCallContext, RpcEndpointRef, RpcEnv} @@ -328,7 +328,8 @@ class BlockManagerMasterEndpoint( // care about the return result of removing blocks. That way we avoid breaking // down the whole application. case NonFatal(e) => - logError(s"Cannot determine whether executor $executorId is alive or not.", e) + logError(log"Cannot determine whether executor " + + log"${MDC(EXECUTOR_ID, executorId)} is alive or not.", e) false } if (!isAlive) { diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerStorageEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerStorageEndpoint.scala index 5cc08714d41c1..1fccbd16ced5b 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerStorageEndpoint.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerStorageEndpoint.scala @@ -20,7 +20,8 @@ package org.apache.spark.storage import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService, Future} import org.apache.spark.{MapOutputTracker, SparkEnv} -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC, MessageWithContext} +import org.apache.spark.internal.LogKey.{BLOCK_ID, BROADCAST_ID, RDD_ID, SHUFFLE_ID} import org.apache.spark.rpc.{IsolatedThreadSafeRpcEndpoint, RpcCallContext, RpcEnv} import org.apache.spark.storage.BlockManagerMessages._ import org.apache.spark.util.{ThreadUtils, Utils} @@ -44,18 +45,18 @@ class BlockManagerStorageEndpoint( // Operations that involve removing blocks may be slow and should be done asynchronously override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { case RemoveBlock(blockId) => - doAsync[Boolean]("removing block " + blockId, context) { + doAsync[Boolean](log"removing block ${MDC(BLOCK_ID, blockId)}", context) { blockManager.removeBlock(blockId) true } case RemoveRdd(rddId) => - doAsync[Int]("removing RDD " + rddId, context) { + doAsync[Int](log"removing RDD ${MDC(RDD_ID, rddId)}", context) { blockManager.removeRdd(rddId) } case RemoveShuffle(shuffleId) => - doAsync[Boolean]("removing shuffle " + shuffleId, context) { + doAsync[Boolean](log"removing shuffle ${MDC(SHUFFLE_ID, shuffleId)}", context) { if (mapOutputTracker != null) { mapOutputTracker.unregisterShuffle(shuffleId) } @@ -66,7 +67,7 @@ class BlockManagerStorageEndpoint( context.reply(blockManager.decommissionSelf()) case RemoveBroadcast(broadcastId, _) => - doAsync[Int]("removing broadcast " + broadcastId, context) { + doAsync[Int](log"removing broadcast ${MDC(BROADCAST_ID, broadcastId)}", context) { blockManager.removeBroadcast(broadcastId, tellMaster = true) } @@ -96,18 +97,20 @@ class BlockManagerStorageEndpoint( context.reply(blockManager.blockInfoManager.tryMarkBlockAsVisible(blockId)) } - private def doAsync[T](actionMessage: String, context: RpcCallContext)(body: => T): Unit = { + private def doAsync[T]( + actionMessage: MessageWithContext, + context: RpcCallContext)(body: => T): Unit = { val future = Future { - logDebug(actionMessage) + logDebug(actionMessage.message) body } future.foreach { response => - logDebug(s"Done $actionMessage, response is $response") + logDebug(s"Done ${actionMessage.message}, response is $response") context.reply(response) logDebug(s"Sent response: $response to ${context.senderAddress}") } future.failed.foreach { t => - logError(s"Error in $actionMessage", t) + logError(log"Error in " + actionMessage, t) context.sendFailure(t) } } diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala index 7446a55fc7c37..4c0b5f4a14f64 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala @@ -30,7 +30,8 @@ import com.fasterxml.jackson.module.scala.DefaultScalaModule import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.errors.SparkCoreErrors import org.apache.spark.executor.ExecutorExitCode -import org.apache.spark.internal.{config, Logging} +import org.apache.spark.internal.{config, Logging, MDC} +import org.apache.spark.internal.LogKey.{MERGE_DIR_NAME, PATH} import org.apache.spark.network.shuffle.ExecutorDiskUtils import org.apache.spark.storage.DiskBlockManager.ATTEMPT_ID_KEY import org.apache.spark.storage.DiskBlockManager.MERGE_DIR_KEY @@ -255,7 +256,8 @@ private[spark] class DiskBlockManager( Some(localDir) } catch { case e: IOException => - logError(s"Failed to create local dir in $rootDir. Ignoring this directory.", e) + logError( + log"Failed to create local dir in ${MDC(PATH, rootDir)}. Ignoring this directory.", e) None } } @@ -292,7 +294,8 @@ private[spark] class DiskBlockManager( } catch { case e: IOException => logError( - s"Failed to create $mergeDirName dir in $rootDir. Ignoring this directory.", e) + log"Failed to create ${MDC(MERGE_DIR_NAME, mergeDirName)} dir in " + + log"${MDC(PATH, rootDir)}. Ignoring this directory.", e) } } } @@ -370,7 +373,7 @@ private[spark] class DiskBlockManager( } } catch { case e: Exception => - logError(s"Exception while deleting local spark dir: $localDir", e) + logError(log"Exception while deleting local spark dir: ${MDC(PATH, localDir)}", e) } } } diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala index 80e268081fa7f..0b6e33ff5fb37 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala @@ -271,7 +271,8 @@ private[spark] class DiskBlockObjectWriter( logError(log"Exception occurred while reverting partial writes to file " + log"${MDC(PATH, file)}, ${MDC(ERROR, ce.getMessage)}") case e: Exception => - logError("Uncaught exception while reverting partial writes to file " + file, e) + logError( + log"Uncaught exception while reverting partial writes to file ${MDC(PATH, file)}", e) } finally { if (truncateStream != null) { truncateStream.close() diff --git a/core/src/main/scala/org/apache/spark/storage/PushBasedFetchHelper.scala b/core/src/main/scala/org/apache/spark/storage/PushBasedFetchHelper.scala index 9b6048e90c9a6..31958af84e54b 100644 --- a/core/src/main/scala/org/apache/spark/storage/PushBasedFetchHelper.scala +++ b/core/src/main/scala/org/apache/spark/storage/PushBasedFetchHelper.scala @@ -28,7 +28,8 @@ import org.roaringbitmap.RoaringBitmap import org.apache.spark.MapOutputTracker import org.apache.spark.MapOutputTracker.SHUFFLE_PUSH_MAP_ID -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.{HOST, PORT, REDUCE_ID, SHUFFLE_ID, SHUFFLE_MERGE_ID} import org.apache.spark.network.shuffle.{BlockStoreClient, MergedBlockMeta, MergedBlocksMetaListener} import org.apache.spark.shuffle.ShuffleReadMetricsReporter import org.apache.spark.storage.BlockManagerId.SHUFFLE_MERGER_IDENTIFIER @@ -170,9 +171,10 @@ private class PushBasedFetchHelper( reduceId, sizeMap((shuffleId, reduceId)), meta.readChunkBitmaps(), address)) } catch { case exception: Exception => - logError(s"Failed to parse the meta of push-merged block for ($shuffleId, " + - s"$shuffleMergeId, $reduceId) from" + - s" ${req.address.host}:${req.address.port}", exception) + logError(log"Failed to parse the meta of push-merged block for (" + + log"${MDC(SHUFFLE_ID, shuffleId)}, ${MDC(SHUFFLE_MERGE_ID, shuffleMergeId)}, " + + log"${MDC(REDUCE_ID, reduceId)}) from ${MDC(HOST, req.address.host)}" + + log":${MDC(PORT, req.address.port)}", exception) iterator.addToResultsQueue( PushMergedRemoteMetaFailedFetchResult(shuffleId, shuffleMergeId, reduceId, address)) @@ -181,8 +183,9 @@ private class PushBasedFetchHelper( override def onFailure(shuffleId: Int, shuffleMergeId: Int, reduceId: Int, exception: Throwable): Unit = { - logError(s"Failed to get the meta of push-merged block for ($shuffleId, $reduceId) " + - s"from ${req.address.host}:${req.address.port}", exception) + logError(log"Failed to get the meta of push-merged block for " + + log"(${MDC(SHUFFLE_ID, shuffleId)}, ${MDC(REDUCE_ID, reduceId)}) " + + log"from ${MDC(HOST, req.address.host)}:${MDC(PORT, req.address.port)}", exception) iterator.addToResultsQueue( PushMergedRemoteMetaFailedFetchResult(shuffleId, shuffleMergeId, reduceId, address)) } diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala index 916cb83d379e0..d22ce3dbed772 100644 --- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala +++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala @@ -37,7 +37,7 @@ import org.apache.spark.{MapOutputTracker, SparkException, TaskContext} import org.apache.spark.MapOutputTracker.SHUFFLE_PUSH_MAP_ID import org.apache.spark.errors.SparkCoreErrors import org.apache.spark.internal.{Logging, MDC} -import org.apache.spark.internal.LogKey.{BLOCK_ID, ERROR, MAX_ATTEMPTS} +import org.apache.spark.internal.LogKey.{BLOCK_ID, ERROR, HOST, MAX_ATTEMPTS, PORT} import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer} import org.apache.spark.network.shuffle._ import org.apache.spark.network.shuffle.checksum.{Cause, ShuffleChecksumHelper} @@ -314,7 +314,8 @@ final class ShuffleBlockFetcherIterator( override def onBlockFetchFailure(blockId: String, e: Throwable): Unit = { ShuffleBlockFetcherIterator.this.synchronized { - logError(s"Failed to get block(s) from ${req.address.host}:${req.address.port}", e) + logError(log"Failed to get block(s) from " + + log"${MDC(HOST, req.address.host)}:${MDC(PORT, req.address.port)}", e) e match { // SPARK-27991: Catch the Netty OOM and set the flag `isNettyOOMOnShuffle` (shared among // tasks) to true as early as possible. The pending fetch requests won't be sent diff --git a/core/src/main/scala/org/apache/spark/ui/DriverLogPage.scala b/core/src/main/scala/org/apache/spark/ui/DriverLogPage.scala index 83a8fd628cd78..3102115159994 100644 --- a/core/src/main/scala/org/apache/spark/ui/DriverLogPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/DriverLogPage.scala @@ -21,7 +21,8 @@ import scala.xml.{Node, Unparsed} import jakarta.servlet.http.HttpServletRequest import org.apache.spark.SparkConf -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.{LOG_TYPE, PATH} import org.apache.spark.internal.config.DRIVER_LOG_LOCAL_DIR import org.apache.spark.util.Utils import org.apache.spark.util.logging.DriverLogger.DRIVER_LOG_FILE @@ -136,7 +137,8 @@ private[ui] class DriverLogPage( (logText, startIndex, endIndex, totalLength) } catch { case e: Exception => - logError(s"Error getting $logType logs from directory $logDirectory", e) + logError(log"Error getting ${MDC(LOG_TYPE, logType)} logs from directory " + + log"${MDC(PATH, logDirectory)}", e) ("Error getting logs due to exception: " + e.getMessage, 0, 0, 0) } } diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala index 099e47abf408a..ddf451c16f3a2 100644 --- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala @@ -23,7 +23,8 @@ import jakarta.servlet.http.{HttpServlet, HttpServletRequest, HttpServletRespons import org.eclipse.jetty.servlet.ServletContextHandler import org.apache.spark.{SecurityManager, SparkConf, SparkContext} -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.CLASS_NAME import org.apache.spark.internal.config.DRIVER_LOG_LOCAL_DIR import org.apache.spark.internal.config.UI._ import org.apache.spark.scheduler._ @@ -155,7 +156,7 @@ private[spark] class SparkUI private ( serverInfo = Some(server) } catch { case e: Exception => - logError(s"Failed to bind $className", e) + logError(log"Failed to bind ${MDC(CLASS_NAME, className)}", e) System.exit(1) } } diff --git a/core/src/main/scala/org/apache/spark/ui/WebUI.scala b/core/src/main/scala/org/apache/spark/ui/WebUI.scala index 2c937e71f64b9..baeed322e8ad3 100644 --- a/core/src/main/scala/org/apache/spark/ui/WebUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/WebUI.scala @@ -29,7 +29,8 @@ import org.eclipse.jetty.servlet.{FilterHolder, FilterMapping, ServletContextHan import org.json4s.JsonAST.{JNothing, JValue} import org.apache.spark.{SecurityManager, SparkConf, SSLOptions} -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.CLASS_NAME import org.apache.spark.internal.config._ import org.apache.spark.ui.JettyUtils._ import org.apache.spark.util.Utils @@ -158,7 +159,7 @@ private[spark] abstract class WebUI( logInfo(s"Bound $className to $hostName, and started at $webUrl") } catch { case e: Exception => - logError(s"Failed to bind $className", e) + logError(log"Failed to bind ${MDC(CLASS_NAME, className)}", e) System.exit(1) } } diff --git a/core/src/main/scala/org/apache/spark/util/EventLoop.scala b/core/src/main/scala/org/apache/spark/util/EventLoop.scala index 5125adc9f7ca6..eaa9ef517294e 100644 --- a/core/src/main/scala/org/apache/spark/util/EventLoop.scala +++ b/core/src/main/scala/org/apache/spark/util/EventLoop.scala @@ -22,7 +22,8 @@ import java.util.concurrent.atomic.AtomicBoolean import scala.util.control.NonFatal -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.EVENT_LOOP /** * An event loop to receive events from the caller and process all events in the event thread. It @@ -52,13 +53,13 @@ private[spark] abstract class EventLoop[E](name: String) extends Logging { try { onError(e) } catch { - case NonFatal(e) => logError("Unexpected error in " + name, e) + case NonFatal(e) => logError(log"Unexpected error in ${MDC(EVENT_LOOP, name)}", e) } } } } catch { case ie: InterruptedException => // exit even if eventQueue is not empty - case NonFatal(e) => logError("Unexpected error in " + name, e) + case NonFatal(e) => logError(log"Unexpected error in ${MDC(EVENT_LOOP, name)}", e) } } diff --git a/core/src/main/scala/org/apache/spark/util/ListenerBus.scala b/core/src/main/scala/org/apache/spark/util/ListenerBus.scala index f1daa76f3116c..814201d8c959c 100644 --- a/core/src/main/scala/org/apache/spark/util/ListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/util/ListenerBus.scala @@ -26,7 +26,8 @@ import scala.util.control.NonFatal import com.codahale.metrics.Timer import org.apache.spark.SparkEnv -import org.apache.spark.internal.{config, Logging} +import org.apache.spark.internal.{config, Logging, MDC} +import org.apache.spark.internal.LogKey.LISTENER import org.apache.spark.scheduler.EventLoggingListener import org.apache.spark.scheduler.SparkListenerEnvironmentUpdate @@ -122,10 +123,11 @@ private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging { } } catch { case ie: InterruptedException => - logError(s"Interrupted while posting to ${listenerName}. Removing that listener.", ie) + logError(log"Interrupted while posting to " + + log"${MDC(LISTENER, listenerName)}. Removing that listener.", ie) removeListenerOnError(listener) case NonFatal(e) if !isIgnorableException(e) => - logError(s"Listener ${listenerName} threw an exception", e) + logError(log"Listener ${MDC(LISTENER, listenerName)} threw an exception", e) } finally { if (maybeTimerContext != null) { val elapsed = maybeTimerContext.stop() diff --git a/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala b/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala index c6cad94401689..b9dece19f2651 100644 --- a/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala +++ b/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala @@ -26,7 +26,8 @@ import scala.util.Try import org.apache.hadoop.fs.FileSystem import org.apache.spark.SparkConf -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.PATH import org.apache.spark.internal.config.SPARK_SHUTDOWN_TIMEOUT_MS @@ -68,7 +69,8 @@ private[spark] object ShutdownHookManager extends Logging { logInfo("Deleting directory " + dirPath) Utils.deleteRecursively(new File(dirPath)) } catch { - case e: Exception => logError(s"Exception while deleting Spark temp dir: $dirPath", e) + case e: Exception => + logError(log"Exception while deleting Spark temp dir: ${MDC(PATH, dirPath)}", e) } } } diff --git a/core/src/main/scala/org/apache/spark/util/SparkUncaughtExceptionHandler.scala b/core/src/main/scala/org/apache/spark/util/SparkUncaughtExceptionHandler.scala index b24129eb36971..74f1474f9cf78 100644 --- a/core/src/main/scala/org/apache/spark/util/SparkUncaughtExceptionHandler.scala +++ b/core/src/main/scala/org/apache/spark/util/SparkUncaughtExceptionHandler.scala @@ -17,7 +17,8 @@ package org.apache.spark.util -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.THREAD /** * The default uncaught exception handler for Spark daemons. It terminates the whole process for @@ -36,11 +37,14 @@ private[spark] class SparkUncaughtExceptionHandler(val exitOnUncaughtException: override def uncaughtException(thread: Thread, exception: Throwable): Unit = { try { + val mdc = MDC(THREAD, thread) // Make it explicit that uncaught exceptions are thrown when container is shutting down. // It will help users when they analyze the executor logs - val inShutdownMsg = if (ShutdownHookManager.inShutdown()) "[Container in shutdown] " else "" - val errMsg = "Uncaught exception in thread " - logError(inShutdownMsg + errMsg + thread, exception) + if (ShutdownHookManager.inShutdown()) { + logError(log"[Container in shutdown] Uncaught exception in thread $mdc", exception) + } else { + logError(log"Uncaught exception in thread $mdc", exception) + } // We may have been called from a shutdown hook. If so, we must not call System.exit(). // (If we do, we will deadlock.) @@ -61,7 +65,9 @@ private[spark] class SparkUncaughtExceptionHandler(val exitOnUncaughtException: } catch { case oom: OutOfMemoryError => try { - logError(s"Uncaught OutOfMemoryError in thread $thread, process halted.", oom) + logError( + log"Uncaught OutOfMemoryError in thread ${MDC(THREAD, thread)}, process halted.", + oom) } catch { // absorb any exception/error since we're halting the process case _: Throwable => @@ -69,7 +75,9 @@ private[spark] class SparkUncaughtExceptionHandler(val exitOnUncaughtException: Runtime.getRuntime.halt(SparkExitCode.OOM) case t: Throwable => try { - logError(s"Another uncaught exception in thread $thread, process halted.", t) + logError( + log"Another uncaught exception in thread ${MDC(THREAD, thread)}, process halted.", + t) } catch { case _: Throwable => } diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index d7e174f5497c4..7022506e5508e 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -69,7 +69,7 @@ import org.slf4j.Logger import org.apache.spark._ import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.internal.{Logging, MDC} -import org.apache.spark.internal.LogKey.{COMMAND, COMMAND_OUTPUT, EXIT_CODE, PATH} +import org.apache.spark.internal.LogKey._ import org.apache.spark.internal.config._ import org.apache.spark.internal.config.Streaming._ import org.apache.spark.internal.config.Tests.IS_TESTING @@ -1276,11 +1276,13 @@ private[spark] object Utils case t: Throwable => val currentThreadName = Thread.currentThread().getName if (sc != null) { - logError(s"uncaught error in thread $currentThreadName, stopping SparkContext", t) + logError(log"uncaught error in thread ${MDC(THREAD_NAME, currentThreadName)}, " + + log"stopping SparkContext", t) sc.stopInNewThread() } if (!NonFatal(t)) { - logError(s"throw uncaught fatal error in thread $currentThreadName", t) + logError( + log"throw uncaught fatal error in thread ${MDC(THREAD_NAME, currentThreadName)}", t) throw t } } @@ -1292,7 +1294,8 @@ private[spark] object Utils block } catch { case NonFatal(t) => - logError(s"Uncaught exception in thread ${Thread.currentThread().getName}", t) + logError( + log"Uncaught exception in thread ${MDC(THREAD_NAME, Thread.currentThread().getName)}", t) } } @@ -1469,7 +1472,7 @@ private[spark] object Utils fileSize } catch { case e: Throwable => - logError(s"Cannot get file length of ${file}", e) + logError(log"Cannot get file length of ${MDC(PATH, file)}", e) throw e } finally { if (gzInputStream != null) { @@ -1847,7 +1850,8 @@ private[spark] object Utils case ct: ControlThrowable => throw ct case t: Throwable => - logError(s"Uncaught exception in thread ${Thread.currentThread().getName}", t) + logError( + log"Uncaught exception in thread ${MDC(THREAD_NAME, Thread.currentThread().getName)}", t) throw t } } @@ -1861,7 +1865,8 @@ private[spark] object Utils case ct: ControlThrowable => throw ct case t: Throwable => - logError(s"Uncaught exception in thread ${Thread.currentThread().getName}", t) + logError( + log"Uncaught exception in thread ${MDC(THREAD_NAME, Thread.currentThread().getName)}", t) scala.util.Failure(t) } } @@ -2348,7 +2353,8 @@ private[spark] object Utils val currentUserGroups = groupMappingServiceProvider.getGroups(username) return currentUserGroups } catch { - case e: Exception => logError(s"Error getting groups for user=$username", e) + case e: Exception => + logError(log"Error getting groups for user=${MDC(USER_NAME, username)}", e) } } EMPTY_USER_GROUPS diff --git a/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala b/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala index 2243239dce6fd..1dadf15da40fa 100644 --- a/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala +++ b/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala @@ -20,7 +20,8 @@ package org.apache.spark.util.logging import java.io.{File, FileOutputStream, InputStream, IOException} import org.apache.spark.SparkConf -import org.apache.spark.internal.{config, Logging} +import org.apache.spark.internal.{config, Logging, MDC} +import org.apache.spark.internal.LogKey.PATH import org.apache.spark.util.{IntParam, Utils} /** @@ -90,7 +91,7 @@ private[spark] class FileAppender( } } catch { case e: Exception => - logError(s"Error writing stream to file $file", e) + logError(log"Error writing stream to file ${MDC(PATH, file)}", e) } } diff --git a/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala b/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala index e374c41b91405..f8f144f6e3885 100644 --- a/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala +++ b/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala @@ -24,7 +24,8 @@ import com.google.common.io.Files import org.apache.commons.io.IOUtils import org.apache.spark.SparkConf -import org.apache.spark.internal.config +import org.apache.spark.internal.{config, MDC} +import org.apache.spark.internal.LogKey.PATH import org.apache.spark.util.ArrayImplicits._ /** @@ -77,7 +78,7 @@ private[spark] class RollingFileAppender( } } catch { case e: Exception => - logError(s"Error rolling over $activeFile", e) + logError(log"Error rolling over ${MDC(PATH, activeFile)}", e) } } @@ -156,7 +157,8 @@ private[spark] class RollingFileAppender( } } catch { case e: Exception => - logError("Error cleaning logs in directory " + activeFile.getParentFile.getAbsolutePath, e) + val path = activeFile.getParentFile.getAbsolutePath + logError(log"Error cleaning logs in directory ${MDC(PATH, path)}", e) } } }