From 224b9798e22c8bb227fbe671ce76adb284c91da3 Mon Sep 17 00:00:00 2001 From: sychen Date: Fri, 31 Jul 2020 18:57:11 +0800 Subject: [PATCH 01/11] Disallow empty part col values in partition spec before static partition writing --- .../hive/execution/InsertIntoHiveTable.scala | 17 +++++++++++++++ .../apache/spark/sql/hive/InsertSuite.scala | 21 +++++++++++++++++++ 2 files changed, 38 insertions(+) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index 116217ecec0ba..b36343832e652 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -27,6 +27,7 @@ import org.apache.hadoop.hive.ql.plan.TableDesc import org.apache.spark.SparkException import org.apache.spark.sql.{AnalysisException, Row, SparkSession} import org.apache.spark.sql.catalyst.catalog._ +import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap @@ -136,6 +137,9 @@ case class InsertIntoHiveTable( case (key, Some(value)) => key -> value case (key, None) => key -> "" } + if (partition.nonEmpty && numDynamicPartitions == 0) { + requireNonEmptyValueInPartitionSpec(Seq(partitionSpec)) + } // All partition column names in the format of "//..." val partitionColumns = fileSinkConf.getTableInfo.getProperties.getProperty("partition_columns") @@ -341,4 +345,17 @@ case class InsertIntoHiveTable( isSrcLocal = false) } } + + /** + * Verify if the input partition spec has any empty value. + */ + protected def requireNonEmptyValueInPartitionSpec(specs: Seq[TablePartitionSpec]): Unit = { + specs.foreach { s => + if (s.values.exists(_.isEmpty)) { + val spec = s.map(p => p._1 + "=" + p._2).mkString("[", ", ", "]") + throw new AnalysisException( + s"Partition spec is invalid. The spec ($spec) contains an empty partition column value") + } + } + } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala index 421dcb499bd6a..c878e0e7e3fae 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala @@ -847,4 +847,25 @@ class InsertSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter } } } + + test("SPARK-32508 Disallow empty part col values in partition spec before static partition writing") { + withTable("t1") { + spark.sql( + """ + |CREATE TABLE t1 (c1 int) + |PARTITIONED BY (d string) + """.stripMargin) + + val e = intercept[AnalysisException] { + spark.sql( + """ + |INSERT OVERWRITE TABLE t1 PARTITION(d='') + |SELECT 1 + """.stripMargin) + }.getMessage + + assert(!e.contains("get partition: Value for key d is null or empty")) + assert(e.contains("Partition spec is invalid")) + } + } } From 221564b1b879cf9e1bf7833a826756bf7d3a7fdd Mon Sep 17 00:00:00 2001 From: sychen Date: Fri, 31 Jul 2020 20:18:57 +0800 Subject: [PATCH 02/11] code style --- .../src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala index c878e0e7e3fae..ebc6cfb77d355 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala @@ -848,7 +848,8 @@ class InsertSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter } } - test("SPARK-32508 Disallow empty part col values in partition spec before static partition writing") { + test("SPARK-32508 " + + "Disallow empty part col values in partition spec before static partition writing") { withTable("t1") { spark.sql( """ From e012e7a36f3cbf756208a7544fe2e5c3978afde8 Mon Sep 17 00:00:00 2001 From: sychen Date: Tue, 8 Sep 2020 22:53:10 +0800 Subject: [PATCH 03/11] move to analyzer rule --- .../spark/sql/execution/datasources/rules.scala | 10 ++++++++++ .../sql/hive/execution/InsertIntoHiveTable.scala | 16 ---------------- 2 files changed, 10 insertions(+), 16 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala index 60cacda9f5f1c..50052bee04d44 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala @@ -402,6 +402,16 @@ case class PreprocessTableInsertion(conf: SQLConf) extends Rule[LogicalPlan] { s"including ${staticPartCols.size} partition column(s) having constant value(s).") } + // check static partition + if (normalizedPartSpec.nonEmpty && staticPartCols.size == partColNames.size) { + // empty partition column value + if (normalizedPartSpec.filter(_._2.isDefined).exists(_._2.get.isEmpty)) { + val spec = normalizedPartSpec.map(p => p._1 + "=" + p._2).mkString("[", ", ", "]") + throw new AnalysisException( + s"Partition spec is invalid. The spec ($spec) contains an empty partition column value") + } + } + val newQuery = TableOutputResolver.resolveOutputColumns( tblName, expectedColumns, insert.query, byName = false, conf) if (normalizedPartSpec.nonEmpty) { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index b36343832e652..ae400ace48ffe 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -137,9 +137,6 @@ case class InsertIntoHiveTable( case (key, Some(value)) => key -> value case (key, None) => key -> "" } - if (partition.nonEmpty && numDynamicPartitions == 0) { - requireNonEmptyValueInPartitionSpec(Seq(partitionSpec)) - } // All partition column names in the format of "//..." val partitionColumns = fileSinkConf.getTableInfo.getProperties.getProperty("partition_columns") @@ -345,17 +342,4 @@ case class InsertIntoHiveTable( isSrcLocal = false) } } - - /** - * Verify if the input partition spec has any empty value. - */ - protected def requireNonEmptyValueInPartitionSpec(specs: Seq[TablePartitionSpec]): Unit = { - specs.foreach { s => - if (s.values.exists(_.isEmpty)) { - val spec = s.map(p => p._1 + "=" + p._2).mkString("[", ", ", "]") - throw new AnalysisException( - s"Partition spec is invalid. The spec ($spec) contains an empty partition column value") - } - } - } } From ab4e04c8a418219345ce374b83160cb55013b712 Mon Sep 17 00:00:00 2001 From: sychen Date: Tue, 8 Sep 2020 22:55:34 +0800 Subject: [PATCH 04/11] remove unused import --- .../apache/spark/sql/hive/execution/InsertIntoHiveTable.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index ae400ace48ffe..116217ecec0ba 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -27,7 +27,6 @@ import org.apache.hadoop.hive.ql.plan.TableDesc import org.apache.spark.SparkException import org.apache.spark.sql.{AnalysisException, Row, SparkSession} import org.apache.spark.sql.catalyst.catalog._ -import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap From 232a8358f3c22e14afc419f11b60047d9a1a3882 Mon Sep 17 00:00:00 2001 From: sychen Date: Wed, 9 Sep 2020 16:35:45 +0800 Subject: [PATCH 05/11] check when tracksPartitionsInCatalog==true --- .../sql/execution/datasources/rules.scala | 18 ++++++++++----- .../spark/sql/sources/InsertSuite.scala | 22 +++++++++++++++++++ 2 files changed, 35 insertions(+), 5 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala index 50052bee04d44..e02629efbb75b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala @@ -386,7 +386,8 @@ case class PreprocessTableInsertion(conf: SQLConf) extends Rule[LogicalPlan] { private def preprocess( insert: InsertIntoStatement, tblName: String, - partColNames: Seq[String]): InsertIntoStatement = { + partColNames: Seq[String], + catalogTable: Option[CatalogTable]): InsertIntoStatement = { val normalizedPartSpec = PartitioningUtils.normalizePartitionSpec( insert.partitionSpec, partColNames, tblName, conf.resolver) @@ -402,8 +403,14 @@ case class PreprocessTableInsertion(conf: SQLConf) extends Rule[LogicalPlan] { s"including ${staticPartCols.size} partition column(s) having constant value(s).") } + val partitionsTrackedByCatalog = conf.manageFilesourcePartitions && + catalogTable.isDefined && + catalogTable.get.partitionColumnNames.nonEmpty && + catalogTable.get.tracksPartitionsInCatalog // check static partition - if (normalizedPartSpec.nonEmpty && staticPartCols.size == partColNames.size) { + if (partitionsTrackedByCatalog && + normalizedPartSpec.nonEmpty && + staticPartCols.size == partColNames.size) { // empty partition column value if (normalizedPartSpec.filter(_._2.isDefined).exists(_._2.get.isEmpty)) { val spec = normalizedPartSpec.map(p => p._1 + "=" + p._2).mkString("[", ", ", "]") @@ -437,13 +444,14 @@ case class PreprocessTableInsertion(conf: SQLConf) extends Rule[LogicalPlan] { table match { case relation: HiveTableRelation => val metadata = relation.tableMeta - preprocess(i, metadata.identifier.quotedString, metadata.partitionColumnNames) + preprocess(i, metadata.identifier.quotedString, metadata.partitionColumnNames, + Some(metadata)) case LogicalRelation(h: HadoopFsRelation, _, catalogTable, _) => val tblName = catalogTable.map(_.identifier.quotedString).getOrElse("unknown") - preprocess(i, tblName, h.partitionSchema.map(_.name)) + preprocess(i, tblName, h.partitionSchema.map(_.name), catalogTable) case LogicalRelation(_: InsertableRelation, _, catalogTable, _) => val tblName = catalogTable.map(_.identifier.quotedString).getOrElse("unknown") - preprocess(i, tblName, Nil) + preprocess(i, tblName, Nil, catalogTable) case _ => i } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala index abd33ab8a8f22..b819383804c68 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala @@ -866,6 +866,28 @@ class InsertSuite extends DataSourceTest with SharedSparkSession { }.getMessage assert(message.contains("LOCAL is supported only with file: scheme")) } + + test("SPARK-32508 " + + "Disallow empty part col values in partition spec before static partition writing") { + withTable("insertTable") { + sql( + """ + |CREATE TABLE insertTable(i int, part1 string, part2 string) USING PARQUET + |PARTITIONED BY (part1, part2) + """.stripMargin) + + withSQLConf(SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key -> "true") { + val e = intercept[AnalysisException] { + sql("INSERT INTO TABLE insertTable PARTITION(part1=1, part2='') SELECT 1") + }.getMessage + assert(e.contains("Partition spec is invalid")) + } + + withSQLConf(SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key -> "false") { + sql("INSERT INTO TABLE insertTable PARTITION(part1=1, part2='') SELECT 1") + } + } + } } class FileExistingTestFileSystem extends RawLocalFileSystem { From a7b9a179556a1c50a67726c07268692211ea98b3 Mon Sep 17 00:00:00 2001 From: sychen Date: Thu, 10 Sep 2020 23:07:29 +0800 Subject: [PATCH 06/11] modify the check logic --- .../spark/sql/execution/datasources/rules.scala | 4 +--- .../org/apache/spark/sql/sources/InsertSuite.scala | 13 ++++++++++--- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala index e02629efbb75b..0fd9e68c93376 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala @@ -407,10 +407,8 @@ case class PreprocessTableInsertion(conf: SQLConf) extends Rule[LogicalPlan] { catalogTable.isDefined && catalogTable.get.partitionColumnNames.nonEmpty && catalogTable.get.tracksPartitionsInCatalog - // check static partition if (partitionsTrackedByCatalog && - normalizedPartSpec.nonEmpty && - staticPartCols.size == partColNames.size) { + normalizedPartSpec.nonEmpty) { // empty partition column value if (normalizedPartSpec.filter(_._2.isDefined).exists(_._2.get.isEmpty)) { val spec = normalizedPartSpec.map(p => p._1 + "=" + p._2).mkString("[", ", ", "]") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala index b819383804c68..a97d7b19df9aa 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala @@ -877,10 +877,17 @@ class InsertSuite extends DataSourceTest with SharedSparkSession { """.stripMargin) withSQLConf(SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key -> "true") { - val e = intercept[AnalysisException] { + val msg = "Partition spec is invalid" + assert(intercept[AnalysisException] { sql("INSERT INTO TABLE insertTable PARTITION(part1=1, part2='') SELECT 1") - }.getMessage - assert(e.contains("Partition spec is invalid")) + }.getMessage.contains(msg)) + assert(intercept[AnalysisException] { + sql("INSERT INTO TABLE insertTable PARTITION(part1='', part2) SELECT 1 ,'' AS part2") + }.getMessage.contains(msg)) + + sql("INSERT INTO TABLE insertTable PARTITION(part1='1', part2='2') SELECT 1") + sql("INSERT INTO TABLE insertTable PARTITION(part1='1', part2) SELECT 1 ,'2' AS part2") + sql("INSERT INTO TABLE insertTable PARTITION(part1='1', part2) SELECT 1 ,'' AS part2") } withSQLConf(SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key -> "false") { From 65f781aa19504c3ffbee80ce1e8d39e22a737b21 Mon Sep 17 00:00:00 2001 From: sychen Date: Fri, 11 Sep 2020 12:09:19 +0800 Subject: [PATCH 07/11] respect tracksPartitionsInCatalog --- .../org/apache/spark/sql/execution/datasources/rules.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala index 0fd9e68c93376..edd2bac10222e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala @@ -403,8 +403,7 @@ case class PreprocessTableInsertion(conf: SQLConf) extends Rule[LogicalPlan] { s"including ${staticPartCols.size} partition column(s) having constant value(s).") } - val partitionsTrackedByCatalog = conf.manageFilesourcePartitions && - catalogTable.isDefined && + val partitionsTrackedByCatalog = catalogTable.isDefined && catalogTable.get.partitionColumnNames.nonEmpty && catalogTable.get.tracksPartitionsInCatalog if (partitionsTrackedByCatalog && From 965ed5ad541406e50a40c62658f3962df69e4444 Mon Sep 17 00:00:00 2001 From: sychen Date: Fri, 11 Sep 2020 15:29:39 +0800 Subject: [PATCH 08/11] fix ut --- .../spark/sql/sources/InsertSuite.scala | 27 +++++++------------ 1 file changed, 10 insertions(+), 17 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala index a97d7b19df9aa..32c4fb60b8c54 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala @@ -875,24 +875,17 @@ class InsertSuite extends DataSourceTest with SharedSparkSession { |CREATE TABLE insertTable(i int, part1 string, part2 string) USING PARQUET |PARTITIONED BY (part1, part2) """.stripMargin) - - withSQLConf(SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key -> "true") { - val msg = "Partition spec is invalid" - assert(intercept[AnalysisException] { - sql("INSERT INTO TABLE insertTable PARTITION(part1=1, part2='') SELECT 1") - }.getMessage.contains(msg)) - assert(intercept[AnalysisException] { - sql("INSERT INTO TABLE insertTable PARTITION(part1='', part2) SELECT 1 ,'' AS part2") - }.getMessage.contains(msg)) - - sql("INSERT INTO TABLE insertTable PARTITION(part1='1', part2='2') SELECT 1") - sql("INSERT INTO TABLE insertTable PARTITION(part1='1', part2) SELECT 1 ,'2' AS part2") - sql("INSERT INTO TABLE insertTable PARTITION(part1='1', part2) SELECT 1 ,'' AS part2") - } - - withSQLConf(SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key -> "false") { + val msg = "Partition spec is invalid" + assert(intercept[AnalysisException] { sql("INSERT INTO TABLE insertTable PARTITION(part1=1, part2='') SELECT 1") - } + }.getMessage.contains(msg)) + assert(intercept[AnalysisException] { + sql("INSERT INTO TABLE insertTable PARTITION(part1='', part2) SELECT 1 ,'' AS part2") + }.getMessage.contains(msg)) + + sql("INSERT INTO TABLE insertTable PARTITION(part1='1', part2='2') SELECT 1") + sql("INSERT INTO TABLE insertTable PARTITION(part1='1', part2) SELECT 1 ,'2' AS part2") + sql("INSERT INTO TABLE insertTable PARTITION(part1='1', part2) SELECT 1 ,'' AS part2") } } } From b969c734a593d4252a5f36034d659f3c0d86d1e0 Mon Sep 17 00:00:00 2001 From: sychen Date: Tue, 15 Sep 2020 23:10:10 +0800 Subject: [PATCH 09/11] nit --- .../org/apache/spark/sql/execution/datasources/rules.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala index edd2bac10222e..5fb1a4d249070 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala @@ -406,8 +406,7 @@ case class PreprocessTableInsertion(conf: SQLConf) extends Rule[LogicalPlan] { val partitionsTrackedByCatalog = catalogTable.isDefined && catalogTable.get.partitionColumnNames.nonEmpty && catalogTable.get.tracksPartitionsInCatalog - if (partitionsTrackedByCatalog && - normalizedPartSpec.nonEmpty) { + if (partitionsTrackedByCatalog && normalizedPartSpec.nonEmpty) { // empty partition column value if (normalizedPartSpec.filter(_._2.isDefined).exists(_._2.get.isEmpty)) { val spec = normalizedPartSpec.map(p => p._1 + "=" + p._2).mkString("[", ", ", "]") From c8ac36917caf7fa67a0bca3c30a10380fd7c7306 Mon Sep 17 00:00:00 2001 From: sychen Date: Wed, 16 Sep 2020 10:36:53 +0800 Subject: [PATCH 10/11] trigger test From 76acc0751b08fe2112b3538de0e25b57e67dc050 Mon Sep 17 00:00:00 2001 From: sychen Date: Wed, 16 Sep 2020 16:07:04 +0800 Subject: [PATCH 11/11] trigger test