From a0060abc5b413de3eaef3b91aecfc1ed29a2f008 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Tue, 1 Nov 2016 14:03:35 +0000 Subject: [PATCH 1/2] Address dynamic partition. --- .../datasources/PartitioningUtils.scala | 2 +- .../hive/execution/InsertIntoHiveTable.scala | 64 ++++++++++++++++--- .../sql/hive/execution/SQLQuerySuite.scala | 36 +++++++++++ 3 files changed, 92 insertions(+), 10 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala index b51b41869bf06..73ca1376bbe6b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala @@ -88,7 +88,7 @@ object PartitioningUtils { * path = "hdfs://:/path/to/partition/a=2/b=world/c=6.28"))) * }}} */ - private[datasources] def parsePartitions( + private[spark] def parsePartitions( paths: Seq[Path], defaultPartitionName: String, typeInference: Boolean, diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index 2843100fb3b36..52aef0a9eed70 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -26,7 +26,7 @@ import scala.collection.JavaConverters._ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} -import org.apache.hadoop.hive.common.FileUtils +import org.apache.hadoop.hive.common.{FileUtils, HiveStatsUtils} import org.apache.hadoop.hive.ql.exec.TaskRunner import org.apache.hadoop.hive.ql.ErrorMsg import org.apache.hadoop.mapred.{FileOutputFormat, JobConf} @@ -38,6 +38,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.physical.Partitioning import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode} import org.apache.spark.sql.execution.command.{AlterTableAddPartitionCommand, AlterTableDropPartitionCommand} +import org.apache.spark.sql.execution.datasources.PartitioningUtils import org.apache.spark.sql.hive._ import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc} import org.apache.spark.SparkException @@ -239,12 +240,47 @@ case class InsertIntoHiveTable( val holdDDLTime = false if (partition.nonEmpty) { if (numDynamicPartitions > 0) { + // SPARK-18107: Insert overwrite runs much slower than hive-client. + // Newer Hive largely improves insert overwrite performance. As Spark uses older Hive + // version and we may not want to catch up new Hive version every time. We delete the + // Hive partition first and then load data file into the Hive partition. + var doHiveOverwrite = overwrite + if (overwrite) { + val fs = outputPath.getFileSystem(hadoopConf) + val partitionPaths = + HiveStatsUtils.getFileStatusRecurse(outputPath, numDynamicPartitions, fs) + .map(_.getPath()) + val partitionSpecInOutputPath = + PartitioningUtils.parsePartitions( + partitionPaths, + PartitioningUtils.DEFAULT_PARTITION_NAME, + true, + Set(outputPath)) + + val schema = partitionSpecInOutputPath.partitionColumns + val columnNames = schema.fieldNames + partitionSpecInOutputPath.partitions.flatMap { partition => + val spec = columnNames.zip(partition.values.toSeq(schema).map(_.toString)).toMap + externalCatalog.getPartitionOption( + table.catalogTable.database, + table.catalogTable.identifier.table, + partitionSpec ++ spec) + }.foreach { part => + part.storage.locationUri.map { uri => + if (removePartitionPath(new Path(uri), hadoopConf)) { + // Don't let Hive do overwrite operation since it is slower. + doHiveOverwrite = false + } + } + } + } + externalCatalog.loadDynamicPartitions( db = table.catalogTable.database, table = table.catalogTable.identifier.table, outputPath.toString, partitionSpec, - overwrite, + doHiveOverwrite, numDynamicPartitions, holdDDLTime = holdDDLTime) } else { @@ -267,13 +303,7 @@ case class InsertIntoHiveTable( // Hive partition first and then load data file into the Hive partition. if (oldPart.nonEmpty && overwrite) { oldPart.get.storage.locationUri.map { uri => - val partitionPath = new Path(uri) - val fs = partitionPath.getFileSystem(hadoopConf) - if (fs.exists(partitionPath)) { - if (!fs.delete(partitionPath, true)) { - throw new RuntimeException( - "Cannot remove partition directory '" + partitionPath.toString) - } + if (removePartitionPath(new Path(uri), hadoopConf)) { // Don't let Hive do overwrite operation since it is slower. doHiveOverwrite = false } @@ -313,6 +343,22 @@ case class InsertIntoHiveTable( Seq.empty[InternalRow] } + // Deletes a partition path. Returns true if the path exists and is successfully deleted. + // Returns false if the path doesn't exist. Throws RuntimeException if error happens when + // deleting the path. + private def removePartitionPath(partitionPath: Path, hadoopConf: Configuration): Boolean = { + val fs = partitionPath.getFileSystem(hadoopConf) + if (fs.exists(partitionPath)) { + if (!fs.delete(partitionPath, true)) { + throw new RuntimeException( + "Cannot remove partition directory '" + partitionPath.toString) + } + true + } else { + false + } + } + override def outputPartitioning: Partitioning = child.outputPartitioning override def executeCollect(): Array[InternalRow] = sideEffectResult.toArray diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index 8b916932ff543..a83785f4934e0 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -2006,6 +2006,42 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { } } + test("Insert overwrite with partition: dynamic partition") { + withSQLConf(("hive.exec.dynamic.partition.mode", "nonstrict")) { + withTable("tableWithPartition") { + sql( + """ + |CREATE TABLE tableWithPartition (key int, value STRING) + |PARTITIONED BY (part1 STRING, part2 INT, part3 LONG) + """.stripMargin) + sql( + """ + |INSERT OVERWRITE TABLE tableWithPartition PARTITION (part1 = '1', part2 = 2, part3) + |SELECT key, value, 3 AS part3 FROM default.src + """.stripMargin) + checkAnswer( + sql("SELECT part1, part2, part3, key, value FROM tableWithPartition"), + sql("SELECT '1' AS part1, 2 AS part2, 3 AS part3, key, value FROM default.src") + ) + + sql( + """ + |INSERT OVERWRITE TABLE tableWithPartition PARTITION (part1 = '1', part2 = 2, part3) + |SELECT key, value, part3 FROM + |VALUES (1, "one", 3), (2, "two", 3), (3, null, 3) AS data(key, value, part3) + """.stripMargin) + checkAnswer( + sql("SELECT part1, part2, part3, key, value FROM tableWithPartition"), + sql( + """ + |SELECT '1' AS part1, 2 AS part2, 3 AS part3, key, value FROM VALUES + |(1, "one"), (2, "two"), (3, null) AS data(key, value) + """.stripMargin) + ) + } + } + } + def testCommandAvailable(command: String): Boolean = { val attempt = Try(Process(command).run(ProcessLogger(_ => ())).exitValue()) attempt.isSuccess && attempt.get == 0 From eae8f1ad1d8240c236a73066610747f3e7ef3669 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Wed, 2 Nov 2016 02:26:23 +0000 Subject: [PATCH 2/2] Add comments. --- .../spark/sql/hive/execution/InsertIntoHiveTable.scala | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index 52aef0a9eed70..cf5bf2b5e42d7 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -247,6 +247,9 @@ case class InsertIntoHiveTable( var doHiveOverwrite = overwrite if (overwrite) { val fs = outputPath.getFileSystem(hadoopConf) + // Extracts the partition paths from output path. + // E.g., if the dynamic partition columns are "a" and "b", we would get the paths like + // "/output/a=1/b=2", "/output/a=2/b=3". val partitionPaths = HiveStatsUtils.getFileStatusRecurse(outputPath, numDynamicPartitions, fs) .map(_.getPath()) @@ -259,8 +262,11 @@ case class InsertIntoHiveTable( val schema = partitionSpecInOutputPath.partitionColumns val columnNames = schema.fieldNames + partitionSpecInOutputPath.partitions.flatMap { partition => + // Construct partition spec from parsed dynamic partition column names and values. val spec = columnNames.zip(partition.values.toSeq(schema).map(_.toString)).toMap + // Using static partition spec and dynamic partition spec to get partition metadata. externalCatalog.getPartitionOption( table.catalogTable.database, table.catalogTable.identifier.table,