From e8c4af1b9a0fbc9eb134bd0579c4ce7f9d0cb256 Mon Sep 17 00:00:00 2001 From: Takeshi Yamamuro Date: Tue, 10 Dec 2019 15:45:23 +0900 Subject: [PATCH] Fix --- .../org/apache/spark/sql/ExplainMode.java | 64 ++++++++++++++++++ .../scala/org/apache/spark/sql/Dataset.scala | 40 ++++++++--- .../sql/execution/command/commands.scala | 7 +- .../org/apache/spark/sql/ExplainSuite.scala | 67 ++++++++++++++++--- 4 files changed, 156 insertions(+), 22 deletions(-) create mode 100644 sql/core/src/main/java/org/apache/spark/sql/ExplainMode.java diff --git a/sql/core/src/main/java/org/apache/spark/sql/ExplainMode.java b/sql/core/src/main/java/org/apache/spark/sql/ExplainMode.java new file mode 100644 index 0000000000000..018ea12db55f1 --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/ExplainMode.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql; + +import org.apache.spark.annotation.Unstable; + +/** + * ExplainMode is used to specify the expected output format of plans (logical and physical) + * for debugging purpose. 
+ * + * @since 3.0.0 + */ +@Unstable +public enum ExplainMode { + /** + * Simple mode means that when printing explain for a DataFrame, only a physical plan is + * expected to be printed to the console. + * + * @since 3.0.0 + */ + Simple, + /** + * Extended mode means that when printing explain for a DataFrame, both logical and physical + * plans are expected to be printed to the console. + * + * @since 3.0.0 + */ + Extended, + /** + * Codegen mode means that when printing explain for a DataFrame, if generated codes are + * available, a physical plan and the generated codes are expected to be printed to the console. + * + * @since 3.0.0 + */ + Codegen, + /** + * Cost mode means that when printing explain for a DataFrame, if plan node statistics are + * available, a logical plan and the statistics are expected to be printed to the console. + * + * @since 3.0.0 + */ + Cost, + /** + * Formatted mode means that when printing explain for a DataFrame, explain output is + * expected to be split into two sections: a physical plan outline and node details. + * + * @since 3.0.0 + */ + Formatted +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index be3f2aed25936..aa641f0a3fa4e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -522,36 +522,58 @@ class Dataset[T] private[sql]( // scalastyle:on println /** - * Prints the plans (logical and physical) to the console for debugging purposes. + * Prints the plans (logical and physical) with a format specified by a given explain mode. * * @group basic - * @since 1.6.0 + * @since 3.0.0 */ - def explain(extended: Boolean): Unit = { + def explain(mode: ExplainMode): Unit = { // Because temporary views are resolved during analysis when we create a Dataset, and // `ExplainCommand` analyzes input query plan and resolves temporary views again. 
Using // `ExplainCommand` here will probably output different query plans, compared to the results // of evaluation of the Dataset. So just output QueryExecution's query plans here. val qe = ExplainCommandUtil.explainedQueryExecution(sparkSession, logicalPlan, queryExecution) - val outputString = - if (extended) { - qe.toString - } else { + val outputString = mode match { + case ExplainMode.Simple => qe.simpleString - } + case ExplainMode.Extended => + qe.toString + case ExplainMode.Codegen => + try { + org.apache.spark.sql.execution.debug.codegenString(queryExecution.executedPlan) + } catch { + case e: AnalysisException => e.toString + } + case ExplainMode.Cost => + qe.stringWithStats + case ExplainMode.Formatted => + qe.simpleString(formatted = true) + } // scalastyle:off println println(outputString) // scalastyle:on println } + /** + * Prints the plans (logical and physical) to the console for debugging purposes. + * + * @group basic + * @since 1.6.0 + */ + def explain(extended: Boolean): Unit = if (extended) { + explain(ExplainMode.Extended) + } else { + explain(ExplainMode.Simple) + } + /** * Prints the physical plan to the console for debugging purposes. * * @group basic * @since 1.6.0 */ - def explain(): Unit = explain(extended = false) + def explain(): Unit = explain(ExplainMode.Simple) /** * Returns all column names and their data types as an array. 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala index bb641bcb430ea..888c4419458d7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala @@ -20,8 +20,7 @@ package org.apache.spark.sql.execution.command import java.util.UUID import org.apache.spark.rdd.RDD -import org.apache.spark.sql.{AnalysisException, SparkSession} -import org.apache.spark.sql.{Row, SparkSession} +import org.apache.spark.sql.{AnalysisException, Row, SparkSession} import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow} import org.apache.spark.sql.catalyst.errors.TreeNodeException import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} @@ -132,13 +131,15 @@ case class DataWritingCommandExec(cmd: DataWritingCommand, child: SparkPlan) * (but do NOT actually execute it). * * {{{ - * EXPLAIN (EXTENDED | CODEGEN) SELECT * FROM ... + * EXPLAIN (EXTENDED | CODEGEN | COST | FORMATTED) SELECT * FROM ... * }}} * * @param logicalPlan plan to explain * @param extended whether to do extended explain or not * @param codegen whether to output generated code from whole-stage codegen or not * @param cost whether to show cost information for operators. + * @param formatted whether to split explain output into two sections: a physical plan outline + * and node details. 
*/ case class ExplainCommand( logicalPlan: LogicalPlan, diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala index f968fbb27d4f4..4e5d0f58be444 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala @@ -25,16 +25,19 @@ import org.apache.spark.sql.types.StructType class ExplainSuite extends QueryTest with SharedSparkSession { import testImplicits._ - /** - * Get the explain from a DataFrame and run the specified action on it. - */ - private def withNormalizedExplain(df: DataFrame, extended: Boolean)(f: String => Unit) = { + private def getNormalizedExplain(df: DataFrame, mode: ExplainMode): String = { val output = new java.io.ByteArrayOutputStream() Console.withOut(output) { - df.explain(extended = extended) + df.explain(mode) } - val normalizedOutput = output.toString.replaceAll("#\\d+", "#x") - f(normalizedOutput) + output.toString.replaceAll("#\\d+", "#x") + } + + /** + * Get the explain from a DataFrame and run the specified action on it. + */ + private def withNormalizedExplain(df: DataFrame, mode: ExplainMode)(f: String => Unit) = { + f(getNormalizedExplain(df, mode)) } /** @@ -53,14 +56,19 @@ class ExplainSuite extends QueryTest with SharedSparkSession { /** * Runs the plan and makes sure the plans contains all of the keywords. 
*/ - private def checkKeywordsExistsInExplain(df: DataFrame, keywords: String*): Unit = { - withNormalizedExplain(df, extended = true) { normalizedOutput => + private def checkKeywordsExistsInExplain( + df: DataFrame, mode: ExplainMode, keywords: String*): Unit = { + withNormalizedExplain(df, mode) { normalizedOutput => for (key <- keywords) { assert(normalizedOutput.contains(key)) } } } + private def checkKeywordsExistsInExplain(df: DataFrame, keywords: String*): Unit = { + checkKeywordsExistsInExplain(df, ExplainMode.Extended, keywords: _*) + } + test("SPARK-23034 show rdd names in RDD scan nodes (Dataset)") { val rddWithName = spark.sparkContext.parallelize(Row(1, "abc") :: Nil).setName("testRdd") val df = spark.createDataFrame(rddWithName, StructType.fromDDL("c0 int, c1 string")) @@ -209,7 +217,7 @@ class ExplainSuite extends QueryTest with SharedSparkSession { test("SPARK-26659: explain of DataWritingCommandExec should not contain duplicate cmd.nodeName") { withTable("temptable") { val df = sql("create table temptable using parquet as select * from range(2)") - withNormalizedExplain(df, extended = false) { normalizedOutput => + withNormalizedExplain(df, ExplainMode.Simple) { normalizedOutput => assert("Create\\w*?TableAsSelectCommand".r.findAllMatchIn(normalizedOutput).length == 1) } } @@ -262,6 +270,45 @@ class ExplainSuite extends QueryTest with SharedSparkSession { } } } + + test("Support ExplainMode in Dataset.explain") { + val df1 = Seq((1, 2), (2, 3)).toDF("k", "v1") + val df2 = Seq((2, 3), (1, 1)).toDF("k", "v2") + val testDf = df1.join(df2, "k").groupBy("k").agg(count("v1"), sum("v1"), avg("v2")) + + val simpleExplainOutput = getNormalizedExplain(testDf, ExplainMode.Simple) + assert(simpleExplainOutput.startsWith("== Physical Plan ==")) + Seq("== Parsed Logical Plan ==", + "== Analyzed Logical Plan ==", + "== Optimized Logical Plan ==").foreach { planType => + assert(!simpleExplainOutput.contains(planType)) + } + checkKeywordsExistsInExplain( + testDf, 
+ ExplainMode.Extended, + "== Parsed Logical Plan ==" :: + "== Analyzed Logical Plan ==" :: + "== Optimized Logical Plan ==" :: + "== Physical Plan ==" :: + Nil: _*) + checkKeywordsExistsInExplain( + testDf, + ExplainMode.Cost, + "Statistics(sizeInBytes=" :: + Nil: _*) + checkKeywordsExistsInExplain( + testDf, + ExplainMode.Codegen, + "WholeStageCodegen subtrees" :: + "Generated code:" :: + Nil: _*) + checkKeywordsExistsInExplain( + testDf, + ExplainMode.Formatted, + "* LocalTableScan (1)" :: + "(1) LocalTableScan [codegen id :" :: + Nil: _*) + } } case class ExplainSingleData(id: Int)