diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala index 2a2315896831..59ffd1638111 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala @@ -17,8 +17,6 @@ package org.apache.spark.sql.execution -import com.fasterxml.jackson.annotation.JsonIgnoreProperties - import org.apache.spark.annotation.DeveloperApi import org.apache.spark.sql.execution.exchange.ReusedExchangeExec import org.apache.spark.sql.execution.metric.SQLMetricInfo @@ -28,11 +26,11 @@ import org.apache.spark.sql.execution.metric.SQLMetricInfo * Stores information about a SQL SparkPlan. */ @DeveloperApi -@JsonIgnoreProperties(Array("metadata")) // The metadata field was removed in Spark 2.3. class SparkPlanInfo( val nodeName: String, val simpleString: String, val children: Seq[SparkPlanInfo], + val metadata: Map[String, String], val metrics: Seq[SQLMetricInfo]) { override def hashCode(): Int = { @@ -59,6 +57,12 @@ private[execution] object SparkPlanInfo { new SQLMetricInfo(metric.name.getOrElse(key), metric.id, metric.metricType) } - new SparkPlanInfo(plan.nodeName, plan.simpleString, children.map(fromSparkPlan), metrics) + // dump the file scan metadata (e.g file path) to event log + val metadata = plan match { + case fileScan: FileSourceScanExec => fileScan.metadata + case _ => Map[String, String]() + } + new SparkPlanInfo(plan.nodeName, plan.simpleString, children.map(fromSparkPlan), + metadata, metrics) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLJsonProtocolSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLJsonProtocolSuite.scala index c2e62b987e0c..08e40e28d3d5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLJsonProtocolSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLJsonProtocolSuite.scala @@ -46,7 +46,7 @@ class SQLJsonProtocolSuite extends SparkFunSuite { """.stripMargin val reconstructedEvent = JsonProtocol.sparkEventFromJson(parse(SQLExecutionStartJsonString)) val expectedEvent = SparkListenerSQLExecutionStart(0, "test desc", "test detail", "test plan", - new SparkPlanInfo("TestNode", "test string", Nil, Nil), 0) + new SparkPlanInfo("TestNode", "test string", Nil, Map(), Nil), 0) assert(reconstructedEvent == expectedEvent) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala index 34dc6f37c0e4..47ff372992b9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala @@ -50,4 +50,12 @@ class SparkPlanSuite extends QueryTest with SharedSQLContext { } } } + + test("SPARK-25357 SparkPlanInfo of FileScan contains nonEmpty metadata") { + withTempPath { path => + spark.range(5).write.parquet(path.getAbsolutePath) + val f = spark.read.parquet(path.getAbsolutePath) + assert(SparkPlanInfo.fromSparkPlan(f.queryExecution.sparkPlan).metadata.nonEmpty) + } + } }