From 568dcd9523ce35cbbe4ad3748a753a9fda950791 Mon Sep 17 00:00:00 2001 From: Linhong Liu Date: Tue, 1 Sep 2020 13:04:24 +0000 Subject: [PATCH] [SPARK-32761][SQL] Allow aggregating multiple foldable distinct expressions ### What changes were proposed in this pull request? For queries with multiple foldable distinct columns, since they will be eliminated during execution, it's not mandatory to let `RewriteDistinctAggregates` handle this case. And in the current code, `RewriteDistinctAggregates` *dose* miss some "aggregating with multiple foldable distinct expressions" cases. For example: `select count(distinct 2), count(distinct 2, 3)` will be missed. But in the planner, this will trigger an error that "multiple distinct expressions" are not allowed. As the foldable distinct columns can be eliminated finally, we can allow this in the aggregation planner check. ### Why are the changes needed? bug fix ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? added test case Closes #29607 from linhongliu-db/SPARK-32761. Authored-by: Linhong Liu Signed-off-by: Wenchen Fan (cherry picked from commit a410658c9bc244e325702dc926075bd835b669ff) --- .../org/apache/spark/sql/execution/SparkStrategies.scala | 6 ++++-- .../test/scala/org/apache/spark/sql/DataFrameSuite.scala | 4 ++++ 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index f836debcbafd..689d1eb62ffa 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -517,7 +517,8 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { val (functionsWithDistinct, functionsWithoutDistinct) = aggregateExpressions.partition(_.isDistinct) - if (functionsWithDistinct.map(_.aggregateFunction.children.toSet).distinct.length > 1) { + if (functionsWithDistinct.map( + _.aggregateFunction.children.filterNot(_.foldable).toSet).distinct.length > 1) { // This is a sanity check. We should not reach here when we have multiple distinct // column sets. Our `RewriteDistinctAggregates` should take care this case. sys.error("You hit a query analyzer bug. Please report your query to " + @@ -548,7 +549,8 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { // to be [COUNT(DISTINCT foo), MAX(DISTINCT foo)], but // [COUNT(DISTINCT bar), COUNT(DISTINCT foo)] is disallowed because those two distinct // aggregates have different column expressions. - val distinctExpressions = functionsWithDistinct.head.aggregateFunction.children + val distinctExpressions = + functionsWithDistinct.head.aggregateFunction.children.filterNot(_.foldable) val normalizedNamedDistinctExpressions = distinctExpressions.map { e => // Ideally this should be done in `NormalizeFloatingNumbers`, but we do it here // because `distinctExpressions` is not extracted during logical phase. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index 7869005ea2de..85cbe458aebd 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -2467,6 +2467,10 @@ class DataFrameSuite extends QueryTest val df = l.join(r, $"col2" === $"col4", "LeftOuter") checkAnswer(df, Row("2", "2")) } + + test("SPARK-32761: aggregating multiple distinct CONSTANT columns") { + checkAnswer(sql("select count(distinct 2), count(distinct 2,3)"), Row(1, 1)) + } } case class GroupByKey(a: Int, b: Int)