From 898175966712b19bb85dcdcd130d6788f736ed64 Mon Sep 17 00:00:00 2001 From: fuwhu Date: Mon, 9 Dec 2019 16:04:57 +0800 Subject: [PATCH 01/15] Add optimizer rule PruneHiveTablePartitions pruning hive table partitions based on filters on partition columns. Doing so, the total size of pruned partitions may be small enough for broadcast join in JoinSelection strategy. --- .../sql/catalyst/catalog/interface.scala | 4 +- .../apache/spark/sql/internal/SQLConf.scala | 12 +++ .../benchmark/TPCDSQueryBenchmark.scala | 2 +- .../sql/hive/HiveSessionStateBuilder.scala | 17 +++- .../spark/sql/hive/HiveStrategies.scala | 99 ++++++++++++++++++- .../hive/execution/HiveTableScanExec.scala | 8 +- .../spark/sql/hive/StatisticsSuite.scala | 31 ++++++ .../hive/execution/HiveTableScanSuite.scala | 14 ++- 8 files changed, 174 insertions(+), 13 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala index d51690367bf35..81561c53f83c1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala @@ -651,7 +651,9 @@ case class HiveTableRelation( tableMeta: CatalogTable, dataCols: Seq[AttributeReference], partitionCols: Seq[AttributeReference], - tableStats: Option[Statistics] = None) extends LeafNode with MultiInstanceRelation { + tableStats: Option[Statistics] = None, + @transient prunedPartitions: Option[Seq[CatalogTablePartition]] = None) + extends LeafNode with MultiInstanceRelation { assert(tableMeta.identifier.database.isDefined) assert(tableMeta.partitionSchema.sameType(partitionCols.toStructType)) assert(tableMeta.dataSchema.sameType(dataCols.toStructType)) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 
279c79ff14080..416fb73b06681 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1398,6 +1398,16 @@ object SQLConf { .booleanConf .createWithDefault(false) + val FALL_BACK_TO_HDFS_FOR_STATS_MAX_PART_NUM = + buildConf("spark.sql.statistics.fallBackToHdfs.maxPartitionNum") + .doc("If the number of table partitions exceed this value, falling back to hdfs " + + "for statistics calculation is not allowed. This is used to avoid calculating " + + "the size of a large number of partitions through hdfs, which is very time consuming." + + "Setting this value to 0 or negative will disable falling back to hdfs for " + + "partition statistic calculation.") + .intConf + .createWithDefault(100) + val NDV_MAX_ERROR = buildConf("spark.sql.statistics.ndv.maxError") .internal() @@ -2569,6 +2579,8 @@ class SQLConf extends Serializable with Logging { def fallBackToHdfsForStatsEnabled: Boolean = getConf(ENABLE_FALL_BACK_TO_HDFS_FOR_STATS) + def fallBackToHdfsForStatsMaxPartitionNum: Int = getConf(FALL_BACK_TO_HDFS_FOR_STATS_MAX_PART_NUM) + def defaultSizeInBytes: Long = getConf(DEFAULT_SIZE_IN_BYTES) def ndvMaxError: Double = getConf(NDV_MAX_ERROR) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala index be02447db2e55..c93d27f02c686 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala @@ -84,7 +84,7 @@ object TPCDSQueryBenchmark extends SqlBasedBenchmark { queryRelations.add(alias.identifier) case LogicalRelation(_, _, Some(catalogTable), _) => queryRelations.add(catalogTable.identifier.table) - case HiveTableRelation(tableMeta, _, _, _) => + case HiveTableRelation(tableMeta, _, _, _, 
_) => queryRelations.add(tableMeta.identifier.table) case _ => } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala index de21a13e6edb8..4fa3b2f96b8d9 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala @@ -21,9 +21,10 @@ import org.apache.spark.annotation.Unstable import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.analysis.{Analyzer, ResolveSessionCatalog} import org.apache.spark.sql.catalyst.catalog.ExternalCatalogWithListener +import org.apache.spark.sql.catalyst.optimizer.Optimizer import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.execution.SparkPlanner +import org.apache.spark.sql.execution.{SparkOptimizer, SparkPlanner} import org.apache.spark.sql.execution.analysis.DetectAmbiguousSelfJoin import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.datasources.v2.TableCapabilityCheck @@ -93,6 +94,20 @@ class HiveSessionStateBuilder(session: SparkSession, parentState: Option[Session customCheckRules } + /** + * Logical query plan optimizer that takes into account Hive. + */ + override protected def optimizer: Optimizer = { + new SparkOptimizer(catalogManager, catalog, experimentalMethods) { + override def postHocOptimizationBatches: Seq[Batch] = Seq( + Batch("Prune Hive Table Partitions", Once, PruneHiveTablePartitions(session)) + ) + + override def extendedOperatorOptimizationRules: Seq[Rule[LogicalPlan]] = + super.extendedOperatorOptimizationRules ++ customOperatorOptimizationRules + } + } + /** * Planner that takes into account Hive-specific strategies. 
*/ diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index b9c98f4ea15e9..a818681bad1ce 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -21,15 +21,16 @@ import java.io.IOException import java.util.Locale import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.hadoop.hive.common.StatsSetupConst import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.planning._ -import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir, InsertIntoStatement, LogicalPlan, ScriptTransformation, Statistics} +import org.apache.spark.sql.catalyst.plans.logical.{Filter, InsertIntoDir, InsertIntoStatement, LogicalPlan, Project, ScriptTransformation, Statistics} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution._ -import org.apache.spark.sql.execution.command.{CreateTableCommand, DDLUtils} +import org.apache.spark.sql.execution.command.{CommandUtils, CreateTableCommand, DDLUtils} import org.apache.spark.sql.execution.datasources.CreateTable import org.apache.spark.sql.hive.execution._ import org.apache.spark.sql.internal.{HiveSerDe, SQLConf} @@ -150,6 +151,100 @@ class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] { } } +/** + * TODO: merge this with PruneFileSourcePartitions after we completely make hive as a data source. + */ +case class PruneHiveTablePartitions(session: SparkSession) + extends Rule[LogicalPlan] with PredicateHelper { + /** + * Extract the partition filters from the filters on the table. 
+ */ + private def extractPartitionPruningFilters(filters: Seq[Expression], + relation: HiveTableRelation): Seq[Expression] = { + val normalizedFilters = filters.map { e => + e transform { + case a: AttributeReference => + a.withName(relation.output.find(_.semanticEquals(a)).get.name) + } + } + val partitionSet = AttributeSet(relation.partitionCols) + normalizedFilters.filter { predicate => + !predicate.references.isEmpty && predicate.references.subsetOf(partitionSet) + } + } + + /** + * Prune the hive table using filters on the partitions of the table, + * and also update the statistics of the table. + */ + private def prunedHiveTableWithStats(relation: HiveTableRelation, + partitionFilters: Seq[Expression]): HiveTableRelation = { + val conf = session.sessionState.conf + val prunedPartitions = session.sharedState.externalCatalog.listPartitionsByFilter( + relation.tableMeta.database, + relation.tableMeta.identifier.table, + partitionFilters, + conf.sessionLocalTimeZone) + val sizeInBytes = try { + val partitionsWithSize = prunedPartitions.map { part => + val rawDataSize = part.parameters.get(StatsSetupConst.RAW_DATA_SIZE).map(_.toLong) + val totalSize = part.parameters.get(StatsSetupConst.TOTAL_SIZE).map(_.toLong) + if (rawDataSize.isDefined && rawDataSize.get > 0) { + (part, rawDataSize.get) + } else if (totalSize.isDefined && totalSize.get > 0L) { + (part, totalSize.get) + } else { + (part, 0L) + } + } + val sizeOfPartitions = + if (partitionsWithSize.count(_._2==0) <= conf.fallBackToHdfsForStatsMaxPartitionNum) { + partitionsWithSize.map{ pair => + val (part, size) = (pair._1, pair._2) + if (size == 0) { + CommandUtils.calculateLocationSize( + session.sessionState, relation.tableMeta.identifier, part.storage.locationUri) + } else { + size + } + }.sum + } else { + partitionsWithSize.filter(_._2>0).map(_._2).sum + } + // If size of partitions is zero fall back to the default size. 
+ if (sizeOfPartitions == 0L) conf.defaultSizeInBytes else sizeOfPartitions + } catch { + case e: IOException => + logWarning("Failed to get table size from HDFS.", e) + conf.defaultSizeInBytes + } + val withStats = + if (relation.tableMeta.stats.isDefined) { + relation.tableMeta.copy( + stats = Some(relation.tableMeta.stats.get.copy(sizeInBytes = BigInt(sizeInBytes)))) + } else { + relation.tableMeta.copy(stats = Some(CatalogStatistics(sizeInBytes = BigInt(sizeInBytes)))) + } + relation.copy(tableMeta = withStats, prunedPartitions = Some(prunedPartitions)) + } + + override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { + case op @ PhysicalOperation(projections, filters, relation: HiveTableRelation) + if filters.nonEmpty && relation.isPartitioned && relation.prunedPartitions.isEmpty => + val partitionPruningFilters = extractPartitionPruningFilters(filters, relation) + // SPARK-24085: subquery should be skipped for partition pruning + val hasSubquery = partitionPruningFilters.exists(SubqueryExpression.hasSubquery) + val conf = session.sessionState.conf + if (conf.metastorePartitionPruning && partitionPruningFilters.nonEmpty && !hasSubquery) { + val prunedHiveTable = prunedHiveTableWithStats(relation, partitionPruningFilters) + val filterExpression = filters.reduceLeft(And) + val filter = Filter(filterExpression, prunedHiveTable) + Project(projections, filter) + } else { + op + } + } +} /** * Replaces generic operations with specific variants that are designed to work with Hive. 
* diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala index 5b00e2ebafa43..9f0f05023bc24 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala @@ -166,14 +166,14 @@ case class HiveTableScanExec( @transient lazy val rawPartitions = { val prunedPartitions = if (sparkSession.sessionState.conf.metastorePartitionPruning && - partitionPruningPred.size > 0) { + partitionPruningPred.nonEmpty) { // Retrieve the original attributes based on expression ID so that capitalization matches. val normalizedFilters = partitionPruningPred.map(_.transform { case a: AttributeReference => originalAttributes(a) }) - sparkSession.sessionState.catalog.listPartitionsByFilter( - relation.tableMeta.identifier, - normalizedFilters) + relation.prunedPartitions.getOrElse( + sparkSession.sessionState.catalog + .listPartitionsByFilter(relation.tableMeta.identifier, normalizedFilters)) } else { sparkSession.sessionState.catalog.listPartitions(relation.tableMeta.identifier) } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala index 488175a22bad7..87930f5a0aee5 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala @@ -1556,4 +1556,35 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto } } } + + test("Broadcast join can by inferred if partitioned table can be pruned under threshold") { + withTempView("tempTbl", "largeTbl") { + withTable("partTbl") { + spark.range(0, 1000, 1, 2).selectExpr("id as col1", "id as col2") + .createOrReplaceTempView("tempTbl") + spark.range(0, 100000, 1, 2).selectExpr("id 
as col1", "id as col2") + .createOrReplaceTempView("largeTbl") + sql("CREATE TABLE partTbl (col1 INT, col2 STRING) " + + "PARTITIONED BY (part1 STRING, part2 INT) STORED AS textfile") + for (part1 <- Seq("a", "b", "c", "d"); part2 <- Seq(1, 2)) { + sql( + s""" + |INSERT OVERWRITE TABLE partTbl PARTITION (part1='$part1',part2='$part2') + |select col1, col2 from tempTbl + """.stripMargin) + } + val query = "select * from largeTbl join partTbl on (largeTbl.col1 = partTbl.col1 " + + "and partTbl.part1 = 'a' and partTbl.part2 = 1)" + Seq(true, false).foreach { partitionPruning => + withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> "true", + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "8001", + SQLConf.HIVE_METASTORE_PARTITION_PRUNING.key -> s"$partitionPruning") { + val broadcastJoins = + sql(query).queryExecution.sparkPlan.collect { case j: BroadcastHashJoinExec => j } + assert(broadcastJoins.nonEmpty === partitionPruning) + } + } + } + } + } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTableScanSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTableScanSuite.scala index 67d7ed0841abb..c0d8c6c078a79 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTableScanSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTableScanSuite.scala @@ -128,8 +128,12 @@ class HiveTableScanSuite extends HiveComparisonTest with SQLTestUtils with TestH // If the pruning predicate is used, getHiveQlPartitions should only return the // qualified partition; Otherwise, it return all the partitions. 
val expectedNumPartitions = if (hivePruning == "true") 1 else 2 - checkNumScannedPartitions( - stmt = s"SELECT id, p2 FROM $table WHERE p2 <= 'b'", expectedNumPartitions) + val stmt = s"SELECT id, p2 FROM $table WHERE p2 <= 'b'" + checkNumScannedPartitions(stmt = stmt, expectedNumPartitions) + // prunedPartitions are held in HiveTableRelation + val prunedNumPartitions = if (hivePruning == "true") 1 else 0 + assert( + getHiveTableScanExec(stmt).relation.prunedPartitions.size === prunedNumPartitions) } } @@ -137,8 +141,10 @@ class HiveTableScanSuite extends HiveComparisonTest with SQLTestUtils with TestH withSQLConf(SQLConf.HIVE_METASTORE_PARTITION_PRUNING.key -> hivePruning) { // If the pruning predicate does not exist, getHiveQlPartitions should always // return all the partitions. - checkNumScannedPartitions( - stmt = s"SELECT id, p2 FROM $table WHERE id <= 3", expectedNumParts = 2) + val stmt = s"SELECT id, p2 FROM $table WHERE id <= 3" + checkNumScannedPartitions(stmt = stmt, expectedNumParts = 2) + // no pruning is triggered, no partitions are held in HiveTableRelation + assert(getHiveTableScanExec(stmt).relation.prunedPartitions.isEmpty) } } } From cd4af95da2902bdd39c9844173d63a0a95e390a6 Mon Sep 17 00:00:00 2001 From: fuwhu Date: Sat, 4 Jan 2020 12:55:09 +0800 Subject: [PATCH 02/15] Refine code. 
--- .../sql/hive/HiveSessionStateBuilder.scala | 3 +- .../spark/sql/hive/HiveStrategies.scala | 99 +------------- .../execution/PruneHiveTablePartitions.scala | 125 ++++++++++++++++++ 3 files changed, 129 insertions(+), 98 deletions(-) create mode 100644 sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala index 4fa3b2f96b8d9..b117c582a3e6e 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala @@ -29,6 +29,7 @@ import org.apache.spark.sql.execution.analysis.DetectAmbiguousSelfJoin import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.datasources.v2.TableCapabilityCheck import org.apache.spark.sql.hive.client.HiveClient +import org.apache.spark.sql.hive.execution.PruneHiveTablePartitions import org.apache.spark.sql.internal.{BaseSessionStateBuilder, SessionResourceLoader, SessionState} /** @@ -100,7 +101,7 @@ class HiveSessionStateBuilder(session: SparkSession, parentState: Option[Session override protected def optimizer: Optimizer = { new SparkOptimizer(catalogManager, catalog, experimentalMethods) { override def postHocOptimizationBatches: Seq[Batch] = Seq( - Batch("Prune Hive Table Partitions", Once, PruneHiveTablePartitions(session)) + Batch("Prune Hive Table Partitions", Once, new PruneHiveTablePartitions(session)) ) override def extendedOperatorOptimizationRules: Seq[Rule[LogicalPlan]] = diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index a818681bad1ce..b9c98f4ea15e9 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -21,16 +21,15 @@ import java.io.IOException import java.util.Locale import org.apache.hadoop.fs.{FileSystem, Path} -import org.apache.hadoop.hive.common.StatsSetupConst import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.planning._ -import org.apache.spark.sql.catalyst.plans.logical.{Filter, InsertIntoDir, InsertIntoStatement, LogicalPlan, Project, ScriptTransformation, Statistics} +import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir, InsertIntoStatement, LogicalPlan, ScriptTransformation, Statistics} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution._ -import org.apache.spark.sql.execution.command.{CommandUtils, CreateTableCommand, DDLUtils} +import org.apache.spark.sql.execution.command.{CreateTableCommand, DDLUtils} import org.apache.spark.sql.execution.datasources.CreateTable import org.apache.spark.sql.hive.execution._ import org.apache.spark.sql.internal.{HiveSerDe, SQLConf} @@ -151,100 +150,6 @@ class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] { } } -/** - * TODO: merge this with PruneFileSourcePartitions after we completely make hive as a data source. - */ -case class PruneHiveTablePartitions(session: SparkSession) - extends Rule[LogicalPlan] with PredicateHelper { - /** - * Extract the partition filters from the filters on the table. 
- */ - private def extractPartitionPruningFilters(filters: Seq[Expression], - relation: HiveTableRelation): Seq[Expression] = { - val normalizedFilters = filters.map { e => - e transform { - case a: AttributeReference => - a.withName(relation.output.find(_.semanticEquals(a)).get.name) - } - } - val partitionSet = AttributeSet(relation.partitionCols) - normalizedFilters.filter { predicate => - !predicate.references.isEmpty && predicate.references.subsetOf(partitionSet) - } - } - - /** - * Prune the hive table using filters on the partitions of the table, - * and also update the statistics of the table. - */ - private def prunedHiveTableWithStats(relation: HiveTableRelation, - partitionFilters: Seq[Expression]): HiveTableRelation = { - val conf = session.sessionState.conf - val prunedPartitions = session.sharedState.externalCatalog.listPartitionsByFilter( - relation.tableMeta.database, - relation.tableMeta.identifier.table, - partitionFilters, - conf.sessionLocalTimeZone) - val sizeInBytes = try { - val partitionsWithSize = prunedPartitions.map { part => - val rawDataSize = part.parameters.get(StatsSetupConst.RAW_DATA_SIZE).map(_.toLong) - val totalSize = part.parameters.get(StatsSetupConst.TOTAL_SIZE).map(_.toLong) - if (rawDataSize.isDefined && rawDataSize.get > 0) { - (part, rawDataSize.get) - } else if (totalSize.isDefined && totalSize.get > 0L) { - (part, totalSize.get) - } else { - (part, 0L) - } - } - val sizeOfPartitions = - if (partitionsWithSize.count(_._2==0) <= conf.fallBackToHdfsForStatsMaxPartitionNum) { - partitionsWithSize.map{ pair => - val (part, size) = (pair._1, pair._2) - if (size == 0) { - CommandUtils.calculateLocationSize( - session.sessionState, relation.tableMeta.identifier, part.storage.locationUri) - } else { - size - } - }.sum - } else { - partitionsWithSize.filter(_._2>0).map(_._2).sum - } - // If size of partitions is zero fall back to the default size. 
- if (sizeOfPartitions == 0L) conf.defaultSizeInBytes else sizeOfPartitions - } catch { - case e: IOException => - logWarning("Failed to get table size from HDFS.", e) - conf.defaultSizeInBytes - } - val withStats = - if (relation.tableMeta.stats.isDefined) { - relation.tableMeta.copy( - stats = Some(relation.tableMeta.stats.get.copy(sizeInBytes = BigInt(sizeInBytes)))) - } else { - relation.tableMeta.copy(stats = Some(CatalogStatistics(sizeInBytes = BigInt(sizeInBytes)))) - } - relation.copy(tableMeta = withStats, prunedPartitions = Some(prunedPartitions)) - } - - override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { - case op @ PhysicalOperation(projections, filters, relation: HiveTableRelation) - if filters.nonEmpty && relation.isPartitioned && relation.prunedPartitions.isEmpty => - val partitionPruningFilters = extractPartitionPruningFilters(filters, relation) - // SPARK-24085: subquery should be skipped for partition pruning - val hasSubquery = partitionPruningFilters.exists(SubqueryExpression.hasSubquery) - val conf = session.sessionState.conf - if (conf.metastorePartitionPruning && partitionPruningFilters.nonEmpty && !hasSubquery) { - val prunedHiveTable = prunedHiveTableWithStats(relation, partitionPruningFilters) - val filterExpression = filters.reduceLeft(And) - val filter = Filter(filterExpression, prunedHiveTable) - Project(projections, filter) - } else { - op - } - } -} /** * Replaces generic operations with specific variants that are designed to work with Hive. 
* diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala new file mode 100644 index 0000000000000..7159719000226 --- /dev/null +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive.execution + +import java.io.IOException + +import org.apache.hadoop.hive.common.StatsSetupConst + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, HiveTableRelation} +import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet, Expression, PredicateHelper, SubqueryExpression} +import org.apache.spark.sql.catalyst.planning.PhysicalOperation +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.command.CommandUtils + +/** + * TODO: merge this with PruneFileSourcePartitions after we completely make hive as a data source. 
+ */ +private[sql] class PruneHiveTablePartitions(session: SparkSession) + extends Rule[LogicalPlan] with PredicateHelper { + /** + * Extract the partition filters from the filters on the table. + */ + private def extractPartitionPruningFilters( + filters: Seq[Expression], + relation: HiveTableRelation): Seq[Expression] = { + val normalizedFilters = filters.map { e => + e transform { + case a: AttributeReference => + a.withName(relation.output.find(_.semanticEquals(a)).get.name) + } + } + val partitionSet = AttributeSet(relation.partitionCols) + normalizedFilters.filter { predicate => + !predicate.references.isEmpty && predicate.references.subsetOf(partitionSet) + } + } + + /** + * Prune the hive table using filters on the partitions of the table, + * and also update the statistics of the table. + */ + private def prunedHiveTableWithStats( + relation: HiveTableRelation, + partitionFilters: Seq[Expression]): HiveTableRelation = { + val conf = session.sessionState.conf + val prunedPartitions = session.sessionState.catalog.listPartitionsByFilter( + relation.tableMeta.identifier, + partitionFilters) + val sizeInBytes = try { + val partitionsWithSize = prunedPartitions.map { part => + val rawDataSize = part.parameters.get(StatsSetupConst.RAW_DATA_SIZE).map(_.toLong) + val totalSize = part.parameters.get(StatsSetupConst.TOTAL_SIZE).map(_.toLong) + if (rawDataSize.isDefined && rawDataSize.get > 0) { + (part, rawDataSize.get) + } else if (totalSize.isDefined && totalSize.get > 0L) { + (part, totalSize.get) + } else { + (part, 0L) + } + } + val sizeOfPartitions = + if (partitionsWithSize.count(_._2==0) <= conf.fallBackToHdfsForStatsMaxPartitionNum) { + partitionsWithSize.map{ pair => + val (part, size) = (pair._1, pair._2) + if (size == 0) { + CommandUtils.calculateLocationSize( + session.sessionState, relation.tableMeta.identifier, part.storage.locationUri) + } else { + size + } + }.sum + } else { + partitionsWithSize.filter(_._2>0).map(_._2).sum + } + // If size of 
partitions is zero fall back to the default size. + if (sizeOfPartitions == 0L) conf.defaultSizeInBytes else sizeOfPartitions + } catch { + case e: IOException => + logWarning("Failed to get table size from HDFS.", e) + conf.defaultSizeInBytes + } + val newTableMeta = + if (relation.tableMeta.stats.isDefined) { + relation.tableMeta.copy( + stats = Some(relation.tableMeta.stats.get.copy(sizeInBytes = BigInt(sizeInBytes)))) + } else { + relation.tableMeta.copy(stats = Some(CatalogStatistics(sizeInBytes = BigInt(sizeInBytes)))) + } + relation.copy(tableMeta = newTableMeta, prunedPartitions = Some(prunedPartitions)) + } + + override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { + case op @ PhysicalOperation(projections, filters, relation: HiveTableRelation) + if filters.nonEmpty && relation.isPartitioned && relation.prunedPartitions.isEmpty => + val partitionPruningFilters = extractPartitionPruningFilters(filters, relation) + // SPARK-24085: subquery should be skipped for partition pruning + val hasSubquery = partitionPruningFilters.exists(SubqueryExpression.hasSubquery) + val conf = session.sessionState.conf + if (conf.metastorePartitionPruning && partitionPruningFilters.nonEmpty && !hasSubquery) { + val prunedHiveTable = prunedHiveTableWithStats(relation, partitionPruningFilters) + val filterExpression = filters.reduceLeft(And) + val filter = Filter(filterExpression, prunedHiveTable) + Project(projections, filter) + } else { + op + } + } +} From e4698c5f8d127bee859d2e57445363c04560449c Mon Sep 17 00:00:00 2001 From: fuwhu Date: Mon, 6 Jan 2020 13:35:10 +0800 Subject: [PATCH 03/15] Remove conf item FALL_BACK_TO_HDFS_FOR_STATS_MAX_PART_NUM, leaving it to new PR as it is a common problem. 
--- .../apache/spark/sql/internal/SQLConf.scala | 12 ------------ .../execution/PruneHiveTablePartitions.scala | 19 +++++++------------ 2 files changed, 7 insertions(+), 24 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 416fb73b06681..279c79ff14080 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1398,16 +1398,6 @@ object SQLConf { .booleanConf .createWithDefault(false) - val FALL_BACK_TO_HDFS_FOR_STATS_MAX_PART_NUM = - buildConf("spark.sql.statistics.fallBackToHdfs.maxPartitionNum") - .doc("If the number of table partitions exceed this value, falling back to hdfs " + - "for statistics calculation is not allowed. This is used to avoid calculating " + - "the size of a large number of partitions through hdfs, which is very time consuming." + - "Setting this value to 0 or negative will disable falling back to hdfs for " + - "partition statistic calculation.") - .intConf - .createWithDefault(100) - val NDV_MAX_ERROR = buildConf("spark.sql.statistics.ndv.maxError") .internal() @@ -2579,8 +2569,6 @@ class SQLConf extends Serializable with Logging { def fallBackToHdfsForStatsEnabled: Boolean = getConf(ENABLE_FALL_BACK_TO_HDFS_FOR_STATS) - def fallBackToHdfsForStatsMaxPartitionNum: Int = getConf(FALL_BACK_TO_HDFS_FOR_STATS_MAX_PART_NUM) - def defaultSizeInBytes: Long = getConf(DEFAULT_SIZE_IN_BYTES) def ndvMaxError: Double = getConf(NDV_MAX_ERROR) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala index 7159719000226..f27ab2727d937 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala +++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala @@ -75,20 +75,15 @@ private[sql] class PruneHiveTablePartitions(session: SparkSession) (part, 0L) } } - val sizeOfPartitions = - if (partitionsWithSize.count(_._2==0) <= conf.fallBackToHdfsForStatsMaxPartitionNum) { - partitionsWithSize.map{ pair => - val (part, size) = (pair._1, pair._2) - if (size == 0) { - CommandUtils.calculateLocationSize( - session.sessionState, relation.tableMeta.identifier, part.storage.locationUri) - } else { - size - } - }.sum + val sizeOfPartitions = partitionsWithSize.map{ pair => + val (partition, size) = (pair._1, pair._2) + if (size == 0) { + CommandUtils.calculateLocationSize( + session.sessionState, relation.tableMeta.identifier, partition.storage.locationUri) } else { - partitionsWithSize.filter(_._2>0).map(_._2).sum + size } + }.sum // If size of partitions is zero fall back to the default size. if (sizeOfPartitions == 0L) conf.defaultSizeInBytes else sizeOfPartitions } catch { From 89872338ecc73e7a3a890bf0cb27a36a7b5d80bd Mon Sep 17 00:00:00 2001 From: fuwhu Date: Mon, 6 Jan 2020 14:00:32 +0800 Subject: [PATCH 04/15] Refine code. 
--- .../execution/PruneHiveTablePartitions.scala | 18 +++++------------- 1 file changed, 5 insertions(+), 13 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala index f27ab2727d937..b8040edfcb5be 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala @@ -64,24 +64,16 @@ private[sql] class PruneHiveTablePartitions(session: SparkSession) relation.tableMeta.identifier, partitionFilters) val sizeInBytes = try { - val partitionsWithSize = prunedPartitions.map { part => - val rawDataSize = part.parameters.get(StatsSetupConst.RAW_DATA_SIZE).map(_.toLong) - val totalSize = part.parameters.get(StatsSetupConst.TOTAL_SIZE).map(_.toLong) + val sizeOfPartitions = prunedPartitions.map { partition => + val rawDataSize = partition.parameters.get(StatsSetupConst.RAW_DATA_SIZE).map(_.toLong) + val totalSize = partition.parameters.get(StatsSetupConst.TOTAL_SIZE).map(_.toLong) if (rawDataSize.isDefined && rawDataSize.get > 0) { - (part, rawDataSize.get) + rawDataSize.get } else if (totalSize.isDefined && totalSize.get > 0L) { - (part, totalSize.get) + totalSize.get } else { - (part, 0L) - } - } - val sizeOfPartitions = partitionsWithSize.map{ pair => - val (partition, size) = (pair._1, pair._2) - if (size == 0) { CommandUtils.calculateLocationSize( session.sessionState, relation.tableMeta.identifier, partition.storage.locationUri) - } else { - size } }.sum // If size of partitions is zero fall back to the default size. From 1eafe1e21e08aa6db614111cd3505fee9caf1342 Mon Sep 17 00:00:00 2001 From: fuwhu Date: Thu, 9 Jan 2020 13:26:46 +0800 Subject: [PATCH 05/15] Add PruneHiveTablePartitionsSuite.
--- .../spark/sql/hive/StatisticsSuite.scala | 31 ---------- .../PruneHiveTablePartitionsSuite.scala | 61 +++++++++++++++++++ 2 files changed, 61 insertions(+), 31 deletions(-) create mode 100644 sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitionsSuite.scala diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala index 87930f5a0aee5..488175a22bad7 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala @@ -1556,35 +1556,4 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto } } } - - test("Broadcast join can by inferred if partitioned table can be pruned under threshold") { - withTempView("tempTbl", "largeTbl") { - withTable("partTbl") { - spark.range(0, 1000, 1, 2).selectExpr("id as col1", "id as col2") - .createOrReplaceTempView("tempTbl") - spark.range(0, 100000, 1, 2).selectExpr("id as col1", "id as col2") - .createOrReplaceTempView("largeTbl") - sql("CREATE TABLE partTbl (col1 INT, col2 STRING) " + - "PARTITIONED BY (part1 STRING, part2 INT) STORED AS textfile") - for (part1 <- Seq("a", "b", "c", "d"); part2 <- Seq(1, 2)) { - sql( - s""" - |INSERT OVERWRITE TABLE partTbl PARTITION (part1='$part1',part2='$part2') - |select col1, col2 from tempTbl - """.stripMargin) - } - val query = "select * from largeTbl join partTbl on (largeTbl.col1 = partTbl.col1 " + - "and partTbl.part1 = 'a' and partTbl.part2 = 1)" - Seq(true, false).foreach { partitionPruning => - withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> "true", - SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "8001", - SQLConf.HIVE_METASTORE_PARTITION_PRUNING.key -> s"$partitionPruning") { - val broadcastJoins = - sql(query).queryExecution.sparkPlan.collect { case j: BroadcastHashJoinExec => j } - assert(broadcastJoins.nonEmpty === 
partitionPruning) - } - } - } - } - } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitionsSuite.scala new file mode 100644 index 0000000000000..63a38f8e8cc45 --- /dev/null +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitionsSuite.scala @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hive.execution + +import org.apache.spark.sql.QueryTest +import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.rules.RuleExecutor +import org.apache.spark.sql.hive.test.TestHiveSingleton +import org.apache.spark.sql.test.SQLTestUtils + +class PruneHiveTablePartitionsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton { + + object Optimize extends RuleExecutor[LogicalPlan] { + val batches = + Batch("PruneHiveTablePartitions", Once, + EliminateSubqueryAliases, new PruneHiveTablePartitions(spark)) :: Nil + } + + test("SPARK-15616 statistics pruned after going throuhg PruneHiveTablePartitions") { + withTable("test", "temp") { + withTempDir { dir => + sql( + s""" + |CREATE EXTERNAL TABLE test(i int) + |PARTITIONED BY (p int) + |STORED AS textfile + |LOCATION '${dir.toURI}'""".stripMargin) + + spark.range(0, 1000, 1).selectExpr("id as col") + .createOrReplaceTempView("temp") + + for (part <- Seq(1, 2, 3, 4)) { + sql(s""" + |INSERT OVERWRITE TABLE test PARTITION (p='$part') + |select col from temp""".stripMargin) + } + val singlePartitionSizeInBytes = 3890 + val analyzed1 = sql("select i from test where p>0").queryExecution.analyzed + val analyzed2 = sql("select i from test where p=1").queryExecution.analyzed + assert(Optimize.execute(analyzed1).stats.sizeInBytes === singlePartitionSizeInBytes*4*12/16) + assert(Optimize.execute(analyzed2).stats.sizeInBytes === singlePartitionSizeInBytes*12/16) + } + } + } +} From 5dd01fd4058771f5ea89f703dea97d0ffce55b23 Mon Sep 17 00:00:00 2001 From: fuwhu Date: Thu, 9 Jan 2020 15:07:01 +0800 Subject: [PATCH 06/15] Refine PruneHiveTablePartitions: prune partitions through metastore first if HIVE_METASTORE_PARTITION_PRUNING is enabled, and then prune again using partition filters. 
--- .../execution/PruneHiveTablePartitions.scala | 88 ++++++++++++------- .../hive/execution/HiveTableScanSuite.scala | 14 +-- 2 files changed, 62 insertions(+), 40 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala index b8040edfcb5be..0bb32688d679c 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala @@ -19,21 +19,30 @@ package org.apache.spark.sql.hive.execution import java.io.IOException +import scala.collection.JavaConverters._ + import org.apache.hadoop.hive.common.StatsSetupConst import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, HiveTableRelation} -import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet, Expression, PredicateHelper, SubqueryExpression} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.analysis.CastSupport +import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, CatalogTablePartition, HiveTableRelation} +import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet, BindReferences, Expression, Literal, SubqueryExpression} import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.command.CommandUtils +import org.apache.spark.sql.hive.client.HiveClientImpl +import org.apache.spark.sql.types.BooleanType /** * TODO: merge this with PruneFileSourcePartitions after we completely make hive as a data source. 
*/ private[sql] class PruneHiveTablePartitions(session: SparkSession) - extends Rule[LogicalPlan] with PredicateHelper { + extends Rule[LogicalPlan] with CastSupport { + + override val conf = session.sessionState.conf + /** * Extract the partition filters from the filters on the table. */ @@ -53,18 +62,46 @@ private[sql] class PruneHiveTablePartitions(session: SparkSession) } /** - * Prune the hive table using filters on the partitions of the table, - * and also update the statistics of the table. + * Prune the hive table using filters on the partitions of the table. */ - private def prunedHiveTableWithStats( - relation: HiveTableRelation, - partitionFilters: Seq[Expression]): HiveTableRelation = { - val conf = session.sessionState.conf - val prunedPartitions = session.sessionState.catalog.listPartitionsByFilter( - relation.tableMeta.identifier, - partitionFilters) + private def prunePartitions(relation: HiveTableRelation, partitionFilters: Seq[Expression]) + : Seq[CatalogTablePartition] = { + val partitions = + if (conf.metastorePartitionPruning) { + session.sessionState.catalog.listPartitionsByFilter( + relation.tableMeta.identifier, partitionFilters) + } else { + session.sessionState.catalog.listPartitions(relation.tableMeta.identifier) + } + val shouldKeep = partitionFilters.reduceLeftOption(And).map { filter => + require(filter.dataType == BooleanType, + s"Data type of predicate $filter must be ${BooleanType.catalogString} rather than " + + s"${filter.dataType.catalogString}.") + BindReferences.bindReference(filter, relation.partitionCols) + } + if (shouldKeep.nonEmpty) { + partitions.filter{ partition => + val hivePartition = + HiveClientImpl.toHivePartition(partition, HiveClientImpl.toHiveTable(relation.tableMeta)) + val dataTypes = relation.partitionCols.map(_.dataType) + val castedValues = hivePartition.getValues.asScala.zip(dataTypes) + .map { case (value, dataType) => cast(Literal(value), dataType).eval(null) } + val row = 
InternalRow.fromSeq(castedValues) + shouldKeep.get.eval(row).asInstanceOf[Boolean] + } + } else { + partitions + } + } + + /** + * Update the statistics of the table. + */ + private def updateTableMeta( + tableMeta: CatalogTable, + prunedPartitions: Seq[CatalogTablePartition]): CatalogTable = { val sizeInBytes = try { - val sizeOfPartitions = prunedPartitions.map { partition => + prunedPartitions.map { partition => val rawDataSize = partition.parameters.get(StatsSetupConst.RAW_DATA_SIZE).map(_.toLong) val totalSize = partition.parameters.get(StatsSetupConst.TOTAL_SIZE).map(_.toLong) if (rawDataSize.isDefined && rawDataSize.get > 0) { @@ -73,24 +110,15 @@ private[sql] class PruneHiveTablePartitions(session: SparkSession) totalSize.get } else { CommandUtils.calculateLocationSize( - session.sessionState, relation.tableMeta.identifier, partition.storage.locationUri) + session.sessionState, tableMeta.identifier, partition.storage.locationUri) } }.sum - // If size of partitions is zero fall back to the default size. 
- if (sizeOfPartitions == 0L) conf.defaultSizeInBytes else sizeOfPartitions } catch { case e: IOException => logWarning("Failed to get table size from HDFS.", e) conf.defaultSizeInBytes } - val newTableMeta = - if (relation.tableMeta.stats.isDefined) { - relation.tableMeta.copy( - stats = Some(relation.tableMeta.stats.get.copy(sizeInBytes = BigInt(sizeInBytes)))) - } else { - relation.tableMeta.copy(stats = Some(CatalogStatistics(sizeInBytes = BigInt(sizeInBytes)))) - } - relation.copy(tableMeta = newTableMeta, prunedPartitions = Some(prunedPartitions)) + tableMeta.copy(stats = Some(CatalogStatistics(sizeInBytes = BigInt(sizeInBytes)))) } override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { @@ -99,12 +127,12 @@ private[sql] class PruneHiveTablePartitions(session: SparkSession) val partitionPruningFilters = extractPartitionPruningFilters(filters, relation) // SPARK-24085: subquery should be skipped for partition pruning val hasSubquery = partitionPruningFilters.exists(SubqueryExpression.hasSubquery) - val conf = session.sessionState.conf - if (conf.metastorePartitionPruning && partitionPruningFilters.nonEmpty && !hasSubquery) { - val prunedHiveTable = prunedHiveTableWithStats(relation, partitionPruningFilters) - val filterExpression = filters.reduceLeft(And) - val filter = Filter(filterExpression, prunedHiveTable) - Project(projections, filter) + if (partitionPruningFilters.nonEmpty && !hasSubquery) { + val newPartitions = prunePartitions(relation, partitionPruningFilters) + val newTableMeta = updateTableMeta(relation.tableMeta, newPartitions) + val newRelation = relation.copy( + tableMeta = newTableMeta, prunedPartitions = Some(newPartitions)) + Project(projections, Filter(filters.reduceLeft(And), newRelation)) } else { op } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTableScanSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTableScanSuite.scala index c0d8c6c078a79..67d7ed0841abb 
100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTableScanSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTableScanSuite.scala @@ -128,12 +128,8 @@ class HiveTableScanSuite extends HiveComparisonTest with SQLTestUtils with TestH // If the pruning predicate is used, getHiveQlPartitions should only return the // qualified partition; Otherwise, it return all the partitions. val expectedNumPartitions = if (hivePruning == "true") 1 else 2 - val stmt = s"SELECT id, p2 FROM $table WHERE p2 <= 'b'" - checkNumScannedPartitions(stmt = stmt, expectedNumPartitions) - // prunedPartitions are held in HiveTableRelation - val prunedNumPartitions = if (hivePruning == "true") 1 else 0 - assert( - getHiveTableScanExec(stmt).relation.prunedPartitions.size === prunedNumPartitions) + checkNumScannedPartitions( + stmt = s"SELECT id, p2 FROM $table WHERE p2 <= 'b'", expectedNumPartitions) } } @@ -141,10 +137,8 @@ class HiveTableScanSuite extends HiveComparisonTest with SQLTestUtils with TestH withSQLConf(SQLConf.HIVE_METASTORE_PARTITION_PRUNING.key -> hivePruning) { // If the pruning predicate does not exist, getHiveQlPartitions should always // return all the partitions. - val stmt = s"SELECT id, p2 FROM $table WHERE id <= 3" - checkNumScannedPartitions(stmt = stmt, expectedNumParts = 2) - // no pruning is triggered, no partitions are held in HiveTableRelation - assert(getHiveTableScanExec(stmt).relation.prunedPartitions.isEmpty) + checkNumScannedPartitions( + stmt = s"SELECT id, p2 FROM $table WHERE id <= 3", expectedNumParts = 2) } } } From 79e5cf95e7fc0ce387127e09aafd0632fa2b5c19 Mon Sep 17 00:00:00 2001 From: fuwhu Date: Tue, 14 Jan 2020 10:36:04 +0800 Subject: [PATCH 07/15] Fix indentation. 
--- .../sql/hive/execution/PruneHiveTablePartitions.scala | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala index 0bb32688d679c..ece365a23becc 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala @@ -47,8 +47,8 @@ private[sql] class PruneHiveTablePartitions(session: SparkSession) * Extract the partition filters from the filters on the table. */ private def extractPartitionPruningFilters( - filters: Seq[Expression], - relation: HiveTableRelation): Seq[Expression] = { + filters: Seq[Expression], + relation: HiveTableRelation): Seq[Expression] = { val normalizedFilters = filters.map { e => e transform { case a: AttributeReference => @@ -64,8 +64,9 @@ private[sql] class PruneHiveTablePartitions(session: SparkSession) /** * Prune the hive table using filters on the partitions of the table. */ - private def prunePartitions(relation: HiveTableRelation, partitionFilters: Seq[Expression]) - : Seq[CatalogTablePartition] = { + private def prunePartitions( + relation: HiveTableRelation, + partitionFilters: Seq[Expression]): Seq[CatalogTablePartition] = { val partitions = if (conf.metastorePartitionPruning) { session.sessionState.catalog.listPartitionsByFilter( From 7fa3718563fbd50f21308e4bd9fbd97fe740dca4 Mon Sep 17 00:00:00 2001 From: fuwhu Date: Tue, 14 Jan 2020 10:46:50 +0800 Subject: [PATCH 08/15] Drop sizeInBytes of partition when it can't be got from metadata. 
--- .../execution/PruneHiveTablePartitions.scala | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala index ece365a23becc..53f147933f1b8 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala @@ -31,7 +31,6 @@ import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, Attri import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.execution.command.CommandUtils import org.apache.spark.sql.hive.client.HiveClientImpl import org.apache.spark.sql.types.BooleanType @@ -46,7 +45,7 @@ private[sql] class PruneHiveTablePartitions(session: SparkSession) /** * Extract the partition filters from the filters on the table. */ - private def extractPartitionPruningFilters( + private def getPartitionKeyFilters( filters: Seq[Expression], relation: HiveTableRelation): Seq[Expression] = { val normalizedFilters = filters.map { e => @@ -99,9 +98,9 @@ private[sql] class PruneHiveTablePartitions(session: SparkSession) * Update the statistics of the table. 
*/ private def updateTableMeta( - tableMeta: CatalogTable, - prunedPartitions: Seq[CatalogTablePartition]): CatalogTable = { - val sizeInBytes = try { + tableMeta: CatalogTable, + prunedPartitions: Seq[CatalogTablePartition]): CatalogTable = { + val sizeOfPartitions = try { prunedPartitions.map { partition => val rawDataSize = partition.parameters.get(StatsSetupConst.RAW_DATA_SIZE).map(_.toLong) val totalSize = partition.parameters.get(StatsSetupConst.TOTAL_SIZE).map(_.toLong) @@ -110,8 +109,7 @@ private[sql] class PruneHiveTablePartitions(session: SparkSession) } else if (totalSize.isDefined && totalSize.get > 0L) { totalSize.get } else { - CommandUtils.calculateLocationSize( - session.sessionState, tableMeta.identifier, partition.storage.locationUri) + 0L } }.sum } catch { @@ -119,13 +117,15 @@ private[sql] class PruneHiveTablePartitions(session: SparkSession) logWarning("Failed to get table size from HDFS.", e) conf.defaultSizeInBytes } + // If size of partitions is zero fall back to the default size. 
+ val sizeInBytes = if (sizeOfPartitions == 0L) conf.defaultSizeInBytes else sizeOfPartitions tableMeta.copy(stats = Some(CatalogStatistics(sizeInBytes = BigInt(sizeInBytes)))) } override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { case op @ PhysicalOperation(projections, filters, relation: HiveTableRelation) if filters.nonEmpty && relation.isPartitioned && relation.prunedPartitions.isEmpty => - val partitionPruningFilters = extractPartitionPruningFilters(filters, relation) + val partitionPruningFilters = getPartitionKeyFilters(filters, relation) // SPARK-24085: subquery should be skipped for partition pruning val hasSubquery = partitionPruningFilters.exists(SubqueryExpression.hasSubquery) if (partitionPruningFilters.nonEmpty && !hasSubquery) { From 0b21e77420c7f244d0b7ab0e997d66a4b73e712b Mon Sep 17 00:00:00 2001 From: fuwhu Date: Tue, 14 Jan 2020 17:38:58 +0800 Subject: [PATCH 09/15] Don't need to prune again in PruneHiveTablePartitions.prunePartitions any more since HiveExternalCatalog.listPartitionsByFilter can already return exactly what we want. 
--- .../execution/PruneHiveTablePartitions.scala | 20 ------------------- 1 file changed, 20 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala index 53f147933f1b8..6fb2de3bc48c8 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala @@ -66,32 +66,12 @@ private[sql] class PruneHiveTablePartitions(session: SparkSession) private def prunePartitions( relation: HiveTableRelation, partitionFilters: Seq[Expression]): Seq[CatalogTablePartition] = { - val partitions = if (conf.metastorePartitionPruning) { session.sessionState.catalog.listPartitionsByFilter( relation.tableMeta.identifier, partitionFilters) } else { session.sessionState.catalog.listPartitions(relation.tableMeta.identifier) } - val shouldKeep = partitionFilters.reduceLeftOption(And).map { filter => - require(filter.dataType == BooleanType, - s"Data type of predicate $filter must be ${BooleanType.catalogString} rather than " + - s"${filter.dataType.catalogString}.") - BindReferences.bindReference(filter, relation.partitionCols) - } - if (shouldKeep.nonEmpty) { - partitions.filter{ partition => - val hivePartition = - HiveClientImpl.toHivePartition(partition, HiveClientImpl.toHiveTable(relation.tableMeta)) - val dataTypes = relation.partitionCols.map(_.dataType) - val castedValues = hivePartition.getValues.asScala.zip(dataTypes) - .map { case (value, dataType) => cast(Literal(value), dataType).eval(null) } - val row = InternalRow.fromSeq(castedValues) - shouldKeep.get.eval(row).asInstanceOf[Boolean] - } - } else { - partitions - } } /** From a9ce634486cea11bdc51fe59734482c3aae5b60a Mon Sep 17 00:00:00 2001 From: fuwhu Date: Wed, 15 Jan 2020 09:22:39 +0800 Subject: [PATCH 10/15] Make 
PruneHiveTablePartitions.getPartitionKeyFilters follows PruneFileSourcePartitions.getPartitionKeyFilters. --- .../execution/PruneHiveTablePartitions.scala | 35 ++++++++----------- 1 file changed, 14 insertions(+), 21 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala index 6fb2de3bc48c8..d14a39336108f 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala @@ -20,17 +20,16 @@ package org.apache.spark.sql.hive.execution import java.io.IOException import scala.collection.JavaConverters._ - import org.apache.hadoop.hive.common.StatsSetupConst - import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.CastSupport import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, CatalogTablePartition, HiveTableRelation} -import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet, BindReferences, Expression, Literal, SubqueryExpression} +import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet, BindReferences, Expression, ExpressionSet, Literal, SubqueryExpression} import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.datasources.DataSourceStrategy import org.apache.spark.sql.hive.client.HiveClientImpl import org.apache.spark.sql.types.BooleanType @@ -47,17 +46,13 @@ private[sql] class PruneHiveTablePartitions(session: SparkSession) */ private def getPartitionKeyFilters( filters: Seq[Expression], - relation: HiveTableRelation): Seq[Expression] = { 
- val normalizedFilters = filters.map { e => - e transform { - case a: AttributeReference => - a.withName(relation.output.find(_.semanticEquals(a)).get.name) - } - } - val partitionSet = AttributeSet(relation.partitionCols) - normalizedFilters.filter { predicate => - !predicate.references.isEmpty && predicate.references.subsetOf(partitionSet) - } + relation: HiveTableRelation): ExpressionSet = { + val normalizedFilters = DataSourceStrategy.normalizeExprs( + filters.filter(f => f.deterministic && !SubqueryExpression.hasSubquery(f)), relation.output) + val partitionColumnSet = AttributeSet(relation.partitionCols) + ExpressionSet(normalizedFilters.filter { f => + !f.references.isEmpty && f.references.subsetOf(partitionColumnSet) + }) } /** @@ -65,10 +60,10 @@ private[sql] class PruneHiveTablePartitions(session: SparkSession) */ private def prunePartitions( relation: HiveTableRelation, - partitionFilters: Seq[Expression]): Seq[CatalogTablePartition] = { + partitionFilters: ExpressionSet): Seq[CatalogTablePartition] = { if (conf.metastorePartitionPruning) { session.sessionState.catalog.listPartitionsByFilter( - relation.tableMeta.identifier, partitionFilters) + relation.tableMeta.identifier, partitionFilters.toSeq) } else { session.sessionState.catalog.listPartitions(relation.tableMeta.identifier) } @@ -105,11 +100,9 @@ private[sql] class PruneHiveTablePartitions(session: SparkSession) override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { case op @ PhysicalOperation(projections, filters, relation: HiveTableRelation) if filters.nonEmpty && relation.isPartitioned && relation.prunedPartitions.isEmpty => - val partitionPruningFilters = getPartitionKeyFilters(filters, relation) - // SPARK-24085: subquery should be skipped for partition pruning - val hasSubquery = partitionPruningFilters.exists(SubqueryExpression.hasSubquery) - if (partitionPruningFilters.nonEmpty && !hasSubquery) { - val newPartitions = prunePartitions(relation, 
partitionPruningFilters) + val partitionKeyFilters = getPartitionKeyFilters(filters, relation) + if (partitionKeyFilters.nonEmpty) { + val newPartitions = prunePartitions(relation, partitionKeyFilters) val newTableMeta = updateTableMeta(relation.tableMeta, newPartitions) val newRelation = relation.copy( tableMeta = newTableMeta, prunedPartitions = Some(newPartitions)) From fea6fdcf22c323e1f55a39b23e41d30c1c9d7214 Mon Sep 17 00:00:00 2001 From: fuwhu Date: Wed, 15 Jan 2020 10:03:43 +0800 Subject: [PATCH 11/15] empty commit --- .../sql/hive/execution/PruneHiveTablePartitions.scala | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala index d14a39336108f..3490a05ea612d 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala @@ -19,19 +19,16 @@ package org.apache.spark.sql.hive.execution import java.io.IOException -import scala.collection.JavaConverters._ import org.apache.hadoop.hive.common.StatsSetupConst + import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.CastSupport import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, CatalogTablePartition, HiveTableRelation} -import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet, BindReferences, Expression, ExpressionSet, Literal, SubqueryExpression} +import org.apache.spark.sql.catalyst.expressions.{And, AttributeSet, Expression, ExpressionSet, SubqueryExpression} import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} import 
org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.datasources.DataSourceStrategy -import org.apache.spark.sql.hive.client.HiveClientImpl -import org.apache.spark.sql.types.BooleanType /** * TODO: merge this with PruneFileSourcePartitions after we completely make hive as a data source. From 14ae8785cc3c7f177121847005c6386c6f1f7433 Mon Sep 17 00:00:00 2001 From: fuwhu Date: Wed, 15 Jan 2020 23:02:35 +0800 Subject: [PATCH 12/15] leave statistic unchanged if the sizeInBytes of some partition is not available in metadata. --- .../execution/PruneHiveTablePartitions.scala | 46 +++++++++---------- 1 file changed, 22 insertions(+), 24 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala index 3490a05ea612d..7c8017391f8de 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala @@ -17,18 +17,17 @@ package org.apache.spark.sql.hive.execution -import java.io.IOException - import org.apache.hadoop.hive.common.StatsSetupConst import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.analysis.CastSupport -import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, CatalogTablePartition, HiveTableRelation} +import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, CatalogTablePartition, ExternalCatalogUtils, HiveTableRelation} import org.apache.spark.sql.catalyst.expressions.{And, AttributeSet, Expression, ExpressionSet, SubqueryExpression} import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.datasources.DataSourceStrategy 
+import org.apache.spark.sql.internal.SQLConf /** * TODO: merge this with PruneFileSourcePartitions after we completely make hive as a data source. @@ -36,7 +35,7 @@ import org.apache.spark.sql.execution.datasources.DataSourceStrategy private[sql] class PruneHiveTablePartitions(session: SparkSession) extends Rule[LogicalPlan] with CastSupport { - override val conf = session.sessionState.conf + override val conf: SQLConf = session.sessionState.conf /** * Extract the partition filters from the filters on the table. @@ -62,7 +61,9 @@ private[sql] class PruneHiveTablePartitions(session: SparkSession) session.sessionState.catalog.listPartitionsByFilter( relation.tableMeta.identifier, partitionFilters.toSeq) } else { - session.sessionState.catalog.listPartitions(relation.tableMeta.identifier) + ExternalCatalogUtils.prunePartitionsByFilter(relation.tableMeta, + session.sessionState.catalog.listPartitions(relation.tableMeta.identifier), + partitionFilters.toSeq, conf.sessionLocalTimeZone) } } @@ -72,26 +73,23 @@ private[sql] class PruneHiveTablePartitions(session: SparkSession) private def updateTableMeta( tableMeta: CatalogTable, prunedPartitions: Seq[CatalogTablePartition]): CatalogTable = { - val sizeOfPartitions = try { - prunedPartitions.map { partition => - val rawDataSize = partition.parameters.get(StatsSetupConst.RAW_DATA_SIZE).map(_.toLong) - val totalSize = partition.parameters.get(StatsSetupConst.TOTAL_SIZE).map(_.toLong) - if (rawDataSize.isDefined && rawDataSize.get > 0) { - rawDataSize.get - } else if (totalSize.isDefined && totalSize.get > 0L) { - totalSize.get - } else { - 0L - } - }.sum - } catch { - case e: IOException => - logWarning("Failed to get table size from HDFS.", e) - conf.defaultSizeInBytes + val sizeOfPartitions = prunedPartitions.map { partition => + val rawDataSize = partition.parameters.get(StatsSetupConst.RAW_DATA_SIZE).map(_.toLong) + val totalSize = partition.parameters.get(StatsSetupConst.TOTAL_SIZE).map(_.toLong) + if 
(rawDataSize.isDefined && rawDataSize.get > 0) { + rawDataSize.get + } else if (totalSize.isDefined && totalSize.get > 0L) { + totalSize.get + } else { + 0L + } + } + if (sizeOfPartitions.forall(s => s>0)) { + val sizeInBytes = sizeOfPartitions.sum + tableMeta.copy(stats = Some(CatalogStatistics(sizeInBytes = BigInt(sizeInBytes)))) + } else { + tableMeta } - // If size of partitions is zero fall back to the default size. - val sizeInBytes = if (sizeOfPartitions == 0L) conf.defaultSizeInBytes else sizeOfPartitions - tableMeta.copy(stats = Some(CatalogStatistics(sizeInBytes = BigInt(sizeInBytes)))) } override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { From 6a4a4b245d965274ec6dfa358287fcefc934d742 Mon Sep 17 00:00:00 2001 From: fuwhu Date: Thu, 16 Jan 2020 17:31:01 +0800 Subject: [PATCH 13/15] refine code. --- .../sql/execution/datasources/PruneFileSourcePartitions.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala index 7fd154ccac445..0bd8f1050e684 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala @@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.catalyst.plans.logical.{Filter, LeafNode, LogicalPlan, Project} import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2ScanRelation, FileScan, FileTable} +import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2ScanRelation, FileScan} import org.apache.spark.sql.types.StructType private[sql] object PruneFileSourcePartitions extends 
Rule[LogicalPlan] { From ce204396ed16aecc85f52f4251becb42e1e0c67c Mon Sep 17 00:00:00 2001 From: fuwhu Date: Tue, 21 Jan 2020 10:57:46 +0800 Subject: [PATCH 14/15] Refine code. --- .../execution/PruneHiveTablePartitions.scala | 1 + .../PruneHiveTablePartitionsSuite.scala | 35 ++++++++----------- 2 files changed, 16 insertions(+), 20 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala index 7c8017391f8de..c596003c88349 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala @@ -101,6 +101,7 @@ private[sql] class PruneHiveTablePartitions(session: SparkSession) val newTableMeta = updateTableMeta(relation.tableMeta, newPartitions) val newRelation = relation.copy( tableMeta = newTableMeta, prunedPartitions = Some(newPartitions)) + // Keep partition filters so that they are visible in physical planning Project(projections, Filter(filters.reduceLeft(And), newRelation)) } else { op diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitionsSuite.scala index 63a38f8e8cc45..1c0a0115b7dbe 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitionsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitionsSuite.scala @@ -34,28 +34,23 @@ class PruneHiveTablePartitionsSuite extends QueryTest with SQLTestUtils with Tes test("SPARK-15616 statistics pruned after going throuhg PruneHiveTablePartitions") { withTable("test", "temp") { - withTempDir { dir => - sql( - s""" - |CREATE EXTERNAL TABLE test(i int) - |PARTITIONED BY (p int) - |STORED AS textfile - |LOCATION 
'${dir.toURI}'""".stripMargin) + sql( + s""" + |CREATE TABLE test(i int) + |PARTITIONED BY (p int) + |STORED AS textfile""".stripMargin) + spark.range(0, 1000, 1).selectExpr("id as col") + .createOrReplaceTempView("temp") - spark.range(0, 1000, 1).selectExpr("id as col") - .createOrReplaceTempView("temp") - - for (part <- Seq(1, 2, 3, 4)) { - sql(s""" - |INSERT OVERWRITE TABLE test PARTITION (p='$part') - |select col from temp""".stripMargin) - } - val singlePartitionSizeInBytes = 3890 - val analyzed1 = sql("select i from test where p>0").queryExecution.analyzed - val analyzed2 = sql("select i from test where p=1").queryExecution.analyzed - assert(Optimize.execute(analyzed1).stats.sizeInBytes === singlePartitionSizeInBytes*4*12/16) - assert(Optimize.execute(analyzed2).stats.sizeInBytes === singlePartitionSizeInBytes*12/16) + for (part <- Seq(1, 2, 3, 4)) { + sql(s""" + |INSERT OVERWRITE TABLE test PARTITION (p='$part') + |select col from temp""".stripMargin) } + val analyzed1 = sql("select i from test where p>0").queryExecution.analyzed + val analyzed2 = sql("select i from test where p=1").queryExecution.analyzed + assert(Optimize.execute(analyzed1).stats.sizeInBytes/4 === + Optimize.execute(analyzed2).stats.sizeInBytes) } } } From b1798d52147b081c7073f3c096eb886a867b921d Mon Sep 17 00:00:00 2001 From: fuwhu Date: Tue, 21 Jan 2020 16:06:35 +0800 Subject: [PATCH 15/15] Fix scala code style. 
--- .../execution/PruneHiveTablePartitions.scala | 2 +- .../PruneHiveTablePartitionsSuite.scala | 19 ++++++++++--------- 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala index c596003c88349..a0349f627d107 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala @@ -84,7 +84,7 @@ private[sql] class PruneHiveTablePartitions(session: SparkSession) 0L } } - if (sizeOfPartitions.forall(s => s>0)) { + if (sizeOfPartitions.forall(_ > 0)) { val sizeInBytes = sizeOfPartitions.sum tableMeta.copy(stats = Some(CatalogStatistics(sizeInBytes = BigInt(sizeInBytes)))) } else { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitionsSuite.scala index 1c0a0115b7dbe..e41709841a736 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitionsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitionsSuite.scala @@ -36,20 +36,21 @@ class PruneHiveTablePartitionsSuite extends QueryTest with SQLTestUtils with Tes withTable("test", "temp") { sql( s""" - |CREATE TABLE test(i int) - |PARTITIONED BY (p int) - |STORED AS textfile""".stripMargin) + |CREATE TABLE test(i int) + |PARTITIONED BY (p int) + |STORED AS textfile""".stripMargin) spark.range(0, 1000, 1).selectExpr("id as col") .createOrReplaceTempView("temp") for (part <- Seq(1, 2, 3, 4)) { - sql(s""" - |INSERT OVERWRITE TABLE test PARTITION (p='$part') - |select col from temp""".stripMargin) + sql( + s""" + |INSERT OVERWRITE TABLE test PARTITION (p='$part') + |select col from 
temp""".stripMargin) } - val analyzed1 = sql("select i from test where p>0").queryExecution.analyzed - val analyzed2 = sql("select i from test where p=1").queryExecution.analyzed - assert(Optimize.execute(analyzed1).stats.sizeInBytes/4 === + val analyzed1 = sql("select i from test where p > 0").queryExecution.analyzed + val analyzed2 = sql("select i from test where p = 1").queryExecution.analyzed + assert(Optimize.execute(analyzed1).stats.sizeInBytes / 4 === Optimize.execute(analyzed2).stats.sizeInBytes) } }