From 3e7beb84555057c85582a19e804626871bd16016 Mon Sep 17 00:00:00 2001 From: Zhenhua Wang Date: Wed, 21 Sep 2016 10:44:12 -0700 Subject: [PATCH 1/2] set expectedOutputAttributes --- .../datasources/DataSourceStrategy.scala | 10 +++++++--- .../org/apache/spark/sql/SQLQuerySuite.scala | 16 +++++++++++++--- 2 files changed, 20 insertions(+), 6 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index c8ad5b303491..63f01c5bb9e3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -197,7 +197,10 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] { * source information. */ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan] { - private def readDataSourceTable(sparkSession: SparkSession, table: CatalogTable): LogicalPlan = { + private def readDataSourceTable( + sparkSession: SparkSession, + simpleCatalogRelation: SimpleCatalogRelation): LogicalPlan = { + val table = simpleCatalogRelation.catalogTable val dataSource = DataSource( sparkSession, @@ -209,16 +212,17 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan] LogicalRelation( dataSource.resolveRelation(), + expectedOutputAttributes = Some(simpleCatalogRelation.output), catalogTable = Some(table)) } override def apply(plan: LogicalPlan): LogicalPlan = plan transform { case i @ logical.InsertIntoTable(s: SimpleCatalogRelation, _, _, _, _) if DDLUtils.isDatasourceTable(s.metadata) => - i.copy(table = readDataSourceTable(sparkSession, s.metadata)) + i.copy(table = readDataSourceTable(sparkSession, s)) case s: SimpleCatalogRelation if DDLUtils.isDatasourceTable(s.metadata) => - readDataSourceTable(sparkSession, s.metadata) + readDataSourceTable(sparkSession, s) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 0ee8c959eeb4..62a084d604ce 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -22,9 +22,7 @@ import java.math.MathContext import java.sql.{Date, Timestamp} import org.apache.spark.{AccumulatorSuite, SparkException} -import org.apache.spark.sql.catalyst.analysis.UnresolvedException -import org.apache.spark.sql.catalyst.expressions.SortOrder -import org.apache.spark.sql.catalyst.plans.logical.Aggregate +import org.apache.spark.sql.catalyst.plans.logical.Project import org.apache.spark.sql.catalyst.util.StringUtils import org.apache.spark.sql.execution.aggregate import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, CartesianProductExec, SortMergeJoinExec} @@ -2310,6 +2308,18 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { } } + test("data source table created in InMemoryCatalog should guarantee resolving consistency") { + val table = "tbl" + withTable("tbl") { + sql("CREATE TABLE tbl(i INT, j STRING) USING parquet") + val tableIdent = spark.sessionState.sqlParser.parseTableIdentifier(table) + val relation = spark.sessionState.catalog.lookupRelation(tableIdent) + val expr = relation.resolve("i") + val plan = Dataset.ofRows(spark, Project(Seq(expr), relation)) + plan.queryExecution.assertAnalyzed() + } + } + test("Eliminate noop ordinal ORDER BY") { withSQLConf(SQLConf.ORDER_BY_ORDINAL.key -> "true") { val plan1 = sql("SELECT 1.0, 'abc', year(current_date()) ORDER BY 1, 2, 3") From e2c3b9df0431885efbc9575beb7735590a77cf2f Mon Sep 17 00:00:00 2001 From: Zhenhua Wang Date: Wed, 21 Sep 2016 21:03:22 -0700 Subject: [PATCH 2/2] fix test case --- .../org/apache/spark/sql/DataFrameSuite.scala | 14 +++++++++++++- .../org/apache/spark/sql/SQLQuerySuite.scala | 16 +++------------- 2 files changed, 16 insertions(+), 14 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index c2d256bdd335..2c60a7dd9209 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -26,7 +26,8 @@ import scala.util.Random import org.scalatest.Matchers._ import org.apache.spark.SparkException -import org.apache.spark.sql.catalyst.plans.logical.{OneRowRelation, Union} +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.plans.logical.{OneRowRelation, Project, Union} import org.apache.spark.sql.execution.QueryExecution import org.apache.spark.sql.execution.aggregate.HashAggregateExec import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ReusedExchangeExec, ShuffleExchange} @@ -1585,4 +1586,15 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { val d = sampleDf.withColumn("c", monotonically_increasing_id).select($"c").collect assert(d.size == d.distinct.size) } + + test("SPARK-17625: data source table in InMemoryCatalog should guarantee output consistency") { + val tableName = "tbl" + withTable(tableName) { + spark.range(10).select('id as 'i, 'id as 'j).write.saveAsTable(tableName) + val relation = spark.sessionState.catalog.lookupRelation(TableIdentifier(tableName)) + val expr = relation.resolve("i") + val qe = spark.sessionState.executePlan(Project(Seq(expr), relation)) + qe.assertAnalyzed() + } + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 62a084d604ce..0ee8c959eeb4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -22,7 +22,9 @@ import java.math.MathContext import java.sql.{Date, Timestamp} import org.apache.spark.{AccumulatorSuite, SparkException} -import org.apache.spark.sql.catalyst.plans.logical.Project +import org.apache.spark.sql.catalyst.analysis.UnresolvedException +import org.apache.spark.sql.catalyst.expressions.SortOrder +import org.apache.spark.sql.catalyst.plans.logical.Aggregate import org.apache.spark.sql.catalyst.util.StringUtils import org.apache.spark.sql.execution.aggregate import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, CartesianProductExec, SortMergeJoinExec} @@ -2308,18 +2310,6 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { } } - test("data source table created in InMemoryCatalog should guarantee resolving consistency") { - val table = "tbl" - withTable("tbl") { - sql("CREATE TABLE tbl(i INT, j STRING) USING parquet") - val tableIdent = spark.sessionState.sqlParser.parseTableIdentifier(table) - val relation = spark.sessionState.catalog.lookupRelation(tableIdent) - val expr = relation.resolve("i") - val plan = Dataset.ofRows(spark, Project(Seq(expr), relation)) - plan.queryExecution.assertAnalyzed() - } - } - test("Eliminate noop ordinal ORDER BY") { withSQLConf(SQLConf.ORDER_BY_ORDINAL.key -> "true") { val plan1 = sql("SELECT 1.0, 'abc', year(current_date()) ORDER BY 1, 2, 3")