From 81413e08e2f347786cde71dcfcbf6c590af0d80b Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Fri, 24 Jun 2016 00:08:06 -0700 Subject: [PATCH 1/7] fix --- .../sql/catalyst/analysis/CheckAnalysis.scala | 4 +++ .../sql/execution/command/DDLSuite.scala | 31 +++++++++++++++++++ 2 files changed, 35 insertions(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index 899227674f2ac..fd5ff9f9fa832 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -339,6 +339,10 @@ trait CheckAnalysis extends PredicateHelper { s"$numStaticPartitions partition column(s) having constant value(s).") } + case c if c.getClass.getName == + "org.apache.spark.sql.execution.command.CreateHiveTableAsSelectLogicalPlan" => + failAnalysis("Hive support is required to use CREATE TABLE AS SELECT") + case o if !o.resolved => failAnalysis( s"unresolved operator ${operator.simpleString}") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index 47d8a28f49927..3c5ac3dc76f2f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -1270,6 +1270,37 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { "WITH SERDEPROPERTIES ('spark.sql.sources.me'='anything')") } + test("Create Cataloged Table As Select") { + import testImplicits._ + withTable("t", "t1") { + var e = intercept[AnalysisException] { + sql("CREATE TABLE t SELECT 1 as a, 1 as b") + }.getMessage + assert(e.contains("Hive support is required to use CREATE TABLE AS SELECT")) + + spark.range(1).select('id as 'a, 'id as 'b).write.saveAsTable("t1") + e = intercept[AnalysisException] { + sql("CREATE TABLE t SELECT a, b from t1") + }.getMessage + assert(e.contains("Hive support is required to use CREATE TABLE AS SELECT")) + + sql(s"CREATE TABLE t USING parquet SELECT a, b from t1") + + } + } + + test("Create Data Source Table As Select") { + import testImplicits._ + withTable("t", "t1", "t2") { + sql("CREATE TABLE t USING parquet SELECT 1 as a, 1 as b") + checkAnswer(spark.table("t"), Row(1, 1) :: Nil) + + spark.range(1).select('id as 'a, 'id as 'b).write.saveAsTable("t1") + sql("CREATE TABLE t2 USING parquet SELECT a, b from t1") + checkAnswer(spark.table("t2"), spark.table("t1")) + } + } + test("drop default database") { Seq("true", "false").foreach { caseSensitive => withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive) { From 8947445ce9ea81e0d91814dfd4aa5eb2bb014b85 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Fri, 24 Jun 2016 00:13:09 -0700 Subject: [PATCH 2/7] clean test case --- .../org/apache/spark/sql/execution/command/DDLSuite.scala | 3 --- 1 file changed, 3 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index 3c5ac3dc76f2f..cb98d8e73329c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -1283,9 +1283,6 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { sql("CREATE TABLE t SELECT a, b from t1") }.getMessage assert(e.contains("Hive support is required to use CREATE TABLE AS SELECT")) - - sql(s"CREATE TABLE t USING parquet SELECT a, b from t1") - } } From db3d44c382006516e841651f16075cfb83a86e1a Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Fri, 24 Jun 2016 00:21:41 -0700 Subject: [PATCH 3/7] Better Error message. --- .../org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala | 2 +- .../scala/org/apache/spark/sql/execution/command/DDLSuite.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index fd5ff9f9fa832..f1000017c1b89 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -341,7 +341,7 @@ trait CheckAnalysis extends PredicateHelper { case c if c.getClass.getName == "org.apache.spark.sql.execution.command.CreateHiveTableAsSelectLogicalPlan" => - failAnalysis("Hive support is required to use CREATE TABLE AS SELECT") + failAnalysis("Hive support is required to use CREATE Hive TABLE AS SELECT") case o if !o.resolved => failAnalysis( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index cb98d8e73329c..f35f8f97c9251 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -1282,7 +1282,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { e = intercept[AnalysisException] { sql("CREATE TABLE t SELECT a, b from t1") }.getMessage - assert(e.contains("Hive support is required to use CREATE TABLE AS SELECT")) + assert(e.contains("Hive support is required to use CREATE Hive TABLE AS SELECT")) } } From e4cc35d88e87941d8b9d2e3a2f754a43d29d4c70 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Fri, 24 Jun 2016 09:16:14 -0700 Subject: [PATCH 4/7] fix test case --- .../scala/org/apache/spark/sql/execution/command/DDLSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index f35f8f97c9251..95f0fc60d2b33 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -1276,7 +1276,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { var e = intercept[AnalysisException] { sql("CREATE TABLE t SELECT 1 as a, 1 as b") }.getMessage - assert(e.contains("Hive support is required to use CREATE TABLE AS SELECT")) + assert(e.contains("Hive support is required to use CREATE Hive TABLE AS SELECT")) spark.range(1).select('id as 'a, 'id as 'b).write.saveAsTable("t1") e = intercept[AnalysisException] { From 8a4d2b2128bec1526228156734e1d1b94bd01c7b Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Mon, 8 Aug 2016 08:23:03 -0700 Subject: [PATCH 5/7] address comments --- .../spark/sql/catalyst/analysis/CheckAnalysis.scala | 4 ---- .../spark/sql/execution/datasources/rules.scala | 12 ++++++++++-- .../org/apache/spark/sql/internal/SessionState.scala | 3 ++- .../org/apache/spark/sql/hive/HiveSessionState.scala | 3 ++- 4 files changed, 14 insertions(+), 8 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index e002293980c93..41b7e62d8ccea 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -375,10 +375,6 @@ trait CheckAnalysis extends PredicateHelper { s"$numStaticPartitions partition column(s) having constant value(s).") } - case c if c.getClass.getName == - "org.apache.spark.sql.execution.command.CreateHiveTableAsSelectLogicalPlan" => - failAnalysis("Hive support is required to use CREATE Hive TABLE AS SELECT") - case o if !o.resolved => failAnalysis( s"unresolved operator ${operator.simpleString}") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala index d5b92323d4418..d9ef757112b9d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala @@ -21,6 +21,8 @@ import java.util.regex.Pattern import scala.util.control.NonFatal +import org.apache.spark.SparkConf +import org.apache.spark.internal.config._ import org.apache.spark.sql.{AnalysisException, SaveMode, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis._ @@ -275,7 +277,10 @@ private[sql] case class PreprocessTableInsertion(conf: SQLConf) extends Rule[Log /** * A rule to do various checks before inserting into or writing to a data source table. */ -private[sql] case class PreWriteCheck(conf: SQLConf, catalog: SessionCatalog) +private[sql] case class PreWriteCheck( + sqlConf: SQLConf, + catalog: SessionCatalog, + sparkConf: SparkConf) extends (LogicalPlan => Unit) { def failAnalysis(msg: String): Unit = { throw new AnalysisException(msg) } @@ -285,6 +290,9 @@ private[sql] case class PreWriteCheck(conf: SQLConf, catalog: SessionCatalog) def apply(plan: LogicalPlan): Unit = { plan.foreach { + case CreateTable(tableDesc, _, Some(_)) + if tableDesc.provider.get == "hive" && sparkConf.get(CATALOG_IMPLEMENTATION) != "hive" => + failAnalysis("Hive support is required to use CREATE Hive TABLE AS SELECT") case c @ CreateTable(tableDesc, mode, query) if c.resolved => // Since we are saving table metadata to metastore, we should make sure the table name // and database name don't break some common restrictions, e.g. special chars except @@ -334,7 +342,7 @@ private[sql] case class PreWriteCheck(conf: SQLConf, catalog: SessionCatalog) } PartitioningUtils.validatePartitionColumn( - r.schema, part.keySet.toSeq, conf.caseSensitiveAnalysis) + r.schema, part.keySet.toSeq, sqlConf.caseSensitiveAnalysis) // Get all input data source relations of the query. val srcRelations = query.collect { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala index 052bce0923695..3fb2857ea5d25 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala @@ -117,7 +117,8 @@ private[sql] class SessionState(sparkSession: SparkSession) { DataSourceAnalysis(conf) :: (if (conf.runSQLonFile) new ResolveDataSource(sparkSession) :: Nil else Nil) - override val extendedCheckRules = Seq(datasources.PreWriteCheck(conf, catalog)) + override val extendedCheckRules = + Seq(PreWriteCheck(conf, catalog, sparkSession.sparkContext.conf)) } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala index e01c053ab5a76..d097a7e267ecc 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala @@ -70,7 +70,8 @@ private[hive] class HiveSessionState(sparkSession: SparkSession) DataSourceAnalysis(conf) :: (if (conf.runSQLonFile) new ResolveDataSource(sparkSession) :: Nil else Nil) - override val extendedCheckRules = Seq(PreWriteCheck(conf, catalog)) + override val extendedCheckRules = + Seq(PreWriteCheck(conf, catalog, sparkSession.sparkContext.conf)) } } From 79c21b6f2607bdd63e1baad853c67e54aa992b3d Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Tue, 9 Aug 2016 20:18:55 -0700 Subject: [PATCH 6/7] address comments --- .../sql/execution/datasources/rules.scala | 18 ++++++++++++++++++ .../spark/sql/internal/SessionState.scala | 3 ++- .../spark/sql/hive/HiveSessionState.scala | 3 +-- 3 files changed, 21 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala index c133dda13e3fa..43aa3380d84b2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala @@ -272,6 +272,24 @@ case class PreprocessTableInsertion(conf: SQLConf) extends Rule[LogicalPlan] { } } +/** + * A rule to check whether the functions are supported only when Hive support is enabled + */ +object HiveOnlyCheck extends (LogicalPlan => Unit) { + def apply(plan: LogicalPlan): Unit = { + + def failAnalysis(msg: String): Unit = { throw new AnalysisException(msg) } + + plan.foreach { + case CreateTable(tableDesc, _, Some(_)) + if tableDesc.provider.get == "hive" => + failAnalysis("Hive support is required to use CREATE Hive TABLE AS SELECT") + + case _ => // OK + } + } +} + /** * A rule to do various checks before inserting into or writing to a data source table. */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala index 3fb2857ea5d25..65d9179f307e9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala @@ -118,7 +118,8 @@ private[sql] class SessionState(sparkSession: SparkSession) { (if (conf.runSQLonFile) new ResolveDataSource(sparkSession) :: Nil else Nil) override val extendedCheckRules = - Seq(PreWriteCheck(conf, catalog, sparkSession.sparkContext.conf)) + Seq(PreWriteCheck(conf, catalog), + HiveOnlyCheck) } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala index d097a7e267ecc..e01c053ab5a76 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala @@ -70,8 +70,7 @@ private[hive] class HiveSessionState(sparkSession: SparkSession) DataSourceAnalysis(conf) :: (if (conf.runSQLonFile) new ResolveDataSource(sparkSession) :: Nil else Nil) - override val extendedCheckRules = - Seq(PreWriteCheck(conf, catalog, sparkSession.sparkContext.conf)) + override val extendedCheckRules = Seq(PreWriteCheck(conf, catalog)) } } From 037388ad2e42e744edda29a1bf5ab7bf90d566fb Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Tue, 9 Aug 2016 23:51:45 -0700 Subject: [PATCH 7/7] address comments --- .../org/apache/spark/sql/execution/datasources/rules.scala | 5 +---- .../scala/org/apache/spark/sql/internal/SessionState.scala | 3 +-- .../org/apache/spark/sql/execution/command/DDLSuite.scala | 2 +- 3 files changed, 3 insertions(+), 7 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala index 43aa3380d84b2..fc8d8c3667901 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala @@ -277,13 +277,10 @@ case class PreprocessTableInsertion(conf: SQLConf) extends Rule[LogicalPlan] { */ object HiveOnlyCheck extends (LogicalPlan => Unit) { def apply(plan: LogicalPlan): Unit = { - - def failAnalysis(msg: String): Unit = { throw new AnalysisException(msg) } - plan.foreach { case CreateTable(tableDesc, _, Some(_)) if tableDesc.provider.get == "hive" => - failAnalysis("Hive support is required to use CREATE Hive TABLE AS SELECT") + throw new AnalysisException("Hive support is required to use CREATE Hive TABLE AS SELECT") case _ => // OK } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala index 65d9179f307e9..ab27381c0600d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala @@ -118,8 +118,7 @@ private[sql] class SessionState(sparkSession: SparkSession) { (if (conf.runSQLonFile) new ResolveDataSource(sparkSession) :: Nil else Nil) override val extendedCheckRules = - Seq(PreWriteCheck(conf, catalog), - HiveOnlyCheck) + Seq(PreWriteCheck(conf, catalog), HiveOnlyCheck) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index 0b8bbeec6c86b..0eb3f2002d0bc 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -1578,7 +1578,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { "WITH SERDEPROPERTIES ('spark.sql.sources.me'='anything')") } - test("Create Cataloged Table As Select") { + test("Create Hive Table As Select") { import testImplicits._ withTable("t", "t1") { var e = intercept[AnalysisException] {