diff --git a/core/src/main/scala/org/apache/spark/ErrorInfo.scala b/core/src/main/scala/org/apache/spark/SparkThrowableHelper.scala similarity index 67% rename from core/src/main/scala/org/apache/spark/ErrorInfo.scala rename to core/src/main/scala/org/apache/spark/SparkThrowableHelper.scala index 6c72a27aa4b3d..dd9f093017df3 100644 --- a/core/src/main/scala/org/apache/spark/ErrorInfo.scala +++ b/core/src/main/scala/org/apache/spark/SparkThrowableHelper.scala @@ -26,6 +26,7 @@ import com.fasterxml.jackson.core.`type`.TypeReference import com.fasterxml.jackson.databind.json.JsonMapper import com.fasterxml.jackson.module.scala.DefaultScalaModule +import org.apache.spark.util.JsonProtocol.toJsonString import org.apache.spark.util.Utils /** @@ -57,6 +58,10 @@ private[spark] case class ErrorInfo( val messageFormat: String = message.mkString("\n") } +object ErrorMessageFormat extends Enumeration { + val PRETTY, MINIMAL, STANDARD = Value +} + /** * Companion object used by instances of [[SparkThrowable]] to access error class information and * construct error messages. @@ -135,4 +140,61 @@ private[spark] object SparkThrowableHelper { def isInternalError(errorClass: String): Boolean = { errorClass == "INTERNAL_ERROR" } + + def getMessage(e: SparkThrowable with Throwable, format: ErrorMessageFormat.Value): String = { + import ErrorMessageFormat._ + format match { + case PRETTY => e.getMessage + case MINIMAL | STANDARD if e.getErrorClass == null => + toJsonString { generator => + val g = generator.useDefaultPrettyPrinter() + g.writeStartObject() + g.writeStringField("errorClass", "LEGACY") + g.writeObjectFieldStart("messageParameters") + g.writeStringField("message", e.getMessage) + g.writeEndObject() + g.writeEndObject() + } + case MINIMAL | STANDARD => + val errorClass = e.getErrorClass + assert(e.getParameterNames.size == e.getMessageParameters.size, + "Number of message parameter names and values must be the same") + toJsonString { generator => + val g = generator.useDefaultPrettyPrinter() + g.writeStartObject() + g.writeStringField("errorClass", errorClass) + val errorSubClass = e.getErrorSubClass + if (errorSubClass != null) g.writeStringField("errorSubClass", errorSubClass) + if (format == STANDARD) { + val errorInfo = errorClassToInfoMap.getOrElse(errorClass, + throw SparkException.internalError(s"Cannot find the error class '$errorClass'")) + g.writeStringField("message", errorInfo.messageFormat) + } + val sqlState = e.getSqlState + if (sqlState != null) g.writeStringField("sqlState", sqlState) + g.writeObjectFieldStart("messageParameters") + (e.getParameterNames zip e.getMessageParameters).foreach { case (name, value) => + g.writeStringField(name, value) + } + g.writeEndObject() + val queryContext = e.getQueryContext + if (!queryContext.isEmpty) { + g.writeArrayFieldStart("queryContext") + e.getQueryContext.foreach { c => + g.writeStartObject() + g.writeStringField("objectType", c.objectType()) + g.writeStringField("objectName", c.objectName()) + val startIndex = c.startIndex() + 1 + if (startIndex > 0) g.writeNumberField("startIndex", startIndex) + val stopIndex = c.stopIndex() + 1 + if (stopIndex > 0) g.writeNumberField("stopIndex", stopIndex) + g.writeStringField("fragment", c.fragment()) + g.writeEndObject() + } + g.writeEndArray() + } + g.writeEndObject() + } + } + } } diff --git a/core/src/test/scala/org/apache/spark/SparkThrowableSuite.scala b/core/src/test/scala/org/apache/spark/SparkThrowableSuite.scala index 76d7e3048d79a..9c6175192178b 100644 --- a/core/src/test/scala/org/apache/spark/SparkThrowableSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkThrowableSuite.scala @@ -222,4 +222,84 @@ class SparkThrowableSuite extends SparkFunSuite { assert(false) } } + + test("Get message in the specified format") { + import ErrorMessageFormat._ + class TestQueryContext extends QueryContext { + override val objectName = "v1" + override val objectType = "VIEW" + override val startIndex = 2 + override val stopIndex = -1 + override val fragment = "1 / 0" + } + val e = new SparkArithmeticException( + errorClass = "DIVIDE_BY_ZERO", + errorSubClass = None, + messageParameters = Array("CONFIG"), + context = Array(new TestQueryContext), + summary = "Query summary") + + assert(SparkThrowableHelper.getMessage(e, PRETTY) === + "[DIVIDE_BY_ZERO] Division by zero. Use `try_divide` to tolerate divisor being 0 " + + "and return NULL instead. If necessary set CONFIG to \"false\" to bypass this error." + + "\nQuery summary") + // scalastyle:off line.size.limit + assert(SparkThrowableHelper.getMessage(e, MINIMAL) === + """{ + | "errorClass" : "DIVIDE_BY_ZERO", + | "sqlState" : "22012", + | "messageParameters" : { + | "config" : "CONFIG" + | }, + | "queryContext" : [ { + | "objectType" : "VIEW", + | "objectName" : "v1", + | "startIndex" : 3, + | "fragment" : "1 / 0" + | } ] + |}""".stripMargin) + assert(SparkThrowableHelper.getMessage(e, STANDARD) === + """{ + | "errorClass" : "DIVIDE_BY_ZERO", + | "message" : "Division by zero. Use `try_divide` to tolerate divisor being 0 and return NULL instead. If necessary set to \"false\" to bypass this error.", + | "sqlState" : "22012", + | "messageParameters" : { + | "config" : "CONFIG" + | }, + | "queryContext" : [ { + | "objectType" : "VIEW", + | "objectName" : "v1", + | "startIndex" : 3, + | "fragment" : "1 / 0" + | } ] + |}""".stripMargin) + // scalastyle:on line.size.limit + // STANDARD w/ errorSubClass but w/o queryContext + val e2 = new SparkIllegalArgumentException( + errorClass = "UNSUPPORTED_SAVE_MODE", + errorSubClass = Some("EXISTENT_PATH"), + messageParameters = Array("UNSUPPORTED_MODE")) + assert(SparkThrowableHelper.getMessage(e2, STANDARD) === + """{ + | "errorClass" : "UNSUPPORTED_SAVE_MODE", + | "errorSubClass" : "EXISTENT_PATH", + | "message" : "The save mode is not supported for:", + | "messageParameters" : { + | "saveMode" : "UNSUPPORTED_MODE" + | } + |}""".stripMargin) + // Legacy mode when an exception does not have any error class + class LegacyException extends Throwable with SparkThrowable { + override def getErrorClass: String = null + override def getMessage: String = "Test message" + } + val e3 = new LegacyException + assert(SparkThrowableHelper.getMessage(e3, MINIMAL) === + """{ + | "errorClass" : "LEGACY", + | "messageParameters" : { + | "message" : "Test message" + | } + |}""".stripMargin) + } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 3ce6ee4795827..0839a2f487511 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -31,7 +31,7 @@ import scala.util.matching.Regex import org.apache.hadoop.fs.Path -import org.apache.spark.{SparkConf, SparkContext, TaskContext} +import org.apache.spark.{ErrorMessageFormat, SparkConf, SparkContext, TaskContext} import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ import org.apache.spark.internal.config.{IGNORE_MISSING_FILES => SPARK_IGNORE_MISSING_FILES} @@ -3875,6 +3875,16 @@ object SQLConf { .booleanConf .createWithDefault(false) + val ERROR_MESSAGE_FORMAT = buildConf("spark.sql.error.messageFormat") + .doc("When PRETTY, the error message consists of textual representation of error class, " + + "message and query context. The MINIMAL and STANDARD formats are pretty JSON formats where " + + "STANDARD includes an additional JSON field `message`. This configuration property " + + "influences on error messages of Thrift Server while running queries.") + .version("3.4.0") + .stringConf.transform(_.toUpperCase(Locale.ROOT)) + .checkValues(ErrorMessageFormat.values.map(_.toString)) + .createWithDefault(ErrorMessageFormat.PRETTY.toString) + /** * Holds information about keys that have been deprecated. * @@ -4658,6 +4668,9 @@ class SQLConf extends Serializable with Logging { def histogramNumericPropagateInputType: Boolean = getConf(SQLConf.HISTOGRAM_NUMERIC_PROPAGATE_INPUT_TYPE) + def errorMessageFormat: ErrorMessageFormat.Value = + ErrorMessageFormat.withName(getConf(SQLConf.ERROR_MESSAGE_FORMAT)) + /** ********************** SQLConf functionality methods ************ */ /** Set Spark SQL configuration properties. */ diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServerErrors.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServerErrors.scala index 4d786fd716b33..eaec382e78209 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServerErrors.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServerErrors.scala @@ -23,7 +23,7 @@ import java.util.concurrent.RejectedExecutionException import org.apache.hive.service.ServiceException import org.apache.hive.service.cli.{HiveSQLException, OperationType} -import org.apache.spark.SparkThrowable +import org.apache.spark.{ErrorMessageFormat, SparkThrowable, SparkThrowableHelper} /** * Object for grouping error messages from (most) exceptions thrown during @@ -36,11 +36,10 @@ object HiveThriftServerErrors { " new task for execution, please retry the operation", rejected) } - def runningQueryError(e: Throwable): Throwable = e match { - case st: SparkThrowable => - val errorClassPrefix = Option(st.getErrorClass).map(e => s"[$e] ").getOrElse("") - new HiveSQLException( - s"Error running query: ${errorClassPrefix}${st.toString}", st.getSqlState, st) + def runningQueryError(e: Throwable, format: ErrorMessageFormat.Value): Throwable = e match { + case st: SparkThrowable with Throwable => + val message = SparkThrowableHelper.getMessage(st, format) + new HiveSQLException(s"Error running query: $message", st.getSqlState, st) case _ => new HiveSQLException(s"Error running query: ${e.toString}", e) } diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala index 090d741d9eed2..3292cbef41772 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala @@ -257,7 +257,8 @@ private[hive] class SparkExecuteStatementOperation( statementId, e.getMessage, SparkUtils.exceptionString(e)) e match { case _: HiveSQLException => throw e - case _ => throw HiveThriftServerErrors.runningQueryError(e) + case _ => throw HiveThriftServerErrors.runningQueryError( + e, sqlContext.conf.errorMessageFormat) } } } finally { diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala index 639a5e3a59892..3a38efd27cb8f 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala @@ -22,7 +22,7 @@ import java.util.concurrent.atomic.AtomicBoolean import org.apache.hive.service.cli.{HiveSQLException, OperationHandle} -import org.apache.spark.TaskKilled +import org.apache.spark.{ErrorMessageFormat, TaskKilled} import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd} import org.apache.spark.sql.internal.SQLConf @@ -149,6 +149,64 @@ trait ThriftServerWithSparkContextSuite extends SharedThriftServer { } } } + + test("formats of error messages") { + val sql = "select 1 / 0" + withCLIServiceClient() { client => + val sessionHandle = client.openSession(user, "") + val confOverlay = new java.util.HashMap[java.lang.String, java.lang.String] + val exec: String => OperationHandle = client.executeStatement(sessionHandle, _, confOverlay) + + exec(s"set ${SQLConf.ANSI_ENABLED.key}=true") + exec(s"set ${SQLConf.ERROR_MESSAGE_FORMAT.key}=${ErrorMessageFormat.PRETTY}") + val e1 = intercept[HiveSQLException](exec(sql)) + // scalastyle:off line.size.limit + assert(e1.getMessage === + """Error running query: [DIVIDE_BY_ZERO] Division by zero. Use `try_divide` to tolerate divisor being 0 and return NULL instead. If necessary set "spark.sql.ansi.enabled" to "false" to bypass this error. + |== SQL(line 1, position 8) == + |select 1 / 0 + | ^^^^^ + |""".stripMargin) + + exec(s"set ${SQLConf.ERROR_MESSAGE_FORMAT.key}=${ErrorMessageFormat.MINIMAL}") + val e2 = intercept[HiveSQLException](exec(sql)) + assert(e2.getMessage === + """Error running query: { + | "errorClass" : "DIVIDE_BY_ZERO", + | "sqlState" : "22012", + | "messageParameters" : { + | "config" : "\"spark.sql.ansi.enabled\"" + | }, + | "queryContext" : [ { + | "objectType" : "", + | "objectName" : "", + | "startIndex" : 8, + | "stopIndex" : 12, + | "fragment" : "1 / 0" + | } ] + |}""".stripMargin) + + exec(s"set ${SQLConf.ERROR_MESSAGE_FORMAT.key}=${ErrorMessageFormat.STANDARD}") + val e3 = intercept[HiveSQLException](exec(sql)) + assert(e3.getMessage === + """Error running query: { + | "errorClass" : "DIVIDE_BY_ZERO", + | "message" : "Division by zero. Use `try_divide` to tolerate divisor being 0 and return NULL instead. If necessary set to \"false\" to bypass this error.", + | "sqlState" : "22012", + | "messageParameters" : { + | "config" : "\"spark.sql.ansi.enabled\"" + | }, + | "queryContext" : [ { + | "objectType" : "", + | "objectName" : "", + | "startIndex" : 8, + | "stopIndex" : 12, + | "fragment" : "1 / 0" + | } ] + |}""".stripMargin) + // scalastyle:on line.size.limit + } + } } class ThriftServerWithSparkContextInBinarySuite extends ThriftServerWithSparkContextSuite {