diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 index 075c73d7a320d..af1e8d75ce437 100644 --- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 +++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 @@ -235,11 +235,7 @@ partitionSpecLocation ; partitionSpec - : PARTITION '(' partitionVal (',' partitionVal)* ')' - ; - -partitionVal - : identifier (EQ constant)? + : PARTITION '(' expression (',' expression)* ')' ; describeFuncName diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 8faf0eda548ea..9173ea3114942 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -22,7 +22,7 @@ import scala.collection.mutable.ArrayBuffer import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.{CatalystConf, ScalaReflection, SimpleCatalystConf} -import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, InMemoryCatalog, SessionCatalog} +import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTableType, InMemoryCatalog, SessionCatalog} import org.apache.spark.sql.catalyst.encoders.OuterScopes import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate._ @@ -661,6 +661,36 @@ class Analyzer( // rule: ResolveDeserializer. case plan if containsDeserializer(plan.expressions) => plan + // Some commands (AlterTableDropPartitionCommand) use expressions, so we need to resolve. 
+ case c: CommandWithExpression => + val catalogTable = catalog.getTableMetadata(c.getTableName) + + // To support general CommandWithExpression, this view check should not be done here. + if (catalogTable.tableType == CatalogTableType.VIEW) { + throw new AnalysisException( + "Cannot alter a view with ALTER TABLE. Please use ALTER VIEW instead") + } + + val table = try { + catalog.lookupRelation(catalogTable.identifier, None) + } catch { + case _: NoSuchTableException => + c.failAnalysis(s"Table or view not found: ${catalogTable.identifier.quotedString}") + } + + c transformExpressionsUp { + case UnresolvedAttribute(nameParts) => + table.resolve(nameParts, resolver) match { + case Some(a) if a.isInstanceOf[AttributeReference] => a + case _ => + throw new AnalysisException( + s"${nameParts.mkString(".")} is not a valid partition column" + + s" in table ${catalogTable.identifier.quotedString}.") + } + case v @ BinaryComparison(left @ AttributeReference(_, dataType, _, _), right: Literal) => + v.makeCopy(Array(left, Cast(right, dataType))) + } + case q: LogicalPlan => logTrace(s"Attempting to resolve ${q.simpleString}") q transformExpressionsUp { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index 7b8badcf8caba..deeed17a704a4 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -194,10 +194,15 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging { */ override def visitPartitionSpec( ctx: PartitionSpecContext): Map[String, Option[String]] = withOrigin(ctx) { - val parts = ctx.partitionVal.asScala.map { pVal => - val name = pVal.identifier.getText - val value = Option(pVal.constant).map(visitStringConstant) - name -> value + val parts = ctx.expression.asScala.map { pVal => + expression(pVal) match { 
+ case UnresolvedAttribute(name :: Nil) => + name -> None + case cmp @ EqualTo(UnresolvedAttribute(name :: Nil), constant: Literal) => + name -> Option(constant.toString) + case _ => + throw new ParseException("Invalid partition filter specification", ctx) + } } // Before calling `toMap`, we check duplicated keys to avoid silently ignore partition values // in partition spec like PARTITION(a='1', b='2', a='3'). The real semantical check for @@ -206,6 +211,23 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging { parts.toMap } + /** + * Create a partition filter specification. + */ + def visitPartitionFilterSpec(ctx: PartitionSpecContext): Expression = withOrigin(ctx) { + val parts = ctx.expression.asScala.map { pVal => + expression(pVal) match { + case EqualNullSafe(_, _) => + throw new ParseException("'<=>' operator is not allowed in partition specification.", ctx) + case cmp @ BinaryComparison(UnresolvedAttribute(name :: Nil), constant: Literal) => + cmp + case _ => + throw new ParseException("Invalid partition filter specification", ctx) + } + } + parts.reduceLeft(And) + } + /** * Create a partition specification map without optional values. 
*/ diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Command.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Command.scala index 38f47081b6f55..78813a50deb10 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Command.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Command.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.catalyst.plans.logical +import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.expressions.Attribute /** @@ -27,3 +28,13 @@ import org.apache.spark.sql.catalyst.expressions.Attribute trait Command extends LeafNode { override def output: Seq[Attribute] = Seq.empty } + +/** + * A logical node that represents a non-query command with a table to be executed by the system. + * For example, ADD/DROP PARTITION commands support expressions based on tables. Commands, unlike + * queries, are eagerly executed. 
+ */ +trait CommandWithExpression extends LeafNode { + def getTableName: TableIdentifier + override def output: Seq[Attribute] = Seq.empty +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index 4400174e92727..6bd2197057391 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -842,7 +842,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { } AlterTableDropPartitionCommand( visitTableIdentifier(ctx.tableIdentifier), - ctx.partitionSpec.asScala.map(visitNonOptionalPartitionSpec), + ctx.partitionSpec.asScala.map(visitPartitionFilterSpec), ifExists = ctx.EXISTS != null, purge = ctx.PURGE != null, retainData = false) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala index c62c14200c24a..ed4976e9c27f0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala @@ -31,7 +31,8 @@ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, Resolver} import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec -import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans.logical.CommandWithExpression import org.apache.spark.sql.execution.datasources.PartitioningUtils import org.apache.spark.sql.types._ import org.apache.spark.util.SerializableConfiguration @@ -419,29 +420,64 @@ case class AlterTableRenamePartitionCommand( */ case class AlterTableDropPartitionCommand( tableName: 
TableIdentifier, - specs: Seq[TablePartitionSpec], + specs: Seq[Expression], ifExists: Boolean, purge: Boolean, retainData: Boolean) - extends RunnableCommand { + extends RunnableCommand with CommandWithExpression with PredicateHelper { + + override def getTableName: TableIdentifier = tableName + + private def isRangeComparison(expr: Expression): Boolean = { + expr.find(e => e.isInstanceOf[BinaryComparison] && !e.isInstanceOf[EqualTo]).isDefined + } override def run(sparkSession: SparkSession): Seq[Row] = { val catalog = sparkSession.sessionState.catalog - val table = catalog.getTableMetadata(tableName) + val normalizedTableName = + TableIdentifier( + tableName.table, + Some(tableName.database.getOrElse(catalog.getCurrentDatabase))) + val table = catalog.getTableMetadata(normalizedTableName) + val resolver = sparkSession.sessionState.conf.resolver DDLUtils.verifyAlterTableType(catalog, table, isView = false) DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION") - val normalizedSpecs = specs.map { spec => - PartitioningUtils.normalizePartitionSpec( - spec, - table.partitionColumnNames, - table.identifier.quotedString, - sparkSession.sessionState.conf.resolver) + specs.foreach { expr => + expr.references.foreach { attr => + if (!table.partitionColumnNames.exists(resolver(_, attr.name))) { + throw new AnalysisException(s"${attr.name} is not a valid partition column " + + s"in table ${normalizedTableName.quotedString}.") + } + } } - catalog.dropPartitions( - table.identifier, normalizedSpecs, ignoreIfNotExists = ifExists, purge = purge, - retainData = retainData) + if (specs.exists(isRangeComparison)) { + val partitionSet = specs.flatMap { spec => + val partitions = catalog.listPartitionsByFilter(table.identifier, Seq(spec)).map(_.spec) + if (partitions.isEmpty && !ifExists) { + throw new AnalysisException(s"There is no partition for ${spec.sql}") + } + partitions + }.distinct + catalog.dropPartitions( + table.identifier, 
partitionSet, ignoreIfNotExists = ifExists, purge = purge, + retainData = retainData) + } else { + val normalizedSpecs = specs.map { expr => + val spec = splitConjunctivePredicates(expr).map { + case BinaryComparison(AttributeReference(name, _, _, _), right) => name -> right.toString + }.toMap + PartitioningUtils.normalizePartitionSpec( + spec, + table.partitionColumnNames, + table.identifier.quotedString, + resolver) + } + catalog.dropPartitions( + table.identifier, normalizedSpecs, ignoreIfNotExists = ifExists, purge = purge, + retainData = retainData) + } Seq.empty[Row] } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index 4468dc58e404a..42d257d1ed70d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -215,8 +215,14 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] { if (overwrite.enabled) { val deletedPartitions = initialMatchingPartitions.toSet -- updatedPartitions if (deletedPartitions.nonEmpty) { + import org.apache.spark.sql.catalyst.expressions._ + val expressions = deletedPartitions.map { specs => + specs.map { case (key, value) => + EqualTo(AttributeReference(key, StringType)(), Literal.create(value, StringType)) + }.reduceLeft(And) + }.toSeq AlterTableDropPartitionCommand( - l.catalogTable.get.identifier, deletedPartitions.toSeq, + l.catalogTable.get.identifier, expressions, ifExists = true, purge = false, retainData = true /* already deleted */).run(t.sparkSession) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala index 5ef5f8ee77418..5f6396d73d111 100644 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala @@ -20,7 +20,9 @@ package org.apache.spark.sql.execution.command import scala.reflect.{classTag, ClassTag} import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute import org.apache.spark.sql.catalyst.catalog._ +import org.apache.spark.sql.catalyst.expressions.{And, EqualTo, Literal} import org.apache.spark.sql.catalyst.parser.ParseException import org.apache.spark.sql.catalyst.plans.PlanTest import org.apache.spark.sql.catalyst.plans.logical.Project @@ -612,8 +614,12 @@ class DDLCommandSuite extends PlanTest { val expected1_table = AlterTableDropPartitionCommand( tableIdent, Seq( - Map("dt" -> "2008-08-08", "country" -> "us"), - Map("dt" -> "2009-09-09", "country" -> "uk")), + And( + EqualTo(UnresolvedAttribute("dt"), Literal.create("2008-08-08", StringType)), + EqualTo(UnresolvedAttribute("country"), Literal.create("us", StringType))), + And( + EqualTo(UnresolvedAttribute("dt"), Literal.create("2009-09-09", StringType)), + EqualTo(UnresolvedAttribute("country"), Literal.create("uk", StringType)))), ifExists = true, purge = false, retainData = false) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala index f313db641b152..a5ec1dbadb7c2 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -26,12 +26,14 @@ import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode} import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, TableAlreadyExistsException} import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable, CatalogTableType} import 
org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.parser.ParseException import org.apache.spark.sql.execution.command.DDLUtils import org.apache.spark.sql.hive.HiveExternalCatalog import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION import org.apache.spark.sql.test.SQLTestUtils +import org.apache.spark.sql.types._ class HiveDDLSuite extends QueryTest with SQLTestUtils with TestHiveSingleton with BeforeAndAfterEach { @@ -270,6 +272,132 @@ class HiveDDLSuite } } + test("SPARK-17732: Drop partitions by filter") { + withTable("sales") { + sql("CREATE TABLE sales(id INT) PARTITIONED BY (country STRING, quarter STRING)") + + for (country <- Seq("US", "CA", "KR")) { + for (quarter <- 1 to 4) { + sql(s"ALTER TABLE sales ADD PARTITION (country = '$country', quarter = '$quarter')") + } + } + + sql("ALTER TABLE sales DROP PARTITION (country < 'KR', quarter > '2')") + checkAnswer(sql("SHOW PARTITIONS sales"), + Row("country=CA/quarter=1") :: + Row("country=CA/quarter=2") :: + Row("country=KR/quarter=1") :: + Row("country=KR/quarter=2") :: + Row("country=KR/quarter=3") :: + Row("country=KR/quarter=4") :: + Row("country=US/quarter=1") :: + Row("country=US/quarter=2") :: + Row("country=US/quarter=3") :: + Row("country=US/quarter=4") :: Nil) + + sql("ALTER TABLE sales DROP PARTITION (country < 'KR'), PARTITION (quarter <= '1')") + checkAnswer(sql("SHOW PARTITIONS sales"), + Row("country=KR/quarter=2") :: + Row("country=KR/quarter=3") :: + Row("country=KR/quarter=4") :: + Row("country=US/quarter=2") :: + Row("country=US/quarter=3") :: + Row("country=US/quarter=4") :: Nil) + + sql("ALTER TABLE sales DROP PARTITION (country='KR', quarter='4')") + sql("ALTER TABLE sales DROP PARTITION (country='US', quarter='3')") + checkAnswer(sql("SHOW PARTITIONS sales"), + Row("country=KR/quarter=2") :: + Row("country=KR/quarter=3") :: + 
Row("country=US/quarter=2") :: + Row("country=US/quarter=4") :: Nil) + + sql("ALTER TABLE sales DROP PARTITION (quarter <= 2), PARTITION (quarter >= '4')") + checkAnswer(sql("SHOW PARTITIONS sales"), + Row("country=KR/quarter=3") :: Nil) + + // According to the declarative partition spec definitions, this drops the union of target + // partitions without exceptions. Hive raises exceptions because it handles them sequentially. + sql("ALTER TABLE sales DROP PARTITION (quarter <= 4), PARTITION (quarter <= '3')") + checkAnswer(sql("SHOW PARTITIONS sales"), Nil) + } + } + + def testAddDropPartition(dataType: DataType, value: Any): Unit = { + withTable("tbl_x") { + sql(s"CREATE TABLE tbl_x (a INT) PARTITIONED BY (p ${dataType.sql})") + sql(s"ALTER TABLE tbl_x ADD PARTITION (p = $value)") + sql(s"ALTER TABLE tbl_x DROP PARTITION (p = $value)") + checkAnswer(sql("SHOW PARTITIONS tbl_x"), Nil) + } + } + + test("SPARK-18515: AlterTableDropPartitions fails for non-string columns") { + testAddDropPartition(BooleanType, true) + testAddDropPartition(ByteType, 1.toByte) + testAddDropPartition(ShortType, 1.toShort) + testAddDropPartition(IntegerType, 1) + testAddDropPartition(LongType, 1L) + testAddDropPartition(FloatType, 1.0F) + testAddDropPartition(DoubleType, 1.0) + testAddDropPartition(DecimalType(2, 1), Decimal(1.5)) + // TODO: We need to change AlterTableAddPartitions + // testAddDropPartition(DateType, "'2016-11-22'") + // testAddDropPartition(TimestampType, "'2015-08-20 15:57:00'") + testAddDropPartition(StringType, "'abcd'") + } + + test("SPARK-17732: Error handling for drop partitions by filter") { + withTable("sales") { + sql("CREATE TABLE sales(id INT) PARTITIONED BY (country STRING, quarter STRING)") + + val m = intercept[AnalysisException] { + sql("ALTER TABLE sales DROP PARTITION (unknown = 'KR')") + }.getMessage + assert(m.contains("unknown is not a valid partition column in table")) + + val m2 = intercept[AnalysisException] { + sql("ALTER TABLE sales DROP 
PARTITION (unknown < 'KR')") + }.getMessage + assert(m2.contains("unknown is not a valid partition column in table")) + + val m3 = intercept[AnalysisException] { + sql("ALTER TABLE sales DROP PARTITION (unknown <=> 'KR')") + }.getMessage + assert(m3.contains("'<=>' operator is not allowed in partition specification")) + + val m4 = intercept[ParseException] { + sql("ALTER TABLE sales DROP PARTITION (unknown <=> upper('KR'))") + }.getMessage + assert(m4.contains("'<=>' operator is not allowed in partition specification")) + + val m5 = intercept[ParseException] { + sql("ALTER TABLE sales DROP PARTITION (country < 'KR', quarter)") + }.getMessage + assert(m5.contains("Invalid partition filter specification")) + + sql(s"ALTER TABLE sales ADD PARTITION (country = 'KR', quarter = '3')") + val m6 = intercept[AnalysisException] { + sql("ALTER TABLE sales DROP PARTITION (quarter <= '4'), PARTITION (quarter <= '2')") + }.getMessage + // Nothing is dropped because `PARTITION (quarter <= '2')` matches no partition. + checkAnswer(sql("SHOW PARTITIONS sales"), + Row("country=KR/quarter=3") :: Nil) + assert(m6.contains("There is no partition for (sales.`quarter` <= '2')")) + } + } + + test("SPARK-17732: Partition filter is not allowed in ADD PARTITION") { + withTable("sales") { + sql("CREATE TABLE sales(id INT) PARTITIONED BY (country STRING, quarter STRING)") + + val m = intercept[ParseException] { + sql("ALTER TABLE sales ADD PARTITION (country = 'US', quarter < '1')") + }.getMessage() + assert(m.contains("Invalid partition filter specification")) + } + } + test("drop views") { withTable("tab1") { val tabName = "tab1" @@ -393,7 +521,7 @@ class HiveDDLSuite test("alter views and alter table - misuse") { val tabName = "tab1" withTable(tabName) { - spark.range(10).write.saveAsTable(tabName) + spark.range(10).toDF("a").write.saveAsTable(tabName) val oldViewName = "view1" val newViewName = "view2" withView(oldViewName, newViewName) {