From 89afe18c5c8c1ba2a470b0ae9dc9ac8df48eb707 Mon Sep 17 00:00:00 2001 From: panbingkun Date: Tue, 2 Apr 2024 15:35:30 +0800 Subject: [PATCH 1/5] [SPARK-47688][CORE] Support concatenation `normal variables` and `MDC` --- .../org/apache/spark/internal/Logging.scala | 16 ++++++---- .../spark/util/PatternLoggingSuite.scala | 4 +++ .../spark/util/StructuredLoggingSuite.scala | 31 +++++++++++++++++++ 3 files changed, 45 insertions(+), 6 deletions(-) diff --git a/common/utils/src/main/scala/org/apache/spark/internal/Logging.scala b/common/utils/src/main/scala/org/apache/spark/internal/Logging.scala index 5765a6eed5420..7d73e39610639 100644 --- a/common/utils/src/main/scala/org/apache/spark/internal/Logging.scala +++ b/common/utils/src/main/scala/org/apache/spark/internal/Logging.scala @@ -96,17 +96,21 @@ trait Logging { } implicit class LogStringContext(val sc: StringContext) { - def log(args: MDC*): MessageWithContext = { + def log(args: Any*): MessageWithContext = { val processedParts = sc.parts.iterator val sb = new StringBuilder(processedParts.next()) val context = new java.util.HashMap[String, String]() - args.foreach { mdc => - sb.append(mdc.value) - if (Logging.isStructuredLoggingEnabled) { - context.put(mdc.key.toString.toLowerCase(Locale.ROOT), mdc.value) + args.foreach { arg => + arg match { + case MDC(k, v) => + sb.append(v) + if (Logging.isStructuredLoggingEnabled) { + context.put(k.toString.toLowerCase(Locale.ROOT), v) + } + case v: Any => + sb.append(v.toString) } - if (processedParts.hasNext) { sb.append(processedParts.next()) } diff --git a/common/utils/src/test/scala/org/apache/spark/util/PatternLoggingSuite.scala b/common/utils/src/test/scala/org/apache/spark/util/PatternLoggingSuite.scala index 02895f708ff06..ede6a21c1d7ea 100644 --- a/common/utils/src/test/scala/org/apache/spark/util/PatternLoggingSuite.scala +++ b/common/utils/src/test/scala/org/apache/spark/util/PatternLoggingSuite.scala @@ -45,4 +45,8 @@ class PatternLoggingSuite extends LoggingSuiteBase with BeforeAndAfterAll { s""".*$level $className: Min Size: 2, Max Size: 4. Please double check.\n""" assert(pattern.r.matches(logOutput)) } + + override def expectedPatternForConcatVariableAndMDC(level: Level): String = { + s""".*$level $className: Hello This is a log message, Lost executor 1.\n""" + } } diff --git a/common/utils/src/test/scala/org/apache/spark/util/StructuredLoggingSuite.scala b/common/utils/src/test/scala/org/apache/spark/util/StructuredLoggingSuite.scala index fe42c7fec990e..cebb2a119b29f 100644 --- a/common/utils/src/test/scala/org/apache/spark/util/StructuredLoggingSuite.scala +++ b/common/utils/src/test/scala/org/apache/spark/util/StructuredLoggingSuite.scala @@ -63,12 +63,17 @@ trait LoggingSuiteBase log"Max Size: ${MDC(MAX_SIZE, "4")}. " + log"Please double check." + def concatVariableAndMDC: LogEntry = + log"Hello $basicMsg, Lost executor ${MDC(EXECUTOR_ID, "1")}." + def expectedPatternForMsgWithMDC(level: Level): String def expectedPatternForMsgWithMDCAndException(level: Level): String def verifyMsgWithConcat(level: Level, logOutput: String): Unit + def expectedPatternForConcatVariableAndMDC(level: Level): String + test("Basic logging") { Seq( (Level.ERROR, () => logError(basicMsg)), @@ -112,6 +117,18 @@ trait LoggingSuiteBase verifyMsgWithConcat(level, logOutput) } } + + test("Logging concat variable and MDC") { + Seq( + (Level.ERROR, () => logError(concatVariableAndMDC)), + (Level.WARN, () => logWarning(concatVariableAndMDC)), + (Level.INFO, () => logInfo(concatVariableAndMDC))).foreach { + case (level, logFunc) => + val logOutput = captureLogOutput(logFunc) + println(logOutput) + assert(expectedPatternForConcatVariableAndMDC(level).r.matches(logOutput)) + } + } } class StructuredLoggingSuite extends LoggingSuiteBase { @@ -170,6 +187,20 @@ class StructuredLoggingSuite extends LoggingSuiteBase { }""") } + override def expectedPatternForConcatVariableAndMDC(level: Level): String = { + compactAndToRegexPattern( + s""" + { + "ts": "", + "level": "$level", + "msg": "Hello This is a log message, Lost executor 1.", + "context": { + "executor_id": "1" + }, + "logger": "$className" + }""") + } + override def verifyMsgWithConcat(level: Level, logOutput: String): Unit = { val pattern1 = compactAndToRegexPattern( s""" From 834fdfcde8a48067bec0c7fec7ddc3343a23eb71 Mon Sep 17 00:00:00 2001 From: panbingkun Date: Tue, 2 Apr 2024 15:39:26 +0800 Subject: [PATCH 2/5] fix it --- .../scala/org/apache/spark/util/StructuredLoggingSuite.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/common/utils/src/test/scala/org/apache/spark/util/StructuredLoggingSuite.scala b/common/utils/src/test/scala/org/apache/spark/util/StructuredLoggingSuite.scala index cebb2a119b29f..abd6b6fe8ce65 100644 --- a/common/utils/src/test/scala/org/apache/spark/util/StructuredLoggingSuite.scala +++ b/common/utils/src/test/scala/org/apache/spark/util/StructuredLoggingSuite.scala @@ -125,7 +125,6 @@ trait LoggingSuiteBase (Level.INFO, () => logInfo(concatVariableAndMDC))).foreach { case (level, logFunc) => val logOutput = captureLogOutput(logFunc) - println(logOutput) assert(expectedPatternForConcatVariableAndMDC(level).r.matches(logOutput)) } } From 64bcd8e9dcbf6f1ba3a0f9a75481d54711205c76 Mon Sep 17 00:00:00 2001 From: panbingkun Date: Tue, 2 Apr 2024 19:26:43 +0800 Subject: [PATCH 3/5] add new syntax --- .../org/apache/spark/internal/Logging.scala | 13 ++++ .../spark/util/PatternLoggingSuite.scala | 16 +++- .../spark/util/StructuredLoggingSuite.scala | 77 +++++++++++++++++-- 3 files changed, 94 insertions(+), 12 deletions(-) diff --git a/common/utils/src/main/scala/org/apache/spark/internal/Logging.scala b/common/utils/src/main/scala/org/apache/spark/internal/Logging.scala index 7d73e39610639..059e87d4425d3 100644 --- a/common/utils/src/main/scala/org/apache/spark/internal/Logging.scala +++ b/common/utils/src/main/scala/org/apache/spark/internal/Logging.scala @@ -48,6 +48,19 @@ case class MessageWithContext(message: String, context: java.util.HashMap[String resultMap.putAll(mdc.context) MessageWithContext(message + mdc.message, resultMap) } + + def ++(s: String): MessageWithContext = { + MessageWithContext(message + s, context) + } +} + +object MDC { + + implicit class StringImprovements(val s: String) { + def ++(mdc: MessageWithContext): MessageWithContext = { + MessageWithContext(s + mdc.message, mdc.context) + } + } } /** diff --git a/common/utils/src/test/scala/org/apache/spark/util/PatternLoggingSuite.scala b/common/utils/src/test/scala/org/apache/spark/util/PatternLoggingSuite.scala index ede6a21c1d7ea..83edca53c8d18 100644 --- a/common/utils/src/test/scala/org/apache/spark/util/PatternLoggingSuite.scala +++ b/common/utils/src/test/scala/org/apache/spark/util/PatternLoggingSuite.scala @@ -40,13 +40,21 @@ class PatternLoggingSuite extends LoggingSuiteBase with BeforeAndAfterAll { override def expectedPatternForMsgWithMDCAndException(level: Level): String = s""".*$level $className: Error in executor 1.\njava.lang.RuntimeException: OOM\n.*""" + override def expectedPatternForVariableAndMDC(level: Level): String = { + s""".*$level $className: Hello This is a log message, Lost executor 1.\n""" + } + + override def expectedPatternForConcatStringAndMDC(level: Level): String = { + s""".*$level $className: Hello This is a log message, Lost executor 1.\n""" + } + + override def expectedPatternForConcatMDCAndString(level: Level): String = { + s""".*$level $className: Lost executor 1. Hello This is a log message.\n""" + } + override def verifyMsgWithConcat(level: Level, logOutput: String): Unit = { val pattern = s""".*$level $className: Min Size: 2, Max Size: 4. Please double check.\n""" assert(pattern.r.matches(logOutput)) } - - override def expectedPatternForConcatVariableAndMDC(level: Level): String = { - s""".*$level $className: Hello This is a log message, Lost executor 1.\n""" - } } diff --git a/common/utils/src/test/scala/org/apache/spark/util/StructuredLoggingSuite.scala b/common/utils/src/test/scala/org/apache/spark/util/StructuredLoggingSuite.scala index abd6b6fe8ce65..adb197ff5d125 100644 --- a/common/utils/src/test/scala/org/apache/spark/util/StructuredLoggingSuite.scala +++ b/common/utils/src/test/scala/org/apache/spark/util/StructuredLoggingSuite.scala @@ -26,6 +26,7 @@ import org.scalatest.funsuite.AnyFunSuite // scalastyle:ignore funsuite import org.apache.spark.internal.{LogEntry, Logging, MDC} import org.apache.spark.internal.LogKey.{EXECUTOR_ID, MAX_SIZE, MIN_SIZE} +import org.apache.spark.internal.MDC._ trait LoggingSuiteBase extends AnyFunSuite // scalastyle:ignore funsuite @@ -63,16 +64,26 @@ trait LoggingSuiteBase log"Max Size: ${MDC(MAX_SIZE, "4")}. " + log"Please double check." - def concatVariableAndMDC: LogEntry = + def logVariableAndMDC: LogEntry = log"Hello $basicMsg, Lost executor ${MDC(EXECUTOR_ID, "1")}." + def concatMDCAndString: LogEntry = + (log"Lost executor ${MDC(EXECUTOR_ID, "1")}." ++ s" Hello $basicMsg.") + + def concatStringAndMDC: LogEntry = + (s"Hello $basicMsg, " ++ (log"Lost executor ${MDC(EXECUTOR_ID, "1")}.")) + def expectedPatternForMsgWithMDC(level: Level): String def expectedPatternForMsgWithMDCAndException(level: Level): String def verifyMsgWithConcat(level: Level, logOutput: String): Unit - def expectedPatternForConcatVariableAndMDC(level: Level): String + def expectedPatternForVariableAndMDC(level: Level): String + + def expectedPatternForConcatStringAndMDC(level: Level): String + + def expectedPatternForConcatMDCAndString(level: Level): String test("Basic logging") { Seq( @@ -118,14 +129,36 @@ trait LoggingSuiteBase } } - test("Logging concat variable and MDC") { + test("Logging variable and MDC") { Seq( - (Level.ERROR, () => logError(concatVariableAndMDC)), - (Level.WARN, () => logWarning(concatVariableAndMDC)), - (Level.INFO, () => logInfo(concatVariableAndMDC))).foreach { + (Level.ERROR, () => logError(logVariableAndMDC)), + (Level.WARN, () => logWarning(logVariableAndMDC)), + (Level.INFO, () => logInfo(logVariableAndMDC))).foreach { case (level, logFunc) => val logOutput = captureLogOutput(logFunc) - assert(expectedPatternForConcatVariableAndMDC(level).r.matches(logOutput)) + assert(expectedPatternForVariableAndMDC(level).r.matches(logOutput)) + } + } + + test("Logging concat string and MDC") { + Seq( + (Level.ERROR, () => logError(concatStringAndMDC)), + (Level.WARN, () => logWarning(concatStringAndMDC)), + (Level.INFO, () => logInfo(concatStringAndMDC))).foreach { + case (level, logFunc) => + val logOutput = captureLogOutput(logFunc) + assert(expectedPatternForConcatStringAndMDC(level).r.matches(logOutput)) + } + } + + test("Logging concat MDC and string") { + Seq( + (Level.ERROR, () => logError(concatMDCAndString)), + (Level.WARN, () => logWarning(concatMDCAndString)), + (Level.INFO, () => logInfo(concatMDCAndString))).foreach { + case (level, logFunc) => + val logOutput = captureLogOutput(logFunc) + assert(expectedPatternForConcatMDCAndString(level).r.matches(logOutput)) } } } @@ -186,7 +219,7 @@ class StructuredLoggingSuite extends LoggingSuiteBase { }""") } - override def expectedPatternForConcatVariableAndMDC(level: Level): String = { + override def expectedPatternForVariableAndMDC(level: Level): String = { compactAndToRegexPattern( s""" { @@ -200,6 +233,34 @@ class StructuredLoggingSuite extends LoggingSuiteBase { }""") } + override def expectedPatternForConcatStringAndMDC(level: Level): String = { + compactAndToRegexPattern( + s""" + { + "ts": "", + "level": "$level", + "msg": "Hello This is a log message, Lost executor 1.", + "context": { + "executor_id": "1" + }, + "logger": "$className" + }""") + } + + override def expectedPatternForConcatMDCAndString(level: Level): String = { + compactAndToRegexPattern( + s""" + { + "ts": "", + "level": "$level", + "msg": "Lost executor 1. Hello This is a log message.", + "context": { + "executor_id": "1" + }, + "logger": "$className" + }""") + } + override def verifyMsgWithConcat(level: Level, logOutput: String): Unit = { val pattern1 = compactAndToRegexPattern( s""" From 1ead521a7dba1534496350dedaa07d8bf3a66416 Mon Sep 17 00:00:00 2001 From: panbingkun Date: Tue, 2 Apr 2024 19:37:32 +0800 Subject: [PATCH 4/5] fix --- .../scala/org/apache/spark/util/StructuredLoggingSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/utils/src/test/scala/org/apache/spark/util/StructuredLoggingSuite.scala b/common/utils/src/test/scala/org/apache/spark/util/StructuredLoggingSuite.scala index adb197ff5d125..3ac8db02d3e31 100644 --- a/common/utils/src/test/scala/org/apache/spark/util/StructuredLoggingSuite.scala +++ b/common/utils/src/test/scala/org/apache/spark/util/StructuredLoggingSuite.scala @@ -71,7 +71,7 @@ trait LoggingSuiteBase (log"Lost executor ${MDC(EXECUTOR_ID, "1")}." ++ s" Hello $basicMsg.") def concatStringAndMDC: LogEntry = - (s"Hello $basicMsg, " ++ (log"Lost executor ${MDC(EXECUTOR_ID, "1")}.")) + (s"Hello $basicMsg, " ++ log"Lost executor ${MDC(EXECUTOR_ID, "1")}.") def expectedPatternForMsgWithMDC(level: Level): String From 3cf47faff118f61ebe54a380f483d5a5102387fd Mon Sep 17 00:00:00 2001 From: panbingkun Date: Tue, 2 Apr 2024 19:41:59 +0800 Subject: [PATCH 5/5] fix --- .../scala/org/apache/spark/util/StructuredLoggingSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/common/utils/src/test/scala/org/apache/spark/util/StructuredLoggingSuite.scala b/common/utils/src/test/scala/org/apache/spark/util/StructuredLoggingSuite.scala index 3ac8db02d3e31..aafd208244594 100644 --- a/common/utils/src/test/scala/org/apache/spark/util/StructuredLoggingSuite.scala +++ b/common/utils/src/test/scala/org/apache/spark/util/StructuredLoggingSuite.scala @@ -68,10 +68,10 @@ trait LoggingSuiteBase log"Hello $basicMsg, Lost executor ${MDC(EXECUTOR_ID, "1")}." def concatMDCAndString: LogEntry = - (log"Lost executor ${MDC(EXECUTOR_ID, "1")}." ++ s" Hello $basicMsg.") + log"Lost executor ${MDC(EXECUTOR_ID, "1")}." ++ s" Hello $basicMsg." def concatStringAndMDC: LogEntry = - (s"Hello $basicMsg, " ++ log"Lost executor ${MDC(EXECUTOR_ID, "1")}.") + s"Hello $basicMsg, " ++ log"Lost executor ${MDC(EXECUTOR_ID, "1")}." def expectedPatternForMsgWithMDC(level: Level): String