From e8834ea87d4e8b0380746dd975f4d60f094985ff Mon Sep 17 00:00:00 2001 From: Tao Lin Date: Tue, 12 Jul 2016 23:53:30 +0800 Subject: [PATCH 1/6] add SQL query texts in the web UI --- .../spark/sql/catalyst/parser/ParseDriver.scala | 17 +++++++++++------ .../catalyst/plans/logical/LogicalPlan.scala | 3 +++ .../scala/org/apache/spark/sql/Dataset.scala | 5 ++++- .../spark/sql/execution/SQLExecution.scala | 3 ++- .../sql/execution/ui/AllExecutionsPage.scala | 4 ++++ .../spark/sql/execution/ui/SQLListener.scala | 5 ++++- .../sql/execution/ui/SQLListenerSuite.scala | 6 +++++- 7 files changed, 33 insertions(+), 10 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParseDriver.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParseDriver.scala index 09598ffe770c6..09995bd7eee58 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParseDriver.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParseDriver.scala @@ -65,13 +65,18 @@ abstract class AbstractSqlParser extends ParserInterface with Logging { } /** Creates LogicalPlan for a given SQL string. */ - override def parsePlan(sqlText: String): LogicalPlan = parse(sqlText) { parser => - astBuilder.visitSingleStatement(parser.singleStatement()) match { - case plan: LogicalPlan => plan - case _ => - val position = Origin(None, None) - throw new ParseException(Option(sqlText), "Unsupported SQL statement", position, position) + override def parsePlan(sqlText: String): LogicalPlan = { + val logicalPlan = parse(sqlText) { parser => + astBuilder.visitSingleStatement(parser.singleStatement()) match { + case plan: LogicalPlan => plan + case _ => + val position = Origin(None, None) + throw new ParseException(Option(sqlText), "Unsupported SQL statement", position, position) + } } + // Record the original sql text in the top logical plan for checking in the web UI. + logicalPlan.sqlText = Some(sqlText) + logicalPlan } /** Get the builder (visitor) which converts a ParseTree into an AST. */ diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala index 23520eb82b043..438056a298ff3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala @@ -258,6 +258,9 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging { * Refreshes (or invalidates) any metadata/data cached in the plan recursively. */ def refresh(): Unit = children.foreach(_.refresh()) + + // Record the original sql text in the top logical plan for checking in the web UI. + var sqlText: Option[String] = None } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 0e7415890e216..0fc663f802052 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -2945,7 +2945,10 @@ class Dataset[T] private[sql]( /** A convenient function to wrap a logical plan and produce a Dataset. */ @inline private def withTypedPlan[U : Encoder](logicalPlan: => LogicalPlan): Dataset[U] = { - Dataset(sparkSession, logicalPlan) + val dataset: Dataset[U] = Dataset(sparkSession, logicalPlan) + // Copy the original sql text for checking in the web UI. + dataset.logicalPlan.sqlText = queryExecution.logical.sqlText + dataset } /** A convenient function to wrap a set based logical plan and produce a Dataset. */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala index be35916e3447e..86bc79f951757 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala @@ -60,7 +60,8 @@ object SQLExecution { sparkSession.sparkContext.listenerBus.post(SparkListenerSQLExecutionStart( executionId, callSite.shortForm, callSite.longForm, queryExecution.toString, - SparkPlanInfo.fromSparkPlan(queryExecution.executedPlan), System.currentTimeMillis())) + SparkPlanInfo.fromSparkPlan(queryExecution.executedPlan), queryExecution.logical.sqlText, + System.currentTimeMillis())) try { body } finally { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala index e96fb9f7550a3..52539cb6541f9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala @@ -78,6 +78,7 @@ private[ui] abstract class ExecutionTable( protected def baseHeader: Seq[String] = Seq( "ID", "Description", + "SQL Text", "Submitted", "Duration") @@ -103,6 +104,9 @@ private[ui] abstract class ExecutionTable( {descriptionCell(executionUIData)} + + {executionUIData.sqlText.getOrElse("-")} + {UIUtils.formatDate(submissionTime)} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala index b4a91230a0012..a23f698bf0bfc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala @@ -40,6 +40,7 @@ case class SparkListenerSQLExecutionStart( details: String, physicalPlanDescription: String, sparkPlanInfo: SparkPlanInfo, + sqlText: Option[String], time: Long) extends SparkListenerEvent @@ -268,7 +269,7 @@ class SQLListener(conf: SparkConf) extends SparkListener with Logging { override def onOtherEvent(event: SparkListenerEvent): Unit = event match { case SparkListenerSQLExecutionStart(executionId, description, details, - physicalPlanDescription, sparkPlanInfo, time) => + physicalPlanDescription, sparkPlanInfo, sqlText, time) => val physicalPlanGraph = SparkPlanGraph(sparkPlanInfo) val sqlPlanMetrics = physicalPlanGraph.allNodes.flatMap { node => node.metrics.map(metric => metric.accumulatorId -> metric) @@ -280,6 +281,7 @@ class SQLListener(conf: SparkConf) extends SparkListener with Logging { physicalPlanDescription, physicalPlanGraph, sqlPlanMetrics.toMap, + sqlText, time) synchronized { activeExecutions(executionId) = executionUIData @@ -428,6 +430,7 @@ private[ui] class SQLExecutionUIData( val physicalPlanDescription: String, val physicalPlanGraph: SparkPlanGraph, val accumulatorMetrics: Map[Long, SQLPlanMetric], + val sqlText: Option[String], val submissionTime: Long) { var completionTime: Option[Long] = None diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala index e6cd41e4facf1..4a9dee4355ff6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala @@ -123,6 +123,7 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTest "test", df.queryExecution.toString, SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan), + None, System.currentTimeMillis())) val executionUIData = listener.executionIdToData(0) @@ -259,6 +260,7 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTest "test", df.queryExecution.toString, SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan), + None, System.currentTimeMillis())) listener.onJobStart(SparkListenerJobStart( jobId = 0, @@ -289,6 +291,7 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTest "test", df.queryExecution.toString, SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan), + None, System.currentTimeMillis())) listener.onJobStart(SparkListenerJobStart( jobId = 0, @@ -330,6 +333,7 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTest "test", df.queryExecution.toString, SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan), + None, System.currentTimeMillis())) listener.onJobStart(SparkListenerJobStart( jobId = 0, @@ -369,7 +373,7 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTest // These are largely just boilerplate unrelated to what we're trying to test. val df = createTestDataFrame val executionStart = SparkListenerSQLExecutionStart( - 0, "", "", "", SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan), 0) + 0, "", "", "", SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan), None, 0) val stageInfo = createStageInfo(0, 0) val jobStart = SparkListenerJobStart(0, 0, Seq(stageInfo), createProperties(0)) val stageSubmitted = SparkListenerStageSubmitted(stageInfo) From fa5d549ab01aeaacc16e48d7e595bfcd30db0339 Mon Sep 17 00:00:00 2001 From: Tao Lin Date: Wed, 13 Jul 2016 11:45:09 +0800 Subject: [PATCH 2/6] optimize for long query strings & not show the column when no query string --- .../sql/execution/ui/AllExecutionsPage.scala | 44 ++++++++++++++++--- 1 file changed, 38 insertions(+), 6 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala index 52539cb6541f9..bc1316f8cdaa4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala @@ -60,6 +60,10 @@ private[ui] class AllExecutionsPage(parent: SQLTab) extends WebUIPage("") with L function clickDetail(details) {{ details.parentNode.querySelector('.stage-details').classList.toggle('collapsed') }} + function clickMore(details) {{ + details.parentNode.querySelector('.sql-abstract').classList.toggle('collapsed') + details.parentNode.querySelector('.sql-full').classList.toggle('collapsed') + }} UIUtils.headerSparkPage("SQL", content, parent, Some(5000)) } @@ -78,16 +82,18 @@ private[ui] abstract class ExecutionTable( protected def baseHeader: Seq[String] = Seq( "ID", "Description", - "SQL Text", "Submitted", "Duration") protected def header: Seq[String] - protected def row(currentTime: Long, executionUIData: SQLExecutionUIData): Seq[Node] = { + protected def row(currentTime: Long, executionUIData: SQLExecutionUIData, showSqlText: Boolean) + : Seq[Node] = { val submissionTime = executionUIData.submissionTime val duration = executionUIData.completionTime.getOrElse(currentTime) - submissionTime + val sqlText = executionUIData.sqlText.getOrElse("") + val runningJobs = executionUIData.runningJobs.map { jobId => {jobId.toString}
} @@ -104,9 +110,6 @@ private[ui] abstract class ExecutionTable( {descriptionCell(executionUIData)} - - {executionUIData.sqlText.getOrElse("-")} - {UIUtils.formatDate(submissionTime)} @@ -128,6 +131,11 @@ private[ui] abstract class ExecutionTable( {failedJobs} }} + {if (showSqlText) { + + {sqlTextCell(sqlText)} + + }} } @@ -150,11 +158,35 @@ private[ui] abstract class ExecutionTable(
{desc} {details}
} + private def sqlTextCell(sqlText: String): Seq[Node] = { + // Only show a limited number of characters of sqlText by default when it is too long + val maxLength = 140 + + if (sqlText.length <= maxLength) { +
{sqlText}
+ } else { + val sqlAbstractText = sqlText.substring(0, maxLength) + " ..." +
+
+ {sqlAbstractText} +
+ + + +more + +
+ } + } + def toNodeSeq: Seq[Node] = { + val showSqlText = executionUIDatas.exists(_.sqlText.isDefined) + val headerFull = header ++ {if (showSqlText) Seq("SQL Text") else Seq.empty}

{tableName}

{UIUtils.listingTable[SQLExecutionUIData]( - header, row(currentTime, _), executionUIDatas, id = Some(tableId))} + headerFull, row(currentTime, _, showSqlText), executionUIDatas, id = Some(tableId))}
} From 69180bd5b5b21725ff1e498e98690bc261f079f7 Mon Sep 17 00:00:00 2001 From: Tao Lin Date: Fri, 15 Jul 2016 11:46:43 +0800 Subject: [PATCH 3/6] truncate long text & add tool tip --- .../scala/org/apache/spark/ui/ToolTips.scala | 5 ++++ .../scala/org/apache/spark/ui/UIUtils.scala | 23 +++++++++++++++++-- .../sql/catalyst/parser/ParseDriver.scala | 13 ++++++++++- .../sql/execution/ui/AllExecutionsPage.scala | 12 ++++++++-- 4 files changed, 48 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ui/ToolTips.scala b/core/src/main/scala/org/apache/spark/ui/ToolTips.scala index 766cc65084f07..51f6e67d1a2cc 100644 --- a/core/src/main/scala/org/apache/spark/ui/ToolTips.scala +++ b/core/src/main/scala/org/apache/spark/ui/ToolTips.scala @@ -99,4 +99,9 @@ private[spark] object ToolTips { dynamic allocation is enabled. The number of granted executors may exceed the limit ephemerally when executors are being killed. """ + + val SQL_TEXT = + """Shows 140 characters by default. Click "+more" to see more. Long texts are truncated to 1000 + |characters. Left blank when the query was not issued by SQL.""" + .stripMargin.replaceAll("\n", " ") } diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala index 2610f673d27f6..2bdb241152edd 100644 --- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala @@ -296,7 +296,9 @@ private[spark] object UIUtils extends Logging { id: Option[String] = None, headerClasses: Seq[String] = Seq.empty, stripeRowsWithCss: Boolean = true, - sortable: Boolean = true): Seq[Node] = { + sortable: Boolean = true, + // If the tool tip is defined, Some(toolTipText, toolTipPosition), otherwise None. + headerToolTips: Seq[Option[(String, String)]] = Seq.empty): Seq[Node] = { val listingTableClass = { val _tableClass = if (stripeRowsWithCss) TABLE_CLASS_STRIPED else TABLE_CLASS_NOT_STRIPED @@ -317,6 +319,14 @@ private[spark] object UIUtils extends Logging { } } + def getToolTip(index: Int): Option[(String, String)] = { + if (index < headerToolTips.size) { + headerToolTips(index) + } else { + None + } + } + val newlinesInHeader = headers.exists(_.contains("\n")) def getHeaderContent(header: String): Seq[Node] = { if (newlinesInHeader) { @@ -330,7 +340,16 @@ private[spark] object UIUtils extends Logging { val headerRow: Seq[Node] = { headers.view.zipWithIndex.map { x => - {getHeaderContent(x._1)} + val toolTipOption = getToolTip(x._2) + if (toolTipOption.isEmpty) { + {getHeaderContent(x._1)} + } else { + val toolTip = toolTipOption.get + // scalastyle:off line.size.limit + {getHeaderContent(x._1)} + // scalastyle:on line.size.limit + } + } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParseDriver.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParseDriver.scala index 09995bd7eee58..d8a129cab48d5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParseDriver.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParseDriver.scala @@ -75,7 +75,18 @@ abstract class AbstractSqlParser extends ParserInterface with Logging { } } // Record the original sql text in the top logical plan for checking in the web UI. - logicalPlan.sqlText = Some(sqlText) + // Truncate the text to avoid downing browsers or web UI servers by running out of memory. + val maxLength = 1000 + val suffix = " ... (truncated)" + val truncateLength = maxLength - suffix.length + val truncatedSqlText = { + if (sqlText.length <= maxLength) { + sqlText + } else { + sqlText.substring(0, truncateLength) + suffix + } + } + logicalPlan.sqlText = Some(truncatedSqlText) logicalPlan } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala index bc1316f8cdaa4..1801c4895bbb8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala @@ -25,7 +25,7 @@ import scala.xml.Node import org.apache.commons.lang3.StringEscapeUtils import org.apache.spark.internal.Logging -import org.apache.spark.ui.{UIUtils, WebUIPage} +import org.apache.spark.ui.{ToolTips, UIUtils, WebUIPage} private[ui] class AllExecutionsPage(parent: SQLTab) extends WebUIPage("") with Logging { @@ -183,10 +183,18 @@ private[ui] abstract class ExecutionTable( def toNodeSeq: Seq[Node] = { val showSqlText = executionUIDatas.exists(_.sqlText.isDefined) val headerFull = header ++ {if (showSqlText) Seq("SQL Text") else Seq.empty} + val sqlTextToolTip = {if (showSqlText) { + Seq(Some(ToolTips.SQL_TEXT, "top")) + } else { + Seq.empty + }} + val headerToolTips: Seq[Option[(String, String)]] = header.map(_ => None) ++ sqlTextToolTip +

{tableName}

{UIUtils.listingTable[SQLExecutionUIData]( - headerFull, row(currentTime, _, showSqlText), executionUIDatas, id = Some(tableId))} + headerFull, row(currentTime, _, showSqlText), executionUIDatas, id = Some(tableId), + headerToolTips = headerToolTips)}
} From 114401a630650623c7c311bf753d4422d98e1550 Mon Sep 17 00:00:00 2001 From: Tao Lin Date: Mon, 29 May 2017 21:08:34 -0700 Subject: [PATCH 4/6] Remove unnecessary scala style --- core/src/main/scala/org/apache/spark/ui/UIUtils.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala index 2bdb241152edd..42aad8968d671 100644 --- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala @@ -345,9 +345,7 @@ private[spark] object UIUtils extends Logging { } else { val toolTip = toolTipOption.get - // scalastyle:off line.size.limit - // scalastyle:on line.size.limit } } From 64876571aa7dd9dd39f241cf7ab5ef706db4396a Mon Sep 17 00:00:00 2001 From: Tao Lin Date: Mon, 29 May 2017 21:37:23 -0700 Subject: [PATCH 5/6] Add back Scala style --- core/src/main/scala/org/apache/spark/ui/UIUtils.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala index 42aad8968d671..2bdb241152edd 100644 --- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala @@ -345,7 +345,9 @@ private[spark] object UIUtils extends Logging { } else { val toolTip = toolTipOption.get + // scalastyle:off line.size.limit + // scalastyle:on line.size.limit } } From ac96aaa995330a3b6849cbbfa68222a2e90d0e15 Mon Sep 17 00:00:00 2001 From: Tao Lin Date: Mon, 29 May 2017 23:50:04 -0700 Subject: [PATCH 6/6] Store sql text in QueryExecution instead of logical plan --- .../sql/catalyst/parser/ParseDriver.scala | 28 ++++--------------- .../catalyst/plans/logical/LogicalPlan.scala | 3 -- .../scala/org/apache/spark/sql/Dataset.scala | 27 ++++++++++++++---- .../org/apache/spark/sql/SparkSession.scala | 2 +- .../spark/sql/execution/QueryExecution.scala | 6 +++- .../spark/sql/execution/SQLExecution.scala | 2 +- .../internal/BaseSessionStateBuilder.scala | 5 ++++ .../spark/sql/internal/SessionState.scala | 5 ++-- 8 files changed, 42 insertions(+), 36 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParseDriver.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParseDriver.scala index d8a129cab48d5..09598ffe770c6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParseDriver.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParseDriver.scala @@ -65,29 +65,13 @@ abstract class AbstractSqlParser extends ParserInterface with Logging { } /** Creates LogicalPlan for a given SQL string. */ - override def parsePlan(sqlText: String): LogicalPlan = { - val logicalPlan = parse(sqlText) { parser => - astBuilder.visitSingleStatement(parser.singleStatement()) match { - case plan: LogicalPlan => plan - case _ => - val position = Origin(None, None) - throw new ParseException(Option(sqlText), "Unsupported SQL statement", position, position) - } - } - // Record the original sql text in the top logical plan for checking in the web UI. - // Truncate the text to avoid downing browsers or web UI servers by running out of memory. - val maxLength = 1000 - val suffix = " ... (truncated)" - val truncateLength = maxLength - suffix.length - val truncatedSqlText = { - if (sqlText.length <= maxLength) { - sqlText - } else { - sqlText.substring(0, truncateLength) + suffix - } + override def parsePlan(sqlText: String): LogicalPlan = parse(sqlText) { parser => + astBuilder.visitSingleStatement(parser.singleStatement()) match { + case plan: LogicalPlan => plan + case _ => + val position = Origin(None, None) + throw new ParseException(Option(sqlText), "Unsupported SQL statement", position, position) } - logicalPlan.sqlText = Some(truncatedSqlText) - logicalPlan } /** Get the builder (visitor) which converts a ParseTree into an AST. */ diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala index 438056a298ff3..23520eb82b043 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala @@ -258,9 +258,6 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging { * Refreshes (or invalidates) any metadata/data cached in the plan recursively. */ def refresh(): Unit = children.foreach(_.refresh()) - - // Record the original sql text in the top logical plan for checking in the web UI. - var sqlText: Option[String] = None } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 0fc663f802052..b282bfc449032 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -61,8 +61,26 @@ private[sql] object Dataset { new Dataset(sparkSession, logicalPlan, implicitly[Encoder[T]]) } - def ofRows(sparkSession: SparkSession, logicalPlan: LogicalPlan): DataFrame = { - val qe = sparkSession.sessionState.executePlan(logicalPlan) + def ofRows(sparkSession: SparkSession, logicalPlan: LogicalPlan, sqlText: Option[String] = None) + : DataFrame = { + + // Record the original sql text in the execute plan for checking in the web UI. + // Truncate the text to avoid downing browsers or web UI servers by running out of memory. + val text = sqlText.get; + val maxLength = 1000 + val suffix = " ... (truncated)" + val truncateLength = maxLength - suffix.length + val truncatedSqlText: Option[String] = sqlText match { + case None => None + case Some(text) => Some( + if (text.length <= maxLength) { + text + } else { + text.substring(0, truncateLength) + suffix + }) + } + + val qe = sparkSession.sessionState.executePlan(logicalPlan, truncatedSqlText) qe.assertAnalyzed() new Dataset[Row](sparkSession, qe, RowEncoder(qe.analyzed.schema)) } @@ -2945,10 +2963,7 @@ class Dataset[T] private[sql]( /** A convenient function to wrap a logical plan and produce a Dataset. */ @inline private def withTypedPlan[U : Encoder](logicalPlan: => LogicalPlan): Dataset[U] = { - val dataset: Dataset[U] = Dataset(sparkSession, logicalPlan) - // Copy the original sql text for checking in the web UI. - dataset.logicalPlan.sqlText = queryExecution.logical.sqlText - dataset + Dataset(sparkSession, logicalPlan) } /** A convenient function to wrap a set based logical plan and produce a Dataset. */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala index d2bf350711936..ece2ec6173f16 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -620,7 +620,7 @@ class SparkSession private( * @since 2.0.0 */ def sql(sqlText: String): DataFrame = { - Dataset.ofRows(self, sessionState.sqlParser.parsePlan(sqlText)) + Dataset.ofRows(self, sessionState.sqlParser.parsePlan(sqlText), Some(sqlText)) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala index 2e05e5d65923c..fe9af812bf44d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala @@ -39,7 +39,11 @@ import org.apache.spark.util.Utils * While this is not a public class, we should avoid changing the function names for the sake of * changing them, because a lot of developers use the feature for debugging. */ -class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) { +class QueryExecution( + val sparkSession: SparkSession, + val logical: LogicalPlan, + // Record the original sql text in the top logical plan for checking in the web UI. + val sqlText: Option[String] = None) { // TODO: Move the planner an optimizer into here from SessionState. protected def planner = sparkSession.sessionState.planner diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala index 86bc79f951757..8492eb5179fe1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala @@ -60,7 +60,7 @@ object SQLExecution { sparkSession.sparkContext.listenerBus.post(SparkListenerSQLExecutionStart( executionId, callSite.shortForm, callSite.longForm, queryExecution.toString, - SparkPlanInfo.fromSparkPlan(queryExecution.executedPlan), queryExecution.logical.sqlText, + SparkPlanInfo.fromSparkPlan(queryExecution.executedPlan), queryExecution.sqlText, System.currentTimeMillis())) try { body diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala index 2a801d87b12eb..e9b07e9dcd3ad 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala @@ -253,6 +253,11 @@ abstract class BaseSessionStateBuilder( new QueryExecution(session, plan) } + protected def createQueryExecution(plan: LogicalPlan, sqlText: Option[String]) + : QueryExecution = { + new QueryExecution(session, plan, sqlText) + } + /** * Interface to start and stop streaming queries. */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala index 1b341a12fc609..1af2aff5ebe63 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala @@ -67,7 +67,7 @@ private[sql] class SessionState( val streamingQueryManager: StreamingQueryManager, val listenerManager: ExecutionListenerManager, val resourceLoader: SessionResourceLoader, - createQueryExecution: LogicalPlan => QueryExecution, + createQueryExecution: (LogicalPlan, Option[String]) => QueryExecution, createClone: (SparkSession, SessionState) => SessionState) { def newHadoopConf(): Configuration = SessionState.newHadoopConf( @@ -93,7 +93,8 @@ private[sql] class SessionState( // Helper methods, partially leftover from pre-2.0 days // ------------------------------------------------------ - def executePlan(plan: LogicalPlan): QueryExecution = createQueryExecution(plan) + def executePlan(plan: LogicalPlan, sqlText: Option[String] = None): QueryExecution + = createQueryExecution(plan, sqlText) def refreshTable(tableName: String): Unit = { catalog.refreshTable(sqlParser.parseTableIdentifier(tableName))
{getHeaderContent(x._1)}{getHeaderContent(x._1)}{getHeaderContent(x._1)}{getHeaderContent(x._1)}