From c5edbdfc3a9b11fce8b427488f106158133e40a0 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Thu, 4 Aug 2016 15:12:37 -0700 Subject: [PATCH 1/5] support ddl: MSCK REPAIR TABLE --- .../spark/sql/catalyst/parser/SqlBase.g4 | 6 +- .../spark/sql/execution/SparkSqlParser.scala | 29 ++++++ .../spark/sql/execution/command/ddl.scala | 98 ++++++++++++++++++- .../spark/sql/execution/command/tables.scala | 42 +++++++- .../execution/command/DDLCommandSuite.scala | 8 ++ .../sql/execution/command/DDLSuite.scala | 39 ++++++++ .../execution/HiveCompatibilitySuite.scala | 2 +- .../spark/sql/hive/HiveDDLCommandSuite.scala | 9 +- 8 files changed, 224 insertions(+), 9 deletions(-) diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 index 5e1046293a206..8341f69e82f19 100644 --- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 +++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 @@ -84,6 +84,7 @@ statement | ALTER VIEW tableIdentifier DROP (IF EXISTS)? partitionSpec (',' partitionSpec)* #dropTablePartitions | ALTER TABLE tableIdentifier partitionSpec? SET locationSpec #setTableLocation + | ALTER TABLE tableIdentifier RECOVER PARTITIONS #recoverPartitions | DROP TABLE (IF EXISTS)? tableIdentifier PURGE? #dropTable | DROP VIEW (IF EXISTS)? tableIdentifier #dropTable | CREATE (OR REPLACE)? TEMPORARY? VIEW (IF NOT EXISTS)? tableIdentifier @@ -121,6 +122,7 @@ statement | LOAD DATA LOCAL? INPATH path=STRING OVERWRITE? INTO TABLE tableIdentifier partitionSpec? #loadData | TRUNCATE TABLE tableIdentifier partitionSpec? #truncateTable + | MSCK REPAIR TABLE tableIdentifier #repairTable | op=(ADD | LIST) identifier .*? #manageResource | SET ROLE .*? #failNativeCommand | SET .*? #setConfiguration @@ -154,7 +156,6 @@ unsupportedHiveNativeCommands | kw1=UNLOCK kw2=DATABASE | kw1=CREATE kw2=TEMPORARY kw3=MACRO | kw1=DROP kw2=TEMPORARY kw3=MACRO - | kw1=MSCK kw2=REPAIR kw3=TABLE | kw1=ALTER kw2=TABLE tableIdentifier kw3=NOT kw4=CLUSTERED | kw1=ALTER kw2=TABLE tableIdentifier kw3=CLUSTERED kw4=BY | kw1=ALTER kw2=TABLE tableIdentifier kw3=NOT kw4=SORTED @@ -652,7 +653,7 @@ nonReserved | CASCADE | RESTRICT | BUCKETS | CLUSTERED | SORTED | PURGE | INPUTFORMAT | OUTPUTFORMAT | DBPROPERTIES | DFS | TRUNCATE | COMPUTE | LIST | STATISTICS | ANALYZE | PARTITIONED | EXTERNAL | DEFINED | RECORDWRITER - | REVOKE | GRANT | LOCK | UNLOCK | MSCK | REPAIR | EXPORT | IMPORT | LOAD | VALUES | COMMENT | ROLE + | REVOKE | GRANT | LOCK | UNLOCK | MSCK | REPAIR | RECOVER | EXPORT | IMPORT | LOAD | VALUES | COMMENT | ROLE | ROLES | COMPACTIONS | PRINCIPALS | TRANSACTIONS | INDEX | INDEXES | LOCKS | OPTION | LOCAL | INPATH | ASC | DESC | LIMIT | RENAME | SETS | AT | NULLS | OVERWRITE | ALL | ALTER | AS | BETWEEN | BY | CREATE | DELETE @@ -865,6 +866,7 @@ LOCK: 'LOCK'; UNLOCK: 'UNLOCK'; MSCK: 'MSCK'; REPAIR: 'REPAIR'; +RECOVER: 'RECOVER'; EXPORT: 'EXPORT'; IMPORT: 'IMPORT'; LOAD: 'LOAD'; diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index 22b1e07219417..d9c32c9e54980 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -408,6 +408,18 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec)) } + /** + * Create a [[RepairTableCommand]] command. + * + * For example: + * {{{ + * MSCK REPAIR TABLE tablename + * }}} + */ + override def visitRepairTable(ctx: RepairTableContext): LogicalPlan = withOrigin(ctx) { + RepairTableCommand(visitTableIdentifier(ctx.tableIdentifier)) + } + /** * Convert a table property list into a key-value map. * This should be called through [[visitPropertyKeyValues]] or [[visitPropertyKeys]]. @@ -778,6 +790,23 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { ctx.PURGE != null) } + /** + * Create an [[AlterTableDiscoverPartitionsCommand]] command + * + * For example: + * {{{ + * ALTER TABLE table DROP [IF EXISTS] PARTITION spec1[, PARTITION spec2, ...] [PURGE]; + * ALTER VIEW view DROP [IF EXISTS] PARTITION spec1[, PARTITION spec2, ...]; + * }}} + * + * ALTER VIEW ... DROP PARTITION ... is not supported because the concept of partitioning + * is associated with physical tables + */ + override def visitRecoverPartitions( + ctx: RecoverPartitionsContext): LogicalPlan = withOrigin(ctx) { + AlterTableRecoverPartitionsCommand(visitTableIdentifier(ctx.tableIdentifier)) + } + /** * Create an [[AlterTableSetLocationCommand]] command * diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala index f0e49e65c459d..2ca8343414f0e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala @@ -17,13 +17,19 @@ package org.apache.spark.sql.execution.command +import java.io.File + +import scala.collection.GenSeq +import scala.collection.parallel.ForkJoinTaskSupport +import scala.concurrent.forkjoin.ForkJoinPool import scala.util.control.NonFatal +import org.apache.hadoop.fs.{FileStatus, FileSystem, Path} + import org.apache.spark.sql.{AnalysisException, Row, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogDatabase, CatalogTable} -import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, CatalogTableType, SessionCatalog} -import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec +import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogDatabase, CatalogTable, CatalogTablePartition, CatalogTableType, SessionCatalog} +import org.apache.spark.sql.catalyst.catalog.CatalogTypes._ import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils._ import org.apache.spark.sql.types._ @@ -425,6 +431,92 @@ case class AlterTableDropPartitionCommand( } +/** + * Discover Partitions in ALTER TABLE: discover all the partition in the directory of a table and + * update the catalog. + * + * The syntax of this command is: + * {{{ + * ALTER TABLE table DISCOVER PARTITIONS; + * }}} + */ +case class AlterTableRecoverPartitionsCommand( + tableName: TableIdentifier) extends RunnableCommand { + override def run(spark: SparkSession): Seq[Row] = { + val catalog = spark.sessionState.catalog + if (!catalog.tableExists(tableName)) { + throw new AnalysisException( + s"Table $tableName in ALTER TABLE RECOVER PARTITIONS does not exist.") + } + val table = catalog.getTableMetadata(tableName) + if (catalog.isTemporaryTable(tableName)) { + throw new AnalysisException( + s"Operation not allowed: ALTER TABLE RECOVER PARTITIONS on temporary tables: $tableName") + } + if (table.tableType != CatalogTableType.EXTERNAL) { + throw new AnalysisException( + s"Operation not allowed: ALTER TABLE RECOVER PARTITIONS only works on external " + + s"tables: $tableName") + } + if (table.partitionColumnNames.isEmpty) { + throw new AnalysisException( + s"Operation not allowed: ALTER TABLE RECOVER PARTITIONS only works on partitioned " + + s"tables: $tableName") + } + if (table.storage.locationUri.isEmpty) { + throw new AnalysisException( + s"Operation not allowed: ALTER TABLE RECOVER PARTITIONS only works on tables with " + + s"location provided: $tableName") + } + + recoverPartitions(spark, table) + Seq.empty[Row] + } + + def recoverPartitions(spark: SparkSession, table: CatalogTable): Unit = { + val root = new Path(table.storage.locationUri.get) + val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration) + val partitionSpecsAndLocs = scanPartitions(spark, fs, root, Map(), table.partitionSchema.size) + val parts = partitionSpecsAndLocs.map { case (spec, location) => + // inherit table storage format (possibly except for location) + CatalogTablePartition(spec, table.storage.copy(locationUri = Some(location.toUri.toString))) + } + spark.sessionState.catalog.createPartitions(tableName, + parts.toArray[CatalogTablePartition], ignoreIfExists = true) + } + + @transient private lazy val evalTaskSupport = new ForkJoinTaskSupport(new ForkJoinPool(8)) + + private def scanPartitions( + spark: SparkSession, + fs: FileSystem, + path: Path, + spec: TablePartitionSpec, + numPartitionsLeft: Int): GenSeq[(TablePartitionSpec, Path)] = { + if (numPartitionsLeft == 0) { + return Seq(spec -> path) + } + + val statuses = fs.listStatus(path) + val threshold = spark.conf.get("spark.rdd.parallelListingThreshold", "10").toInt + val statusPar: GenSeq[FileStatus] = + if (numPartitionsLeft > 1 && statuses.length > threshold || numPartitionsLeft > 2) { + val parArray = statuses.par + parArray.tasksupport = evalTaskSupport + parArray + } else { + statuses + } + statusPar.flatMap { st => + val ps = st.getPath.getName.split("=", 2) + if (ps.length != 2) { + throw new AnalysisException(s"Invalid partition path: ${st.getPath}") + } + scanPartitions(spark, fs, st.getPath, spec ++ Map(ps(0) -> ps(1)), numPartitionsLeft - 1) + } + } +} + /** * A command that sets the location of a table or a partition. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala index e6fe9a73a1f30..04a56d95e0b24 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala @@ -35,7 +35,7 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan, UnaryNode} import org.apache.spark.sql.catalyst.util.quoteIdentifier -import org.apache.spark.sql.execution.datasources.PartitioningUtils +import org.apache.spark.sql.execution.datasources.{PartitioningUtils} import org.apache.spark.sql.types._ import org.apache.spark.util.Utils @@ -388,6 +388,46 @@ case class TruncateTableCommand( } } +/** + * A command to repair a table by discovery all the partitions in the directory. + * + * The syntax of this command is: + * {{{ + * MSCK REPAIR TABLE table_name; + * }}} + * + * This command is the same as AlterTableRecoverPartitions + */ +case class RepairTableCommand(tableName: TableIdentifier) extends RunnableCommand { + override def run(spark: SparkSession): Seq[Row] = { + val catalog = spark.sessionState.catalog + val table = catalog.getTableMetadata(tableName) + if (!catalog.tableExists(tableName)) { + throw new AnalysisException(s"Table $tableName in MSCK REPAIR TABLE does not exist.") + } + if (catalog.isTemporaryTable(tableName)) { + throw new AnalysisException( + s"Operation not allowed: MSCK REPAIR TABLE on temporary tables: $tableName") + } + if (table.tableType != CatalogTableType.EXTERNAL) { + throw new AnalysisException( + s"Operation not allowed: MSCK REPAIR TABLE only works on external tables: $tableName") + } + if (table.partitionColumnNames.isEmpty) { + throw new AnalysisException( + s"Operation not allowed: MSCK REPAIR TABLE only works on partitioned tables: $tableName") + } + if (table.storage.locationUri.isEmpty) { + throw new AnalysisException( + s"Operation not allowed: MSCK REPAIR TABLE only works on tables with location provided: " + + s"$tableName") + } + + AlterTableRecoverPartitionsCommand(tableName).recoverPartitions(spark, table) + Seq.empty[Row] + } +} + /** * Command that looks like * {{{ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala index 999afc9751fe1..9f8502e8381f5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala @@ -563,6 +563,14 @@ class DDLCommandSuite extends PlanTest { comparePlans(parsed2, expected2) } + test("alter table: recover partitions") { + val sql = "ALTER TABLE table_name RECOVER PARTITIONS" + val parsed = parser.parsePlan(sql) + val expected = AlterTableRecoverPartitionsCommand( + TableIdentifier("table_name", None)) + comparePlans(parsed, expected) + } + test("alter view: add partition (not supported)") { assertUnsupported( """ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index 564fc73ee702e..0eea6247ce6a3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -827,6 +827,45 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { testAddPartitions(isDatasourceTable = true) } + test("alter table: recover partitions (sequential)") { + withSQLConf("spark.rdd.parallelListingThreshold" -> "1") { + testRecoverPartitions() + } + } + + test("after table: recover partition (parallel)") { + withSQLConf("spark.rdd.parallelListingThreshold" -> "10") { + testRecoverPartitions() + } + } + + private def testRecoverPartitions() { + val catalog = spark.sessionState.catalog + // table to alter does not exist + intercept[AnalysisException] { + sql("ALTER TABLE does_not_exist RECOVER PARTITIONS") + } + + val tableIdent = TableIdentifier("tab1") + createTable(catalog, tableIdent) + val part1 = Map("a" -> "1", "b" -> "5") + createTablePartition(catalog, part1, tableIdent) + assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1)) + + val part2 = Map("a" -> "2", "b" -> "6") + val root = new Path(catalog.getTableMetadata(tableIdent).storage.locationUri.get) + val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration) + fs.mkdirs(new Path(new Path(root, "a=1"), "b=5")) + fs.mkdirs(new Path(new Path(root, "a=2"), "b=6")) + try { + sql("ALTER TABLE tab1 RECOVER PARTITIONS") + assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == + Set(part1, part2)) + } finally { + fs.delete(root, true) + } + } + test("alter table: add partition is not supported for views") { assertUnsupported("ALTER VIEW dbx.tab1 ADD IF NOT EXISTS PARTITION (b='2')") } diff --git a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala index 13d18fdec0e9d..16519a6e2bdd5 100644 --- a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala +++ b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala @@ -188,7 +188,6 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter { "input42", "input_dfs", "metadata_export_drop", - "repair", // Uses a serde that isn't on the classpath... breaks other tests. "bucketizedhiveinputformat", @@ -937,6 +936,7 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter { "reduce_deduplicate_exclude_gby", "reduce_deduplicate_exclude_join", "reduce_deduplicate_extended", + "repair", "router_join_ppr", "select_as_omitted", "select_unquote_and", diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala index e0c07db3b0a9e..bddd7b53cc98b 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.hive import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType} import org.apache.spark.sql.catalyst.dsl.expressions._ @@ -499,8 +500,12 @@ class HiveDDLCommandSuite extends PlanTest { } } - test("MSCK repair table (not supported)") { - assertUnsupported("MSCK REPAIR TABLE tab1") + test("MSCK REPAIR table") { + val sql = "MSCK REPAIR TABLE tab1" + val parsed = parser.parsePlan(sql) + val expected = RepairTableCommand( + TableIdentifier("tab1", None)) + comparePlans(parsed, expected) } test("create table like") { From 89d22f4d2640a3f8bcd0f25a9eb84aaa9df73e48 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Thu, 4 Aug 2016 15:33:48 -0700 Subject: [PATCH 2/5] disallow on datasource table --- .../scala/org/apache/spark/sql/execution/command/ddl.scala | 6 +++++- .../org/apache/spark/sql/execution/command/tables.scala | 6 +++++- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala index 2ca8343414f0e..26e438a1306f7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala @@ -453,12 +453,16 @@ case class AlterTableRecoverPartitionsCommand( throw new AnalysisException( s"Operation not allowed: ALTER TABLE RECOVER PARTITIONS on temporary tables: $tableName") } + if (DDLUtils.isDatasourceTable(table)) { + throw new AnalysisException( + s"Operation not allowed: ALTER TABLE RECOVER PARTITIONS on datasource tables: $tableName") + } if (table.tableType != CatalogTableType.EXTERNAL) { throw new AnalysisException( s"Operation not allowed: ALTER TABLE RECOVER PARTITIONS only works on external " + s"tables: $tableName") } - if (table.partitionColumnNames.isEmpty) { + if (DDLUtils.isTablePartitioned(table)) { throw new AnalysisException( s"Operation not allowed: ALTER TABLE RECOVER PARTITIONS only works on partitioned " + s"tables: $tableName") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala index 04a56d95e0b24..f2af3e90086be 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala @@ -409,11 +409,15 @@ case class RepairTableCommand(tableName: TableIdentifier) extends RunnableComman throw new AnalysisException( s"Operation not allowed: MSCK REPAIR TABLE on temporary tables: $tableName") } + if (DDLUtils.isDatasourceTable(table)) { + throw new AnalysisException( + s"Operation not allowed: MSCK REPAIR TABLE on datasource tables: $tableName") + } if (table.tableType != CatalogTableType.EXTERNAL) { throw new AnalysisException( s"Operation not allowed: MSCK REPAIR TABLE only works on external tables: $tableName") } - if (table.partitionColumnNames.isEmpty) { + if (DDLUtils.isTablePartitioned(table)) { throw new AnalysisException( s"Operation not allowed: MSCK REPAIR TABLE only works on partitioned tables: $tableName") } From f3385166ff57143ef8b3cfac82aba7a8e958d6eb Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Thu, 4 Aug 2016 16:05:26 -0700 Subject: [PATCH 3/5] do not use repair.q --- .../spark/sql/hive/execution/HiveCompatibilitySuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala index 16519a6e2bdd5..13d18fdec0e9d 100644 --- a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala +++ b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala @@ -188,6 +188,7 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter { "input42", "input_dfs", "metadata_export_drop", + "repair", // Uses a serde that isn't on the classpath... breaks other tests. "bucketizedhiveinputformat", @@ -936,7 +937,6 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter { "reduce_deduplicate_exclude_gby", "reduce_deduplicate_exclude_join", "reduce_deduplicate_extended", - "repair", "router_join_ppr", "select_as_omitted", "select_unquote_and", From 7f4f38d0051c0ad25bcd8406a18f6ef0b759b0ef Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Fri, 5 Aug 2016 12:13:32 -0700 Subject: [PATCH 4/5] address comments --- .../spark/sql/catalyst/parser/SqlBase.g4 | 2 +- .../spark/sql/execution/SparkSqlParser.scala | 12 ++- .../spark/sql/execution/command/ddl.scala | 75 +++++++++++-------- .../spark/sql/execution/command/tables.scala | 44 ----------- .../sql/execution/command/DDLSuite.scala | 12 ++- .../spark/sql/hive/HiveDDLCommandSuite.scala | 5 +- 6 files changed, 64 insertions(+), 86 deletions(-) diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 index 8341f69e82f19..3549b0047fae7 100644 --- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 +++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 @@ -122,7 +122,7 @@ statement | LOAD DATA LOCAL? INPATH path=STRING OVERWRITE? INTO TABLE tableIdentifier partitionSpec? #loadData | TRUNCATE TABLE tableIdentifier partitionSpec? #truncateTable - | MSCK REPAIR TABLE tableIdentifier #repairTable + | MSCK REPAIR TABLE tableIdentifier #repairTable | op=(ADD | LIST) identifier .*? #manageResource | SET ROLE .*? #failNativeCommand | SET .*? #setConfiguration diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index d9c32c9e54980..bf2e5a3badf8e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -409,7 +409,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { } /** - * Create a [[RepairTableCommand]] command. + * Create a [[AlterTableRecoverPartitionsCommand]] command. * * For example: * {{{ @@ -417,7 +417,9 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { * }}} */ override def visitRepairTable(ctx: RepairTableContext): LogicalPlan = withOrigin(ctx) { - RepairTableCommand(visitTableIdentifier(ctx.tableIdentifier)) + AlterTableRecoverPartitionsCommand( + visitTableIdentifier(ctx.tableIdentifier), + "MSCK REPAIR TABLE") } /** @@ -795,12 +797,8 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { * * For example: * {{{ - * ALTER TABLE table DROP [IF EXISTS] PARTITION spec1[, PARTITION spec2, ...] [PURGE]; - * ALTER VIEW view DROP [IF EXISTS] PARTITION spec1[, PARTITION spec2, ...]; + * ALTER TABLE table RECOVER PARTITIONS; * }}} - * - * ALTER VIEW ... DROP PARTITION ... is not supported because the concept of partitioning - * is associated with physical tables */ override def visitRecoverPartitions( ctx: RecoverPartitionsContext): LogicalPlan = withOrigin(ctx) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala index 26e438a1306f7..73c63a06828ae 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala @@ -17,14 +17,13 @@ package org.apache.spark.sql.execution.command -import java.io.File - import scala.collection.GenSeq import scala.collection.parallel.ForkJoinTaskSupport import scala.concurrent.forkjoin.ForkJoinPool import scala.util.control.NonFatal -import org.apache.hadoop.fs.{FileStatus, FileSystem, Path} +import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter} +import org.apache.hadoop.mapred.{FileInputFormat, JobConf} import org.apache.spark.sql.{AnalysisException, Row, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier @@ -32,9 +31,9 @@ import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogDatabase, Catal import org.apache.spark.sql.catalyst.catalog.CatalogTypes._ import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils._ +import org.apache.spark.sql.execution.datasources.PartitioningUtils import org.apache.spark.sql.types._ - // Note: The definition of these commands are based on the ones described in // https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL @@ -432,61 +431,60 @@ case class AlterTableDropPartitionCommand( } /** - * Discover Partitions in ALTER TABLE: discover all the partition in the directory of a table and + * Recover Partitions in ALTER TABLE: recover all the partition in the directory of a table and * update the catalog. * * The syntax of this command is: * {{{ - * ALTER TABLE table DISCOVER PARTITIONS; + * ALTER TABLE table RECOVER PARTITIONS; + * MSCK REPAIR TABLE table; * }}} */ case class AlterTableRecoverPartitionsCommand( - tableName: TableIdentifier) extends RunnableCommand { + tableName: TableIdentifier, + cmd: String = "ALTER TABLE RECOVER PARTITIONS") extends RunnableCommand { override def run(spark: SparkSession): Seq[Row] = { val catalog = spark.sessionState.catalog if (!catalog.tableExists(tableName)) { - throw new AnalysisException( - s"Table $tableName in ALTER TABLE RECOVER PARTITIONS does not exist.") + throw new AnalysisException(s"Table $tableName in $cmd does not exist.") } val table = catalog.getTableMetadata(tableName) if (catalog.isTemporaryTable(tableName)) { throw new AnalysisException( - s"Operation not allowed: ALTER TABLE RECOVER PARTITIONS on temporary tables: $tableName") + s"Operation not allowed: $cmd on temporary tables: $tableName") } if (DDLUtils.isDatasourceTable(table)) { throw new AnalysisException( - s"Operation not allowed: ALTER TABLE RECOVER PARTITIONS on datasource tables: $tableName") + s"Operation not allowed: $cmd on datasource tables: $tableName") } if (table.tableType != CatalogTableType.EXTERNAL) { throw new AnalysisException( - s"Operation not allowed: ALTER TABLE RECOVER PARTITIONS only works on external " + - s"tables: $tableName") + s"Operation not allowed: $cmd only works on external tables: $tableName") } - if (DDLUtils.isTablePartitioned(table)) { + if (!DDLUtils.isTablePartitioned(table)) { throw new AnalysisException( - s"Operation not allowed: ALTER TABLE RECOVER PARTITIONS only works on partitioned " + - s"tables: $tableName") + s"Operation not allowed: $cmd only works on partitioned tables: $tableName") } if (table.storage.locationUri.isEmpty) { throw new AnalysisException( - s"Operation not allowed: ALTER TABLE RECOVER PARTITIONS only works on tables with " + - s"location provided: $tableName") + s"Operation not allowed: $cmd only works on table with location provided: $tableName") } - recoverPartitions(spark, table) - Seq.empty[Row] - } - - def recoverPartitions(spark: SparkSession, table: CatalogTable): Unit = { val root = new Path(table.storage.locationUri.get) val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration) - val partitionSpecsAndLocs = scanPartitions(spark, fs, root, Map(), table.partitionSchema.size) + // Dummy jobconf to get to the pathFilter defined in configuration + // It's very expensive to create a JobConf(ClassUtil.findContainingJar() is slow) + val jobConf = new JobConf(spark.sparkContext.hadoopConfiguration, this.getClass) + val pathFilter = FileInputFormat.getInputPathFilter(jobConf) + val partitionSpecsAndLocs = scanPartitions( + spark, fs, pathFilter, root, Map(), table.partitionColumnNames.map(_.toLowerCase)) val parts = partitionSpecsAndLocs.map { case (spec, location) => // inherit table storage format (possibly except for location) CatalogTablePartition(spec, table.storage.copy(locationUri = Some(location.toUri.toString))) } spark.sessionState.catalog.createPartitions(tableName, parts.toArray[CatalogTablePartition], ignoreIfExists = true) + Seq.empty[Row] } @transient private lazy val evalTaskSupport = new ForkJoinTaskSupport(new ForkJoinPool(8)) @@ -494,17 +492,18 @@ case class AlterTableRecoverPartitionsCommand( private def scanPartitions( spark: SparkSession, fs: FileSystem, + filter: PathFilter, path: Path, spec: TablePartitionSpec, - numPartitionsLeft: Int): GenSeq[(TablePartitionSpec, Path)] = { - if (numPartitionsLeft == 0) { + partitionNames: Seq[String]): GenSeq[(TablePartitionSpec, Path)] = { + if (partitionNames.length == 0) { return Seq(spec -> path) } val statuses = fs.listStatus(path) val threshold = spark.conf.get("spark.rdd.parallelListingThreshold", "10").toInt val statusPar: GenSeq[FileStatus] = - if (numPartitionsLeft > 1 && statuses.length > threshold || numPartitionsLeft > 2) { + if (partitionNames.length > 1 && statuses.length > threshold || partitionNames.length > 2) { val parArray = statuses.par parArray.tasksupport = evalTaskSupport parArray @@ -512,11 +511,25 @@ case class AlterTableRecoverPartitionsCommand( statuses } statusPar.flatMap { st => - val ps = st.getPath.getName.split("=", 2) - if (ps.length != 2) { - throw new AnalysisException(s"Invalid partition path: ${st.getPath}") + val name = st.getPath.getName + if (st.isDirectory && name.contains("=")) { + val ps = name.split("=", 2) + val columnName = PartitioningUtils.unescapePathName(ps(0)).toLowerCase + val value = PartitioningUtils.unescapePathName(ps(1)) + // comparing with case-insensitive, but preserve the case + if (columnName == partitionNames(0)) { + scanPartitions( + spark, fs, filter, st.getPath, spec ++ Map(columnName -> value), partitionNames.drop(1)) + } else { + logWarning(s"expect partition column ${partitionNames(0)}, but got ${ps(0)}, ignore it") + Seq() + } + } else { + if (name != "_SUCCESS" && name != "_temporary" && !name.startsWith(".")) { + logWarning(s"ignore ${new Path(path, name)}") + } + Seq() } - scanPartitions(spark, fs, st.getPath, spec ++ Map(ps(0) -> ps(1)), numPartitionsLeft - 1) } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala index f2af3e90086be..3b1052619b63f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala @@ -388,50 +388,6 @@ case class TruncateTableCommand( } } -/** - * A command to repair a table by discovery all the partitions in the directory. - * - * The syntax of this command is: - * {{{ - * MSCK REPAIR TABLE table_name; - * }}} - * - * This command is the same as AlterTableRecoverPartitions - */ -case class RepairTableCommand(tableName: TableIdentifier) extends RunnableCommand { - override def run(spark: SparkSession): Seq[Row] = { - val catalog = spark.sessionState.catalog - val table = catalog.getTableMetadata(tableName) - if (!catalog.tableExists(tableName)) { - throw new AnalysisException(s"Table $tableName in MSCK REPAIR TABLE does not exist.") - } - if (catalog.isTemporaryTable(tableName)) { - throw new AnalysisException( - s"Operation not allowed: MSCK REPAIR TABLE on temporary tables: $tableName") - } - if (DDLUtils.isDatasourceTable(table)) { - throw new AnalysisException( - s"Operation not allowed: MSCK REPAIR TABLE on datasource tables: $tableName") - } - if (table.tableType != CatalogTableType.EXTERNAL) { - throw new AnalysisException( - s"Operation not allowed: MSCK REPAIR TABLE only works on external tables: $tableName") - } - if (DDLUtils.isTablePartitioned(table)) { - throw new AnalysisException( - s"Operation not allowed: MSCK REPAIR TABLE only works on partitioned tables: $tableName") - } - if (table.storage.locationUri.isEmpty) { - throw new AnalysisException( - s"Operation not allowed: MSCK REPAIR TABLE only works on tables with location provided: " + - s"$tableName") - } - - AlterTableRecoverPartitionsCommand(tableName).recoverPartitions(spark, table) - Seq.empty[Row] - } -} - /** * Command that looks like * {{{ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index 0eea6247ce6a3..7ad0325334a90 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -855,8 +855,18 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { val part2 = Map("a" -> "2", "b" -> "6") val root = new Path(catalog.getTableMetadata(tableIdent).storage.locationUri.get) val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration) + // valid fs.mkdirs(new Path(new Path(root, "a=1"), "b=5")) - fs.mkdirs(new Path(new Path(root, "a=2"), "b=6")) + fs.mkdirs(new Path(new Path(root, "A=2"), "B=6")) + // invalid + fs.mkdirs(new Path(new Path(root, "a"), "b")) // bad name + fs.mkdirs(new Path(new Path(root, "b=1"), "a=1")) // wrong order + fs.mkdirs(new Path(root, "a=4")) // not enough columns + fs.createNewFile(new Path(new Path(root, "a=1"), "b=4")) // file + fs.createNewFile(new Path(new Path(root, "a=1"), "_SUCCESS")) // _SUCCESS + fs.mkdirs(new Path(new Path(root, "a=1"), "_temporary")) // _temporary + fs.mkdirs(new Path(new Path(root, "a=1"), ".b=4")) // start with . + try { sql("ALTER TABLE tab1 RECOVER PARTITIONS") assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala index bddd7b53cc98b..3f1c380e3b48e 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala @@ -503,8 +503,9 @@ class HiveDDLCommandSuite extends PlanTest { test("MSCK REPAIR table") { val sql = "MSCK REPAIR TABLE tab1" val parsed = parser.parsePlan(sql) - val expected = RepairTableCommand( - TableIdentifier("tab1", None)) + val expected = AlterTableRecoverPartitionsCommand( + TableIdentifier("tab1", None), + "MSCK REPAIR TABLE") comparePlans(parsed, expected) } From e5906cf408f225b5023ed6d250f850fcd447168a Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Mon, 8 Aug 2016 13:20:50 -0700 Subject: [PATCH 5/5] adress comments --- .../main/scala/org/apache/spark/sql/execution/command/ddl.scala | 1 + .../scala/org/apache/spark/sql/execution/command/DDLSuite.scala | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala index 73c63a06828ae..8fa7615b97b18 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala @@ -515,6 +515,7 @@ case class AlterTableRecoverPartitionsCommand( if (st.isDirectory && name.contains("=")) { val ps = name.split("=", 2) val columnName = PartitioningUtils.unescapePathName(ps(0)).toLowerCase + // TODO: Validate the value val value = PartitioningUtils.unescapePathName(ps(1)) // comparing with case-insensitive, but preserve the case if (columnName == partitionNames(0)) { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index 040cef71e619b..53376c56f1858 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -870,7 +870,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { } } - test("after table: recover partition (parallel)") { + test("alter table: recover partition (parallel)") { withSQLConf("spark.rdd.parallelListingThreshold" -> "10") { testRecoverPartitions() }