From 05127bad3508ab88e347d657088edeb675a73622 Mon Sep 17 00:00:00 2001 From: panbingkun Date: Wed, 3 Apr 2024 15:19:15 +0800 Subject: [PATCH 1/6] [SPARK-47598][CORE] MLLib: Migrate logError with variables to structured logging framework --- .../org/apache/spark/internal/LogKey.scala | 10 ++++++ .../org/apache/spark/internal/Logging.scala | 4 ++- .../org/apache/spark/util/MDCSuite.scala | 2 ++ .../spark/ml/classification/LinearSVC.scala | 14 ++++---- .../classification/LogisticRegression.scala | 14 ++++---- .../ml/regression/AFTSurvivalRegression.scala | 7 ++-- .../ml/regression/LinearRegression.scala | 7 ++-- .../spark/ml/util/Instrumentation.scala | 35 ++++++++++++++++++- .../spark/mllib/util/DataValidators.scala | 11 +++--- .../spark/ml/feature/VectorIndexerSuite.scala | 9 +++-- .../tree/GradientBoostedTreesSuite.scala | 17 +++++---- 11 files changed, 97 insertions(+), 33 deletions(-) diff --git a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala index b8a43a03d8b62..cf87cee619a09 100644 --- a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala +++ b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala @@ -32,6 +32,16 @@ object LogKey extends Enumeration { val MIN_SIZE = Value val REMOTE_ADDRESS = Value val POD_ID = Value + val NUM_ITERATIONS = Value + val LEARNING_RATE = Value + val SUBSAMPLING_RATE = Value + + val MAX_CATEGORIES = Value + val CATEGORICAL_FEATURES = Value + + val RANGE_CLASSIFICATION_LABELS = Value + val NUM_CLASSIFICATION_LABELS = Value + val OPTIMIZER_CLASS_NAME = Value type LogKey = Value } diff --git a/common/utils/src/main/scala/org/apache/spark/internal/Logging.scala b/common/utils/src/main/scala/org/apache/spark/internal/Logging.scala index 84b9debb2afda..e3b737533203a 100644 --- a/common/utils/src/main/scala/org/apache/spark/internal/Logging.scala +++ b/common/utils/src/main/scala/org/apache/spark/internal/Logging.scala @@ -49,6 +49,8 @@ case class MessageWithContext(message: String, context: java.util.HashMap[String resultMap.putAll(mdc.context) MessageWithContext(message + mdc.message, resultMap) } + + override def toString: String = message } /** @@ -117,7 +119,7 @@ trait Logging { } } - private def withLogContext(context: java.util.HashMap[String, String])(body: => Unit): Unit = { + protected def withLogContext(context: java.util.HashMap[String, String])(body: => Unit): Unit = { val threadContext = CloseableThreadContext.putAll(context) try { body diff --git a/common/utils/src/test/scala/org/apache/spark/util/MDCSuite.scala b/common/utils/src/test/scala/org/apache/spark/util/MDCSuite.scala index 1ac51e236080c..6cae7bae7e331 100644 --- a/common/utils/src/test/scala/org/apache/spark/util/MDCSuite.scala +++ b/common/utils/src/test/scala/org/apache/spark/util/MDCSuite.scala @@ -32,6 +32,7 @@ class MDCSuite val log = log"This is a log, exitcode ${MDC(EXIT_CODE, 10086)}" assert(log.message === "This is a log, exitcode 10086") assert(log.context === Map("exit_code" -> "10086").asJava) + assert(log.toString === "This is a log, exitcode 10086") } test("custom object as MDC value") { @@ -39,6 +40,7 @@ class MDCSuite val log = log"This is a log, exitcode ${MDC(EXIT_CODE, cov)}" assert(log.message === "This is a log, exitcode CustomObjectValue: spark, 10086") assert(log.context === Map("exit_code" -> "CustomObjectValue: spark, 10086").asJava) + assert(log.toString === "This is a log, exitcode CustomObjectValue: spark, 10086") } case class CustomObjectValue(key: String, value: Int) { diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala index 13898a304b3da..8c9927de116cb 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala @@ -25,7 +25,8 @@ import org.apache.hadoop.fs.Path import org.apache.spark.SparkException import org.apache.spark.annotation.Since -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.{NUM_CLASSIFICATION_LABELS, OPTIMIZER_CLASS_NAME, RANGE_CLASSIFICATION_LABELS} import org.apache.spark.ml.feature._ import org.apache.spark.ml.linalg._ import org.apache.spark.ml.optim.aggregator._ @@ -220,10 +221,11 @@ class LinearSVC @Since("2.2.0") ( instr.logNumFeatures(numFeatures) if (numInvalid != 0) { - val msg = s"Classification labels should be in [0 to ${numClasses - 1}]. " + - s"Found $numInvalid invalid labels." + val msg = log"Classification labels should be in " + + log"${MDC(RANGE_CLASSIFICATION_LABELS, s"[0 to ${numClasses - 1}]")}. " + + log"Found ${MDC(NUM_CLASSIFICATION_LABELS, numInvalid)} invalid labels." instr.logError(msg) - throw new SparkException(msg) + throw new SparkException(msg.message) } val featuresStd = summarizer.std.toArray @@ -249,9 +251,9 @@ class LinearSVC @Since("2.2.0") ( regularization, optimizer) if (rawCoefficients == null) { - val msg = s"${optimizer.getClass.getName} failed." + val msg = log"${MDC(OPTIMIZER_CLASS_NAME, optimizer.getClass.getName)} failed." instr.logError(msg) - throw new SparkException(msg) + throw new SparkException(msg.message) } val coefficientArray = Array.tabulate(numFeatures) { i => diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala index 8b796a65f4f8c..ad583b21ef73b 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala @@ -27,7 +27,8 @@ import org.apache.hadoop.fs.Path import org.apache.spark.SparkException import org.apache.spark.annotation.Since -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.{NUM_CLASSIFICATION_LABELS, OPTIMIZER_CLASS_NAME, RANGE_CLASSIFICATION_LABELS} import org.apache.spark.ml.feature._ import org.apache.spark.ml.impl.Utils import org.apache.spark.ml.linalg._ @@ -530,10 +531,11 @@ class LogisticRegression @Since("1.2.0") ( } if (numInvalid != 0) { - val msg = s"Classification labels should be in [0 to ${numClasses - 1}]. " + - s"Found $numInvalid invalid labels." + val msg = log"Classification labels should be in " + + log"${MDC(RANGE_CLASSIFICATION_LABELS, s"[0 to ${numClasses - 1}]")}. " + + log"Found ${MDC(NUM_CLASSIFICATION_LABELS, numInvalid)} invalid labels." instr.logError(msg) - throw new SparkException(msg) + throw new SparkException(msg.message) } instr.logNumClasses(numClasses) @@ -634,9 +636,9 @@ class LogisticRegression @Since("1.2.0") ( initialSolution.toArray, regularization, optimizer) if (allCoefficients == null) { - val msg = s"${optimizer.getClass.getName} failed." + val msg = log"${MDC(OPTIMIZER_CLASS_NAME, optimizer.getClass.getName)} failed." instr.logError(msg) - throw new SparkException(msg) + throw new SparkException(msg.message) } val allCoefMatrix = new DenseMatrix(numCoefficientSets, numFeaturesPlusIntercept, diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala index e9abcb0954770..b5c52daf406d6 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala @@ -25,7 +25,8 @@ import org.apache.hadoop.fs.Path import org.apache.spark.SparkException import org.apache.spark.annotation.Since -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.OPTIMIZER_CLASS_NAME import org.apache.spark.ml.PredictorParams import org.apache.spark.ml.feature._ import org.apache.spark.ml.linalg._ @@ -271,9 +272,9 @@ class AFTSurvivalRegression @Since("1.6.0") (@Since("1.6.0") override val uid: S optimizer, initialSolution) if (rawCoefficients == null) { - val msg = s"${optimizer.getClass.getName} failed." + val msg = log"${MDC(OPTIMIZER_CLASS_NAME, optimizer.getClass.getName)} failed." instr.logError(msg) - throw new SparkException(msg) + throw new SparkException(msg.message) } val coefficientArray = Array.tabulate(numFeatures) { i => diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala index 9638eee8d5901..e24d5b01dee2c 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala @@ -27,7 +27,8 @@ import org.apache.hadoop.fs.Path import org.apache.spark.SparkException import org.apache.spark.annotation.Since -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.OPTIMIZER_CLASS_NAME import org.apache.spark.ml.{PipelineStage, PredictorParams} import org.apache.spark.ml.feature._ import org.apache.spark.ml.linalg.{BLAS, Vector, Vectors} @@ -428,9 +429,9 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String featuresMean, featuresStd, initialSolution, regularization, optimizer) if (parameters == null) { - val msg = s"${optimizer.getClass.getName} failed." + val msg = log"${MDC(OPTIMIZER_CLASS_NAME, optimizer.getClass.getName)} failed." instr.logError(msg) - throw new SparkException(msg) + throw new SparkException(msg.message) } val model = createModel(parameters, yMean, yStd, featuresMean, featuresStd) diff --git a/mllib/src/main/scala/org/apache/spark/ml/util/Instrumentation.scala b/mllib/src/main/scala/org/apache/spark/ml/util/Instrumentation.scala index a243ab8d27c9c..f250ea0f92d81 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/util/Instrumentation.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/util/Instrumentation.scala @@ -27,7 +27,7 @@ import org.json4s._ import org.json4s.JsonDSL._ import org.json4s.jackson.JsonMethods._ -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{LogEntry, Logging, MessageWithContext} import org.apache.spark.ml.{MLEvents, PipelineStage} import org.apache.spark.ml.param.{Param, Params} import org.apache.spark.rdd.RDD @@ -84,6 +84,17 @@ private[spark] class Instrumentation private () extends Logging with MLEvents { super.logWarning(prefix + msg) } + /** + * Logs a LogEntry which message with a prefix that uniquely identifies the training session. + */ + override def logWarning(entry: LogEntry): Unit = { + if (log.isWarnEnabled) { + withLogContext(entry.context) { + log.warn(prefix + entry.message) + } + } + } + /** * Logs a error message with a prefix that uniquely identifies the training session. */ @@ -91,6 +102,17 @@ private[spark] class Instrumentation private () extends Logging with MLEvents { super.logError(prefix + msg) } + /** + * Logs a LogEntry which message with a prefix that uniquely identifies the training session. + */ + override def logError(entry: LogEntry): Unit = { + if (log.isErrorEnabled) { + withLogContext(entry.context) { + log.error(prefix + entry.message) + } + } + } + /** * Logs an info message with a prefix that uniquely identifies the training session. */ @@ -98,6 +120,17 @@ private[spark] class Instrumentation private () extends Logging with MLEvents { super.logInfo(prefix + msg) } + /** + * Logs a LogEntry which message with a prefix that uniquely identifies the training session. + */ + override def logInfo(entry: LogEntry): Unit = { + if (log.isInfoEnabled) { + withLogContext(entry.context) { + log.info(prefix + entry.message) + } + } + } + /** * Logs the value of the given parameters for the estimator being used in this session. */ diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/DataValidators.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/DataValidators.scala index 1a8c8807f91d4..9733cf18dc1cb 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/util/DataValidators.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/util/DataValidators.scala @@ -18,7 +18,8 @@ package org.apache.spark.mllib.util import org.apache.spark.annotation.Since -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.{NUM_CLASSIFICATION_LABELS, RANGE_CLASSIFICATION_LABELS} import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.rdd.RDD @@ -37,7 +38,8 @@ object DataValidators extends Logging { val binaryLabelValidator: RDD[LabeledPoint] => Boolean = { data => val numInvalid = data.filter(x => x.label != 1.0 && x.label != 0.0).count() if (numInvalid != 0) { - logError("Classification labels should be 0 or 1. Found " + numInvalid + " invalid labels") + logError(log"Classification labels should be 0 or 1. " + + log"Found ${MDC(NUM_CLASSIFICATION_LABELS, numInvalid)} invalid labels") } numInvalid == 0 } @@ -53,8 +55,9 @@ object DataValidators extends Logging { val numInvalid = data.filter(x => x.label - x.label.toInt != 0.0 || x.label < 0 || x.label > k - 1).count() if (numInvalid != 0) { - logError("Classification labels should be in {0 to " + (k - 1) + "}. " + - "Found " + numInvalid + " invalid labels") + logError(log"Classification labels should be in " + + log"${MDC(RANGE_CLASSIFICATION_LABELS, s"[0 to ${k - 1}]")}. " + + log"Found ${MDC(NUM_CLASSIFICATION_LABELS, numInvalid)} invalid labels") } numInvalid == 0 } diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/VectorIndexerSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/VectorIndexerSuite.scala index aa7b4e17a4df8..fe695ff157137 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/VectorIndexerSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/VectorIndexerSuite.scala @@ -18,7 +18,8 @@ package org.apache.spark.ml.feature import org.apache.spark.SparkException -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.{CATEGORICAL_FEATURES, MAX_CATEGORIES} import org.apache.spark.ml.attribute._ import org.apache.spark.ml.linalg.{SparseVector, Vector, Vectors} import org.apache.spark.ml.param.ParamsSuite @@ -175,8 +176,10 @@ class VectorIndexerSuite extends MLTest with DefaultReadWriteTest with Logging { maxCategories: Int, categoricalFeatures: Set[Int]): Unit = { val collectedData = data.collect().map(_.getAs[Vector](0)) - val errMsg = s"checkCategoryMaps failed for input with maxCategories=$maxCategories," + - s" categoricalFeatures=${categoricalFeatures.mkString(", ")}" + + val errMsg = log"checkCategoryMaps failed for input with " + + log"maxCategories=${MDC(MAX_CATEGORIES, maxCategories)} " + + log"categoricalFeatures=${MDC(CATEGORICAL_FEATURES, categoricalFeatures.mkString(", "))}" try { val vectorIndexer = getIndexer.setMaxCategories(maxCategories) val model = vectorIndexer.fit(data) diff --git a/mllib/src/test/scala/org/apache/spark/mllib/tree/GradientBoostedTreesSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/tree/GradientBoostedTreesSuite.scala index 5cf51c252cd50..b0a4dd1dcf82b 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/tree/GradientBoostedTreesSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/tree/GradientBoostedTreesSuite.scala @@ -18,6 +18,8 @@ package org.apache.spark.mllib.tree import org.apache.spark.SparkFunSuite +import org.apache.spark.internal.LogKey.{LEARNING_RATE, NUM_ITERATIONS, SUBSAMPLING_RATE} +import org.apache.spark.internal.MDC import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.mllib.tree.configuration.{BoostingStrategy, Strategy} import org.apache.spark.mllib.tree.configuration.Algo._ @@ -51,8 +53,9 @@ class GradientBoostedTreesSuite extends SparkFunSuite with MLlibTestSparkContext gbt, GradientBoostedTreesSuite.data.toImmutableArraySeq, 0.06) } catch { case e: java.lang.AssertionError => - logError(s"FAILED for numIterations=$numIterations, learningRate=$learningRate," + - s" subsamplingRate=$subsamplingRate") + logError(log"FAILED for numIterations=${MDC(NUM_ITERATIONS, numIterations)}, " + + log"learningRate=${MDC(LEARNING_RATE, learningRate)}, " + + log"subsamplingRate=${MDC(SUBSAMPLING_RATE, subsamplingRate)}") throw e } @@ -82,8 +85,9 @@ class GradientBoostedTreesSuite extends SparkFunSuite with MLlibTestSparkContext gbt, GradientBoostedTreesSuite.data.toImmutableArraySeq, 0.85, "mae") } catch { case e: java.lang.AssertionError => - logError(s"FAILED for numIterations=$numIterations, learningRate=$learningRate," + - s" subsamplingRate=$subsamplingRate") + logError(log"FAILED for numIterations=${MDC(NUM_ITERATIONS, numIterations)}, " + + log"learningRate=${MDC(LEARNING_RATE, learningRate)}, " + + log"subsamplingRate=${MDC(SUBSAMPLING_RATE, subsamplingRate)}") throw e } @@ -114,8 +118,9 @@ class GradientBoostedTreesSuite extends SparkFunSuite with MLlibTestSparkContext gbt, GradientBoostedTreesSuite.data.toImmutableArraySeq, 0.9) } catch { case e: java.lang.AssertionError => - logError(s"FAILED for numIterations=$numIterations, learningRate=$learningRate," + - s" subsamplingRate=$subsamplingRate") + logError(log"FAILED for numIterations=${MDC(NUM_ITERATIONS, numIterations)}, " + + log"learningRate=${MDC(LEARNING_RATE, learningRate)}, " + + log"subsamplingRate=${MDC(SUBSAMPLING_RATE, subsamplingRate)}") throw e } From cf221811089c9025b870582cdd53e3ad8dc7c77f Mon Sep 17 00:00:00 2001 From: panbingkun Date: Wed, 3 Apr 2024 16:30:48 +0800 Subject: [PATCH 2/6] fix it --- .../main/scala/org/apache/spark/ml/util/Instrumentation.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/util/Instrumentation.scala b/mllib/src/main/scala/org/apache/spark/ml/util/Instrumentation.scala index f250ea0f92d81..bfc6465c58bd1 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/util/Instrumentation.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/util/Instrumentation.scala @@ -27,7 +27,7 @@ import org.json4s._ import org.json4s.JsonDSL._ import org.json4s.jackson.JsonMethods._ -import org.apache.spark.internal.{LogEntry, Logging, MessageWithContext} +import org.apache.spark.internal.{LogEntry, Logging} import org.apache.spark.ml.{MLEvents, PipelineStage} import org.apache.spark.ml.param.{Param, Params} import org.apache.spark.rdd.RDD From c46a60116bd9cba3ac03a472a6e91d3971a9b001 Mon Sep 17 00:00:00 2001 From: panbingkun Date: Wed, 3 Apr 2024 18:47:15 +0800 Subject: [PATCH 3/6] fix it --- .../main/scala/org/apache/spark/internal/Logging.scala | 2 -- .../org/apache/spark/ml/feature/VectorIndexerSuite.scala | 8 ++++---- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/common/utils/src/main/scala/org/apache/spark/internal/Logging.scala b/common/utils/src/main/scala/org/apache/spark/internal/Logging.scala index e3b737533203a..2132e166eacf7 100644 --- a/common/utils/src/main/scala/org/apache/spark/internal/Logging.scala +++ b/common/utils/src/main/scala/org/apache/spark/internal/Logging.scala @@ -49,8 +49,6 @@ case class MessageWithContext(message: String, context: java.util.HashMap[String resultMap.putAll(mdc.context) MessageWithContext(message + mdc.message, resultMap) } - - override def toString: String = message } /** diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/VectorIndexerSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/VectorIndexerSuite.scala index fe695ff157137..0a9347b87977e 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/VectorIndexerSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/VectorIndexerSuite.scala @@ -213,8 +213,8 @@ class VectorIndexerSuite extends MLTest with DefaultReadWriteTest with Logging { assert(attr.values.get === origValueSet.toArray.sorted.map(_.toString)) assert(attr.isOrdinal.get === false) case _ => - throw new RuntimeException(errMsg + s". Categorical feature $feature failed" + - s" metadata check. Found feature attribute: $featureAttr.") + throw new RuntimeException(errMsg.message + s". Categorical feature $feature " + + s"failed metadata check. Found feature attribute: $featureAttr.") } } // Check numerical feature metadata. @@ -225,8 +225,8 @@ class VectorIndexerSuite extends MLTest with DefaultReadWriteTest with Logging { case attr: NumericAttribute => assert(featureAttr.index.get === feature) case _ => - throw new RuntimeException(errMsg + s". Numerical feature $feature failed" + - s" metadata check. Found feature attribute: $featureAttr.") + throw new RuntimeException(errMsg.message + s". Numerical feature $feature " + + s"failed metadata check. Found feature attribute: $featureAttr.") } } } From 7b4fe42c2828fab4c9423d48bb9972d5494e7ee5 Mon Sep 17 00:00:00 2001 From: panbingkun Date: Wed, 3 Apr 2024 19:45:25 +0800 Subject: [PATCH 4/6] fix it --- .../utils/src/test/scala/org/apache/spark/util/MDCSuite.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/common/utils/src/test/scala/org/apache/spark/util/MDCSuite.scala b/common/utils/src/test/scala/org/apache/spark/util/MDCSuite.scala index 6cae7bae7e331..1ac51e236080c 100644 --- a/common/utils/src/test/scala/org/apache/spark/util/MDCSuite.scala +++ b/common/utils/src/test/scala/org/apache/spark/util/MDCSuite.scala @@ -32,7 +32,6 @@ class MDCSuite val log = log"This is a log, exitcode ${MDC(EXIT_CODE, 10086)}" assert(log.message === "This is a log, exitcode 10086") assert(log.context === Map("exit_code" -> "10086").asJava) - assert(log.toString === "This is a log, exitcode 10086") } test("custom object as MDC value") { @@ -40,7 +39,6 @@ class MDCSuite val log = log"This is a log, exitcode ${MDC(EXIT_CODE, cov)}" assert(log.message === "This is a log, exitcode CustomObjectValue: spark, 10086") assert(log.context === Map("exit_code" -> "CustomObjectValue: spark, 10086").asJava) - assert(log.toString === "This is a log, exitcode CustomObjectValue: spark, 10086") } case class CustomObjectValue(key: String, value: Int) { From 5b4f2a19512db8914b8b26bb3fb7ae98cd48a918 Mon Sep 17 00:00:00 2001 From: panbingkun Date: Thu, 4 Apr 2024 08:56:36 +0800 Subject: [PATCH 5/6] fix --- .../org/apache/spark/internal/LogKey.scala | 18 +++++++-------- .../org/apache/spark/util/LogKeySuite.scala | 3 ++- .../spark/ml/classification/LinearSVC.scala | 11 ++++----- .../classification/LogisticRegression.scala | 10 ++++---- .../ml/regression/AFTSurvivalRegression.scala | 8 ++----- .../ml/regression/LinearRegression.scala | 8 ++----- .../spark/mllib/util/DataValidators.scala | 8 +++---- .../org/apache/spark/mllib/util/MLUtils.scala | 10 +++++++- .../tree/GradientBoostedTreesSuite.scala | 23 +++++++++++-------- 9 files changed, 49 insertions(+), 50 deletions(-) diff --git a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala index cf87cee619a09..e44d4ef714491 100644 --- a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala +++ b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala @@ -24,24 +24,22 @@ object LogKey extends Enumeration { val APPLICATION_ID = Value val APPLICATION_STATE = Value val BUCKET = Value + val CATEGORICAL_FEATURES = Value val CONTAINER_ID = Value + val COUNT = Value val EXECUTOR_ID = Value val EXIT_CODE = Value + val LEARNING_RATE = Value + val MAX_CATEGORIES = Value val MAX_EXECUTOR_FAILURES = Value val MAX_SIZE = Value val MIN_SIZE = Value - val REMOTE_ADDRESS = Value - val POD_ID = Value val NUM_ITERATIONS = Value - val LEARNING_RATE = Value - val SUBSAMPLING_RATE = Value - - val MAX_CATEGORIES = Value - val CATEGORICAL_FEATURES = Value - - val RANGE_CLASSIFICATION_LABELS = Value - val NUM_CLASSIFICATION_LABELS = Value val OPTIMIZER_CLASS_NAME = Value + val POD_ID = Value + val RANGE = Value + val REMOTE_ADDRESS = Value + val SUBSAMPLING_RATE = Value type LogKey = Value } diff --git a/common/utils/src/test/scala/org/apache/spark/util/LogKeySuite.scala b/common/utils/src/test/scala/org/apache/spark/util/LogKeySuite.scala index 39229f4b910bf..1f3c2d77d35f4 100644 --- a/common/utils/src/test/scala/org/apache/spark/util/LogKeySuite.scala +++ b/common/utils/src/test/scala/org/apache/spark/util/LogKeySuite.scala @@ -27,6 +27,7 @@ class LogKeySuite test("LogKey enumeration fields must be sorted alphabetically") { val keys = LogKey.values.toSeq - assert(keys === keys.sorted, "LogKey enumeration fields must be sorted alphabetically") + assert(keys === keys.sortBy(_.toString), + "LogKey enumeration fields must be sorted alphabetically") } } diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala index 8c9927de116cb..024693ba06f20 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala @@ -26,7 +26,7 @@ import org.apache.hadoop.fs.Path import org.apache.spark.SparkException import org.apache.spark.annotation.Since import org.apache.spark.internal.{Logging, MDC} -import org.apache.spark.internal.LogKey.{NUM_CLASSIFICATION_LABELS, OPTIMIZER_CLASS_NAME, RANGE_CLASSIFICATION_LABELS} +import org.apache.spark.internal.LogKey.{COUNT, RANGE} import org.apache.spark.ml.feature._ import org.apache.spark.ml.linalg._ import org.apache.spark.ml.optim.aggregator._ @@ -37,6 +37,7 @@ import org.apache.spark.ml.stat._ import org.apache.spark.ml.util._ import org.apache.spark.ml.util.DatasetUtils._ import org.apache.spark.ml.util.Instrumentation.instrumented +import org.apache.spark.mllib.util.MLUtils import org.apache.spark.rdd.RDD import org.apache.spark.sql._ import org.apache.spark.storage.StorageLevel @@ -222,8 +223,8 @@ class LinearSVC @Since("2.2.0") ( if (numInvalid != 0) { val msg = log"Classification labels should be in " + - log"${MDC(RANGE_CLASSIFICATION_LABELS, s"[0 to ${numClasses - 1}]")}. " + - log"Found ${MDC(NUM_CLASSIFICATION_LABELS, numInvalid)} invalid labels." + log"${MDC(RANGE, s"[0 to ${numClasses - 1}]")}. " + + log"Found ${MDC(COUNT, numInvalid)} invalid labels." instr.logError(msg) throw new SparkException(msg.message) } @@ -251,9 +252,7 @@ class LinearSVC @Since("2.2.0") ( regularization, optimizer) if (rawCoefficients == null) { - val msg = log"${MDC(OPTIMIZER_CLASS_NAME, optimizer.getClass.getName)} failed." - instr.logError(msg) - throw new SparkException(msg.message) + MLUtils.optimizerFailed(instr, optimizer.getClass) } val coefficientArray = Array.tabulate(numFeatures) { i => diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala index ad583b21ef73b..26ea90cb651b7 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala @@ -28,7 +28,7 @@ import org.apache.hadoop.fs.Path import org.apache.spark.SparkException import org.apache.spark.annotation.Since import org.apache.spark.internal.{Logging, MDC} -import org.apache.spark.internal.LogKey.{NUM_CLASSIFICATION_LABELS, OPTIMIZER_CLASS_NAME, RANGE_CLASSIFICATION_LABELS} +import org.apache.spark.internal.LogKey.{COUNT, OPTIMIZER_CLASS_NAME, RANGE} import org.apache.spark.ml.feature._ import org.apache.spark.ml.impl.Utils import org.apache.spark.ml.linalg._ @@ -532,8 +532,8 @@ class LogisticRegression @Since("1.2.0") ( if (numInvalid != 0) { val msg = log"Classification labels should be in " + - log"${MDC(RANGE_CLASSIFICATION_LABELS, s"[0 to ${numClasses - 1}]")}. " + - log"Found ${MDC(NUM_CLASSIFICATION_LABELS, numInvalid)} invalid labels." + log"${MDC(RANGE, s"[0 to ${numClasses - 1}]")}. " + + log"Found ${MDC(COUNT, numInvalid)} invalid labels." instr.logError(msg) throw new SparkException(msg.message) } @@ -636,9 +636,7 @@ class LogisticRegression @Since("1.2.0") ( initialSolution.toArray, regularization, optimizer) if (allCoefficients == null) { - val msg = log"${MDC(OPTIMIZER_CLASS_NAME, optimizer.getClass.getName)} failed." - instr.logError(msg) - throw new SparkException(msg.message) + MLUtils.optimizerFailed(instr, optimizer.getClass) } val allCoefMatrix = new DenseMatrix(numCoefficientSets, numFeaturesPlusIntercept, diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala index b5c52daf406d6..57d20bcd6f49d 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala @@ -23,10 +23,8 @@ import breeze.linalg.{DenseVector => BDV} import breeze.optimize.{CachedDiffFunction, LBFGS => BreezeLBFGS} import org.apache.hadoop.fs.Path -import org.apache.spark.SparkException import org.apache.spark.annotation.Since -import org.apache.spark.internal.{Logging, MDC} -import org.apache.spark.internal.LogKey.OPTIMIZER_CLASS_NAME +import org.apache.spark.internal.Logging import org.apache.spark.ml.PredictorParams import org.apache.spark.ml.feature._ import org.apache.spark.ml.linalg._ @@ -272,9 +270,7 @@ class AFTSurvivalRegression @Since("1.6.0") (@Since("1.6.0") override val uid: S optimizer, initialSolution) if (rawCoefficients == null) { - val msg = log"${MDC(OPTIMIZER_CLASS_NAME, optimizer.getClass.getName)} failed." - instr.logError(msg) - throw new SparkException(msg.message) + MLUtils.optimizerFailed(instr, optimizer.getClass) } val coefficientArray = Array.tabulate(numFeatures) { i => diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala index e24d5b01dee2c..d53b8b270f2d6 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala @@ -25,10 +25,8 @@ import breeze.stats.distributions.Rand.FixedSeed.randBasis import breeze.stats.distributions.StudentsT import org.apache.hadoop.fs.Path -import org.apache.spark.SparkException import org.apache.spark.annotation.Since -import org.apache.spark.internal.{Logging, MDC} -import org.apache.spark.internal.LogKey.OPTIMIZER_CLASS_NAME +import org.apache.spark.internal.Logging import org.apache.spark.ml.{PipelineStage, PredictorParams} import org.apache.spark.ml.feature._ import org.apache.spark.ml.linalg.{BLAS, Vector, Vectors} @@ -429,9 +427,7 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String featuresMean, featuresStd, initialSolution, regularization, optimizer) if (parameters == null) { - val msg = log"${MDC(OPTIMIZER_CLASS_NAME, optimizer.getClass.getName)} failed." - instr.logError(msg) - throw new SparkException(msg.message) + MLUtils.optimizerFailed(instr, optimizer.getClass) } val model = createModel(parameters, yMean, yStd, featuresMean, featuresStd) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/DataValidators.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/DataValidators.scala index 9733cf18dc1cb..d8c0f8711cabc 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/util/DataValidators.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/util/DataValidators.scala @@ -19,7 +19,7 @@ package org.apache.spark.mllib.util import org.apache.spark.annotation.Since import org.apache.spark.internal.{Logging, MDC} -import org.apache.spark.internal.LogKey.{NUM_CLASSIFICATION_LABELS, RANGE_CLASSIFICATION_LABELS} +import org.apache.spark.internal.LogKey.{COUNT, RANGE} import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.rdd.RDD @@ -39,7 +39,7 @@ object DataValidators extends Logging { val numInvalid = data.filter(x => x.label != 1.0 && x.label != 0.0).count() if (numInvalid != 0) { logError(log"Classification labels should be 0 or 1. " + - log"Found ${MDC(NUM_CLASSIFICATION_LABELS, numInvalid)} invalid labels") + log"Found ${MDC(COUNT, numInvalid)} invalid labels") } numInvalid == 0 } @@ -56,8 +56,8 @@ object DataValidators extends Logging { x.label - x.label.toInt != 0.0 || x.label < 0 || x.label > k - 1).count() if (numInvalid != 0) { logError(log"Classification labels should be in " + - log"${MDC(RANGE_CLASSIFICATION_LABELS, s"[0 to ${k - 1}]")}. " + - log"Found ${MDC(NUM_CLASSIFICATION_LABELS, numInvalid)} invalid labels") + log"${MDC(RANGE, s"[0 to ${k - 1}]")}. " + + log"Found ${MDC(COUNT, numInvalid)} invalid labels") } numInvalid == 0 } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala index 378f1381e4cf8..10adf10690b77 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala @@ -22,8 +22,10 @@ import scala.reflect.ClassTag import org.apache.spark.{SparkContext, SparkException} import org.apache.spark.annotation.Since -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.OPTIMIZER_CLASS_NAME import org.apache.spark.ml.linalg.{MatrixUDT => MLMatrixUDT, VectorUDT => MLVectorUDT} +import org.apache.spark.ml.util.Instrumentation import org.apache.spark.mllib.linalg._ import org.apache.spark.mllib.linalg.BLAS.dot import org.apache.spark.mllib.regression.LabeledPoint @@ -593,4 +595,10 @@ object MLUtils extends Logging { math.log1p(math.exp(x)) } } + + def optimizerFailed(instr: Instrumentation, optimizerClass: Class[_]): Unit = { + val msg = log"${MDC(OPTIMIZER_CLASS_NAME, optimizerClass.getName)} failed." + instr.logError(msg) + throw new SparkException(msg.message) + } } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/tree/GradientBoostedTreesSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/tree/GradientBoostedTreesSuite.scala index b0a4dd1dcf82b..f5c6abfc66f27 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/tree/GradientBoostedTreesSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/tree/GradientBoostedTreesSuite.scala @@ -18,8 +18,8 @@ package org.apache.spark.mllib.tree import org.apache.spark.SparkFunSuite +import org.apache.spark.internal.{MDC, MessageWithContext} import org.apache.spark.internal.LogKey.{LEARNING_RATE, NUM_ITERATIONS, SUBSAMPLING_RATE} -import org.apache.spark.internal.MDC import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.mllib.tree.configuration.{BoostingStrategy, Strategy} import org.apache.spark.mllib.tree.configuration.Algo._ @@ -35,6 +35,15 @@ import org.apache.spark.util.Utils */ class GradientBoostedTreesSuite extends SparkFunSuite with MLlibTestSparkContext { + private def buildErrorLog( + numIterations: Int, + learningRate: Double, + subsamplingRate: Double): MessageWithContext = { + log"FAILED for numIterations=${MDC(NUM_ITERATIONS, numIterations)}, " + + log"learningRate=${MDC(LEARNING_RATE, learningRate)}, " + + log"subsamplingRate=${MDC(SUBSAMPLING_RATE, subsamplingRate)}" + } + test("Regression with continuous features: SquaredError") { GradientBoostedTreesSuite.testCombinations.foreach { case (numIterations, learningRate, subsamplingRate) => @@ -53,9 +62,7 @@ class GradientBoostedTreesSuite extends SparkFunSuite with MLlibTestSparkContext gbt, GradientBoostedTreesSuite.data.toImmutableArraySeq, 0.06) } catch { case e: java.lang.AssertionError => - logError(log"FAILED for numIterations=${MDC(NUM_ITERATIONS, numIterations)}, " + - log"learningRate=${MDC(LEARNING_RATE, learningRate)}, " + - log"subsamplingRate=${MDC(SUBSAMPLING_RATE, subsamplingRate)}") + logError(buildErrorLog(numIterations, learningRate, subsamplingRate)) throw e } @@ -85,9 +92,7 @@ class GradientBoostedTreesSuite extends SparkFunSuite with MLlibTestSparkContext gbt, GradientBoostedTreesSuite.data.toImmutableArraySeq, 0.85, "mae") } catch { case e: java.lang.AssertionError => - logError(log"FAILED for numIterations=${MDC(NUM_ITERATIONS, numIterations)}, " + - log"learningRate=${MDC(LEARNING_RATE, learningRate)}, " + - log"subsamplingRate=${MDC(SUBSAMPLING_RATE, subsamplingRate)}") + logError(buildErrorLog(numIterations, learningRate, subsamplingRate)) throw e } @@ -118,9 +123,7 @@ class GradientBoostedTreesSuite extends SparkFunSuite with MLlibTestSparkContext gbt, GradientBoostedTreesSuite.data.toImmutableArraySeq, 0.9) } catch { case e: java.lang.AssertionError => - logError(log"FAILED for numIterations=${MDC(NUM_ITERATIONS, numIterations)}, " + - log"learningRate=${MDC(LEARNING_RATE, learningRate)}, " + - log"subsamplingRate=${MDC(SUBSAMPLING_RATE, subsamplingRate)}") + logError(buildErrorLog(numIterations, learningRate, subsamplingRate)) throw e } From 7dfa11534222d297163044d70512c7fa48d15dbd Mon Sep 17 00:00:00 2001 From: panbingkun Date: Thu, 4 Apr 2024 10:24:53 +0800 Subject: [PATCH 6/6] remove unused import --- .../org/apache/spark/ml/classification/LogisticRegression.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala index 26ea90cb651b7..0d487377b9319 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala @@ -28,7 +28,7 @@ import org.apache.hadoop.fs.Path import org.apache.spark.SparkException import org.apache.spark.annotation.Since import org.apache.spark.internal.{Logging, MDC} -import org.apache.spark.internal.LogKey.{COUNT, OPTIMIZER_CLASS_NAME, RANGE} +import org.apache.spark.internal.LogKey.{COUNT, RANGE} import org.apache.spark.ml.feature._ import org.apache.spark.ml.impl.Utils import org.apache.spark.ml.linalg._