From dbcb4504cc7945c9c43b2e2d979eeefd603bf206 Mon Sep 17 00:00:00 2001 From: Prakhar Jain Date: Fri, 31 Jul 2020 17:16:08 +0530 Subject: [PATCH 1/3] Ignore unused DPP True Filter in Canonicalization --- .../sql/execution/DataSourceScanExec.scala | 6 ++++- .../PlanDynamicPruningFilters.scala | 2 +- .../sql/DynamicPartitionPruningSuite.scala | 26 +++++++++++++++++-- 3 files changed, 30 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala index 78808ff21394c..3fa7b189951d7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala @@ -608,12 +608,16 @@ case class FileSourceScanExec( new FileScanRDD(fsRelation.sparkSession, readFile, partitions) } + private def filterUnnecessaryPredicates(predicates: Seq[Expression]): Seq[Expression] = { + predicates.filterNot(_ == Literal.TrueLiteral) + } + override def doCanonicalize(): FileSourceScanExec = { FileSourceScanExec( relation, output.map(QueryPlan.normalizeExpressions(_, output)), requiredSchema, - QueryPlan.normalizePredicates(partitionFilters, output), + QueryPlan.normalizePredicates(filterUnnecessaryPredicates(partitionFilters), output), optionalBucketSet, optionalNumCoalescedBuckets, QueryPlan.normalizePredicates(dataFilters, output), diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PlanDynamicPruningFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PlanDynamicPruningFilters.scala index 098576d72f540..652dc46b0262c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PlanDynamicPruningFilters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PlanDynamicPruningFilters.scala @@ -78,7 +78,7 @@ case class 
PlanDynamicPruningFilters(sparkSession: SparkSession) DynamicPruningExpression(InSubqueryExec(value, broadcastValues, exprId)) } else if (onlyInBroadcast) { // it is not worthwhile to execute the query, so we fall-back to a true literal - DynamicPruningExpression(Literal.TrueLiteral) + Literal.TrueLiteral } else { // we need to apply an aggregate on the buildPlan in order to be column pruned val alias = Alias(buildKeys(broadcastKeyIndex), buildKeys(broadcastKeyIndex).toString)() diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala index 0b754e9e3ec0b..7cd4750f083b0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala @@ -23,8 +23,8 @@ import org.apache.spark.sql.catalyst.expressions.{DynamicPruningExpression, Expr import org.apache.spark.sql.catalyst.plans.ExistenceJoin import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, AdaptiveSparkPlanHelper} -import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ReusedExchangeExec} -import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec +import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ReusedExchangeExec, ShuffleExchangeExec} +import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, SortMergeJoinExec} import org.apache.spark.sql.execution.streaming.{MemoryStream, StreamingQueryWrapper} import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf @@ -1240,6 +1240,28 @@ abstract class DynamicPartitionPruningSuiteBase } } + test("Unused Dynamic Pruning filter shouldn't affect canonicalization and exchange reuse") { + withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") { + 
withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { + val df = sql( + """ WITH view1 as ( + | SELECT f.store_id FROM fact_stats f WHERE f.units_sold = 70 + | ) + | + | SELECT * FROM view1 v1 join view1 v2 WHERE v1.store_id = v2.store_id + """.stripMargin) + + val executedPlan = df.queryExecution.executedPlan + checkPartitionPruningPredicate(df, false, false) + val reuseExchangeNodes = executedPlan.collect { case se: ReusedExchangeExec => se } + assert(reuseExchangeNodes.size == 1, "Expected plan to contain 1 ReusedExchangeExec " + + s"nodes. Found ${reuseExchangeNodes.size}") + + checkAnswer(df, Row(15, 15) :: Nil) + } + } + } + test("Plan broadcast pruning only when the broadcast can be reused") { Given("dynamic pruning filter on the build side") withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") { From 6cb070330d4bf3a70d9e49e72bcda136ed0bebb1 Mon Sep 17 00:00:00 2001 From: Prakhar Jain Date: Fri, 31 Jul 2020 17:39:15 +0530 Subject: [PATCH 2/3] remove unnecessary imports --- .../apache/spark/sql/DynamicPartitionPruningSuite.scala | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala index 7cd4750f083b0..219d54c12cccc 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala @@ -23,8 +23,8 @@ import org.apache.spark.sql.catalyst.expressions.{DynamicPruningExpression, Expr import org.apache.spark.sql.catalyst.plans.ExistenceJoin import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, AdaptiveSparkPlanHelper} -import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ReusedExchangeExec, ShuffleExchangeExec} -import 
org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, SortMergeJoinExec} +import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ReusedExchangeExec} +import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec import org.apache.spark.sql.execution.streaming.{MemoryStream, StreamingQueryWrapper} import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf @@ -1251,9 +1251,10 @@ abstract class DynamicPartitionPruningSuiteBase | SELECT * FROM view1 v1 join view1 v2 WHERE v1.store_id = v2.store_id """.stripMargin) - val executedPlan = df.queryExecution.executedPlan checkPartitionPruningPredicate(df, false, false) - val reuseExchangeNodes = executedPlan.collect { case se: ReusedExchangeExec => se } + val reuseExchangeNodes = df.queryExecution.executedPlan.collect { + case se: ReusedExchangeExec => se + } assert(reuseExchangeNodes.size == 1, "Expected plan to contain 1 ReusedExchangeExec " + s"nodes. Found ${reuseExchangeNodes.size}") From b1f8ba8c932bb3b45c07edccf8a6bf21134aa145 Mon Sep 17 00:00:00 2001 From: Prakhar Jain Date: Sat, 1 Aug 2020 18:39:48 +0530 Subject: [PATCH 3/3] address review comments --- .../spark/sql/execution/DataSourceScanExec.scala | 10 +++++++--- .../dynamicpruning/PlanDynamicPruningFilters.scala | 2 +- .../spark/sql/DynamicPartitionPruningSuite.scala | 3 ++- 3 files changed, 10 insertions(+), 5 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala index 3fa7b189951d7..bef9f4b46c628 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala @@ -608,8 +608,11 @@ case class FileSourceScanExec( new FileScanRDD(fsRelation.sparkSession, readFile, partitions) } - private def filterUnnecessaryPredicates(predicates: Seq[Expression]): Seq[Expression] = { 
- predicates.filterNot(_ == Literal.TrueLiteral) + // Filters out unused DynamicPruningExpression expressions, i.e. those that were replaced + // with DynamicPruningExpression(Literal.TrueLiteral) during Physical Planning + private def filterUnusedDynamicPruningExpressions( + predicates: Seq[Expression]): Seq[Expression] = { + predicates.filterNot(_ == DynamicPruningExpression(Literal.TrueLiteral)) } override def doCanonicalize(): FileSourceScanExec = { @@ -617,7 +620,8 @@ case class FileSourceScanExec( relation, output.map(QueryPlan.normalizeExpressions(_, output)), requiredSchema, - QueryPlan.normalizePredicates(filterUnnecessaryPredicates(partitionFilters), output), + QueryPlan.normalizePredicates( + filterUnusedDynamicPruningExpressions(partitionFilters), output), optionalBucketSet, optionalNumCoalescedBuckets, QueryPlan.normalizePredicates(dataFilters, output), diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PlanDynamicPruningFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PlanDynamicPruningFilters.scala index 652dc46b0262c..098576d72f540 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PlanDynamicPruningFilters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PlanDynamicPruningFilters.scala @@ -78,7 +78,7 @@ case class PlanDynamicPruningFilters(sparkSession: SparkSession) DynamicPruningExpression(InSubqueryExec(value, broadcastValues, exprId)) } else if (onlyInBroadcast) { // it is not worthwhile to execute the query, so we fall-back to a true literal - Literal.TrueLiteral + DynamicPruningExpression(Literal.TrueLiteral) } else { // we need to apply an aggregate on the buildPlan in order to be column pruned val alias = Alias(buildKeys(broadcastKeyIndex), buildKeys(broadcastKeyIndex).toString)() diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala index 219d54c12cccc..8edfb91d15fff 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala @@ -1240,7 +1240,8 @@ abstract class DynamicPartitionPruningSuiteBase } } - test("Unused Dynamic Pruning filter shouldn't affect canonicalization and exchange reuse") { + test("SPARK-32509: Unused Dynamic Pruning filter shouldn't affect " + + "canonicalization and exchange reuse") { withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") { withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { val df = sql(