From cf6c7e91e2eb9331cf9123a446a14b71f625200a Mon Sep 17 00:00:00 2001 From: "tanel.kiis@gmail.com" Date: Thu, 17 Sep 2020 22:46:53 +0300 Subject: [PATCH 01/17] Bitwise operations are commutative --- .../sql/catalyst/expressions/Canonicalize.scala | 7 +++++++ .../catalyst/expressions/CanonicalizeSuite.scala | 16 ++++++++++++++++ 2 files changed, 23 insertions(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Canonicalize.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Canonicalize.scala index a8031086d82f7..1ecf4372cfb58 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Canonicalize.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Canonicalize.scala @@ -80,6 +80,13 @@ object Canonicalize { orderCommutative(a, { case And(l, r) if l.deterministic && r.deterministic => Seq(l, r)}) .reduce(And) + case o: BitwiseOr => + orderCommutative(o, { case BitwiseOr(l, r) => Seq(l, r) }).reduce(BitwiseOr) + case a: BitwiseAnd => + orderCommutative(a, { case BitwiseAnd(l, r) => Seq(l, r) }).reduce(BitwiseAnd) + case x: BitwiseXor => + orderCommutative(x, { case BitwiseXor(l, r) => Seq(l, r) }).reduce(BitwiseXor) + case EqualTo(l, r) if l.hashCode() > r.hashCode() => EqualTo(r, l) case EqualNullSafe(l, r) if l.hashCode() > r.hashCode() => EqualNullSafe(r, l) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CanonicalizeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CanonicalizeSuite.scala index a043b4cbed1f1..d822fe736ef89 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CanonicalizeSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CanonicalizeSuite.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst.expressions import java.util.TimeZone import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ import org.apache.spark.sql.catalyst.plans.logical.Range import org.apache.spark.sql.types.{IntegerType, LongType, StructField, StructType} @@ -95,4 +96,19 @@ class CanonicalizeSuite extends SparkFunSuite { val castWithTimeZoneId = Cast(literal, LongType, Some(TimeZone.getDefault.getID)) assert(castWithTimeZoneId.semanticEquals(cast)) } + + test("SPARK-32927: Bitwise operations are commutative") { + Seq( + (l: Expression, r: Expression) => BitwiseOr(l, r), + (l: Expression, r: Expression) => BitwiseAnd(l, r), + (l: Expression, r: Expression) => BitwiseXor(l, r) + ).foreach(f => { + val e1 = f('a, f('b, 'c)) + val e2 = f(f('a, 'b), 'c) + val e3 = f('a, f('b, 'a)) + + assert(e1.canonicalized == e2.canonicalized) + assert(e1.canonicalized != e3.canonicalized) + }) + } } From 317f3137aefa01f8eb2da753b147b1cb3d5ab7bf Mon Sep 17 00:00:00 2001 From: "tanel.kiis@gmail.com" Date: Sat, 19 Sep 2020 08:02:34 +0300 Subject: [PATCH 02/17] Experiment with SQLQueryTestSuite --- .../spark/sql/catalyst/optimizer/Optimizer.scala | 13 +++++++++++-- .../org/apache/spark/sql/SQLQueryTestSuite.scala | 2 +- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index b7791cd442694..91e37225d164e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -293,8 +293,17 @@ abstract class Optimizer(catalogManager: CatalogManager) * if necessary, instead of this method. */ final override def batches: Seq[Batch] = { - val excludedRulesConf = - SQLConf.get.optimizerExcludedRules.toSeq.flatMap(Utils.stringToSeq) + val excludedRulesConf = { + val excludedRulesConf = SQLConf.get.optimizerExcludedRules.toSeq.flatMap(Utils.stringToSeq) + if (excludedRulesConf.contains("*")) { + defaultBatches.flatMap { batch => + batch.rules.map(_.ruleName) + } + } else { + excludedRulesConf + } + } + val excludedRules = excludedRulesConf.filter { ruleName => val nonExcludable = nonExcludableRules.contains(ruleName) if (nonExcludable) { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala index b7cf0798a9d4b..0f992f32db001 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala @@ -335,7 +335,7 @@ class SQLQueryTestSuite extends QueryTest with SharedSparkSession with SQLHelper val configSets = configDims.values.foldLeft(Seq(Seq[(String, String)]())) { (res, dim) => dim.flatMap { configSet => res.map(_ ++ configSet) } - } + } ++ Seq(Seq(("spark.sql.optimizer.excludedRules", "*"))) configSets.foreach { configSet => try { From 281ed6880c872eda414c84c19ec047aff14c28a2 Mon Sep 17 00:00:00 2001 From: "tanel.kiis@gmail.com" Date: Sat, 19 Sep 2020 16:05:48 +0300 Subject: [PATCH 03/17] Optimizer rules --- .../apache/spark/sql/catalyst/optimizer/Optimizer.scala | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 91e37225d164e..f414c046e134a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -252,7 +252,9 @@ abstract class Optimizer(catalogManager: CatalogManager) RewriteCorrelatedScalarSubquery.ruleName :: RewritePredicateSubquery.ruleName :: NormalizeFloatingNumbers.ruleName :: - ReplaceWithFieldsExpression.ruleName :: Nil + ReplaceWithFieldsExpression.ruleName :: + OptimizeSubqueries.ruleName :: + ConstantFolding.ruleName :: Nil /** * Optimize all the subqueries inside expression. @@ -298,6 +300,8 @@ abstract class Optimizer(catalogManager: CatalogManager) if (excludedRulesConf.contains("*")) { defaultBatches.flatMap { batch => batch.rules.map(_.ruleName) + }.filter { ruleName => + !nonExcludableRules.contains(ruleName) } } else { excludedRulesConf @@ -311,7 +315,8 @@ abstract class Optimizer(catalogManager: CatalogManager) s"because this rule is a non-excludable rule.") } !nonExcludable - } + }.toSet + if (excludedRules.isEmpty) { defaultBatches } else { From 326ec0553631be448e321d769186ef1dcac5625b Mon Sep 17 00:00:00 2001 From: "tanel.kiis@gmail.com" Date: Sat, 19 Sep 2020 20:50:47 +0300 Subject: [PATCH 04/17] Collect, first and last should be deterministic aggregate functions --- .../spark/sql/catalyst/dsl/package.scala | 5 ++++ .../expressions/aggregate/First.scala | 3 --- .../catalyst/expressions/aggregate/Last.scala | 3 --- .../expressions/aggregate/collect.scala | 4 ---- .../optimizer/FilterPushdownSuite.scala | 23 +++++++++++++++++++ 5 files changed, 28 insertions(+), 10 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala index b61c4b8d065f2..5a7805e4306e5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala @@ -203,6 +203,11 @@ package object dsl { BitOrAgg(e).toAggregateExpression(isDistinct = false, filter = filter) def bitXor(e: Expression, filter: Option[Expression] = None): Expression = BitXorAgg(e).toAggregateExpression(isDistinct = false, filter = filter) + def collectList(e: Expression, filter: Option[Expression] = None): Expression = + CollectList(e).toAggregateExpression(isDistinct = false, filter = filter) + def collectSet(e: Expression, filter: Option[Expression] = None): Expression = + CollectSet(e).toAggregateExpression(isDistinct = false, filter = filter) + def upper(e: Expression): Expression = Upper(e) def lower(e: Expression): Expression = Lower(e) def coalesce(args: Expression*): Expression = Coalesce(args) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/First.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/First.scala index 65fd43c924d08..f785caed4d858 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/First.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/First.scala @@ -63,9 +63,6 @@ case class First(child: Expression, ignoreNulls: Boolean) override def nullable: Boolean = true - // First is not a deterministic function. - override lazy val deterministic: Boolean = false - // Return data type. override def dataType: DataType = child.dataType diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Last.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Last.scala index 8d17a48a69f6f..368787a72d167 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Last.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Last.scala @@ -62,9 +62,6 @@ case class Last(child: Expression, ignoreNulls: Boolean) override def nullable: Boolean = true - // Last is not a deterministic function. - override lazy val deterministic: Boolean = false - // Return data type. override def dataType: DataType = child.dataType diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala index 0a3d87623be8b..17461fe6b4cb1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala @@ -43,10 +43,6 @@ abstract class Collect[T <: Growable[Any] with Iterable[Any]] extends TypedImper override def dataType: DataType = ArrayType(child.dataType, false) - // Both `CollectList` and `CollectSet` are non-deterministic since their results depend on the - // actual order of input rows. - override lazy val deterministic: Boolean = false - protected def convertToBufferElement(value: Any): Any override def update(buffer: T, input: InternalRow): T = { diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala index 156313300eef9..5a0e99697c045 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala @@ -841,6 +841,29 @@ class FilterPushdownSuite extends PlanTest { comparePlans(optimized, correctAnswer) } + test("SPARK-32940: aggregate: push filters through first, last and collect") { + Seq( + first(_: Expression), + last(_: Expression), + collectList(_: Expression), + collectSet(_: Expression) + ).foreach { agg => + val originalQuery = testRelation + .groupBy('a)(agg('b)) + .where('a > 42) + .analyze + + val optimized = Optimize.execute(originalQuery) + + val correctAnswer = testRelation + .where('a > 42) + .groupBy('a)(agg('b)) + .analyze + + comparePlans(optimized, correctAnswer) + } + } + test("union") { val testRelation2 = LocalRelation('d.int, 'e.int, 'f.int) From ab2901eae7a8c1210352da63f3285d795ee05baa Mon Sep 17 00:00:00 2001 From: "tanel.kiis@gmail.com" Date: Sat, 19 Sep 2020 21:00:35 +0300 Subject: [PATCH 05/17] Fix test, that required non-deterministic expression --- .../optimizer/UnwrapCastInBinaryComparisonSuite.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/UnwrapCastInBinaryComparisonSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/UnwrapCastInBinaryComparisonSuite.scala index 387964088b808..68f592ab58107 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/UnwrapCastInBinaryComparisonSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/UnwrapCastInBinaryComparisonSuite.scala @@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.optimizer.UnwrapCastInBinaryComparison._ import org.apache.spark.sql.catalyst.plans.PlanTest import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules.RuleExecutor -import org.apache.spark.sql.types.{BooleanType, ByteType, DoubleType, IntegerType} +import org.apache.spark.sql.types.{BooleanType, ByteType, DoubleType, IntegerType, LongType} class UnwrapCastInBinaryComparisonSuite extends PlanTest with ExpressionEvalHelper { @@ -120,8 +120,8 @@ class UnwrapCastInBinaryComparisonSuite extends PlanTest with ExpressionEvalHelp } test("unwrap cast should skip when expression is non-deterministic") { - Seq(positiveInt, negativeInt).foreach (v => { - val e = Cast(First(f, ignoreNulls = true), IntegerType) <=> v + Seq(positiveLong, negativeLong).foreach (v => { + val e = Cast(SparkPartitionID(), LongType) <=> v assertEquivalent(e, e, evaluate = false) }) } From e8badd1b73316a5d4835ea962f2ae8195651cfc8 Mon Sep 17 00:00:00 2001 From: "tanel.kiis@gmail.com" Date: Sat, 19 Sep 2020 21:05:58 +0300 Subject: [PATCH 06/17] Revert "Optimizer rules" This reverts commit 281ed688 --- .../apache/spark/sql/catalyst/optimizer/Optimizer.scala | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index f414c046e134a..91e37225d164e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -252,9 +252,7 @@ abstract class Optimizer(catalogManager: CatalogManager) RewriteCorrelatedScalarSubquery.ruleName :: RewritePredicateSubquery.ruleName :: NormalizeFloatingNumbers.ruleName :: - ReplaceWithFieldsExpression.ruleName :: - OptimizeSubqueries.ruleName :: - ConstantFolding.ruleName :: Nil + ReplaceWithFieldsExpression.ruleName :: Nil /** * Optimize all the subqueries inside expression. @@ -300,8 +298,6 @@ abstract class Optimizer(catalogManager: CatalogManager) if (excludedRulesConf.contains("*")) { defaultBatches.flatMap { batch => batch.rules.map(_.ruleName) - }.filter { ruleName => - !nonExcludableRules.contains(ruleName) } } else { excludedRulesConf @@ -315,8 +311,7 @@ abstract class Optimizer(catalogManager: CatalogManager) s"because this rule is a non-excludable rule.") } !nonExcludable - }.toSet - + } if (excludedRules.isEmpty) { defaultBatches } else { From f1e6711c1d8c1b3b07841483c6874c69bc76cae9 Mon Sep 17 00:00:00 2001 From: "tanel.kiis@gmail.com" Date: Sat, 19 Sep 2020 21:06:49 +0300 Subject: [PATCH 07/17] Revert "Experiment with SQLQueryTestSuite" This reverts commit 317f3137 --- .../spark/sql/catalyst/optimizer/Optimizer.scala | 13 ++----------- .../org/apache/spark/sql/SQLQueryTestSuite.scala | 2 +- 2 files changed, 3 insertions(+), 12 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 91e37225d164e..b7791cd442694 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -293,17 +293,8 @@ abstract class Optimizer(catalogManager: CatalogManager) * if necessary, instead of this method. */ final override def batches: Seq[Batch] = { - val excludedRulesConf = { - val excludedRulesConf = SQLConf.get.optimizerExcludedRules.toSeq.flatMap(Utils.stringToSeq) - if (excludedRulesConf.contains("*")) { - defaultBatches.flatMap { batch => - batch.rules.map(_.ruleName) - } - } else { - excludedRulesConf - } - } - + val excludedRulesConf = + SQLConf.get.optimizerExcludedRules.toSeq.flatMap(Utils.stringToSeq) val excludedRules = excludedRulesConf.filter { ruleName => val nonExcludable = nonExcludableRules.contains(ruleName) if (nonExcludable) { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala index 0f992f32db001..b7cf0798a9d4b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala @@ -335,7 +335,7 @@ class SQLQueryTestSuite extends QueryTest with SharedSparkSession with SQLHelper val configSets = configDims.values.foldLeft(Seq(Seq[(String, String)]())) { (res, dim) => dim.flatMap { configSet => res.map(_ ++ configSet) } - } ++ Seq(Seq(("spark.sql.optimizer.excludedRules", "*"))) + } configSets.foreach { configSet => try { From c17f2efef8130d9fde9bba1958be8a4012512cc9 Mon Sep 17 00:00:00 2001 From: "tanel.kiis@gmail.com" Date: Sat, 19 Sep 2020 21:07:10 +0300 Subject: [PATCH 08/17] Revert "Bitwise operations are commutative" This reverts commit cf6c7e91 --- .../sql/catalyst/expressions/Canonicalize.scala | 7 ------- .../catalyst/expressions/CanonicalizeSuite.scala | 16 ---------------- 2 files changed, 23 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Canonicalize.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Canonicalize.scala index 1ecf4372cfb58..a8031086d82f7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Canonicalize.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Canonicalize.scala @@ -80,13 +80,6 @@ object Canonicalize { orderCommutative(a, { case And(l, r) if l.deterministic && r.deterministic => Seq(l, r)}) .reduce(And) - case o: BitwiseOr => - orderCommutative(o, { case BitwiseOr(l, r) => Seq(l, r) }).reduce(BitwiseOr) - case a: BitwiseAnd => - orderCommutative(a, { case BitwiseAnd(l, r) => Seq(l, r) }).reduce(BitwiseAnd) - case x: BitwiseXor => - orderCommutative(x, { case BitwiseXor(l, r) => Seq(l, r) }).reduce(BitwiseXor) - case EqualTo(l, r) if l.hashCode() > r.hashCode() => EqualTo(r, l) case EqualNullSafe(l, r) if l.hashCode() > r.hashCode() => EqualNullSafe(r, l) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CanonicalizeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CanonicalizeSuite.scala index d822fe736ef89..a043b4cbed1f1 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CanonicalizeSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CanonicalizeSuite.scala @@ -20,7 +20,6 @@ package org.apache.spark.sql.catalyst.expressions import java.util.TimeZone import org.apache.spark.SparkFunSuite -import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ import org.apache.spark.sql.catalyst.plans.logical.Range import org.apache.spark.sql.types.{IntegerType, LongType, StructField, StructType} @@ -96,19 +95,4 @@ class CanonicalizeSuite extends SparkFunSuite { val castWithTimeZoneId = Cast(literal, LongType, Some(TimeZone.getDefault.getID)) assert(castWithTimeZoneId.semanticEquals(cast)) } - - test("SPARK-32927: Bitwise operations are commutative") { - Seq( - (l: Expression, r: Expression) => BitwiseOr(l, r), - (l: Expression, r: Expression) => BitwiseAnd(l, r), - (l: Expression, r: Expression) => BitwiseXor(l, r) - ).foreach(f => { - val e1 = f('a, f('b, 'c)) - val e2 = f(f('a, 'b), 'c) - val e3 = f('a, f('b, 'a)) - - assert(e1.canonicalized == e2.canonicalized) - assert(e1.canonicalized != e3.canonicalized) - }) - } } From b9fd2f1f86b821271b4ac1460b5f7102bb7a132c Mon Sep 17 00:00:00 2001 From: "tanel.kiis@gmail.com" Date: Sat, 19 Sep 2020 21:27:32 +0300 Subject: [PATCH 09/17] Fix test, that required non-deterministic expression --- .../catalyst/optimizer/UnwrapCastInBinaryComparisonSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/UnwrapCastInBinaryComparisonSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/UnwrapCastInBinaryComparisonSuite.scala index 8bcd61bdbaabf..7dfa7ba72a7f6 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/UnwrapCastInBinaryComparisonSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/UnwrapCastInBinaryComparisonSuite.scala @@ -123,7 +123,7 @@ class UnwrapCastInBinaryComparisonSuite extends PlanTest with ExpressionEvalHelp Seq(positiveLong, negativeLong).foreach (v => { val e = Cast(SparkPartitionID(), LongType) <=> v assertEquivalent(e, e, evaluate = false) - val e2 = Cast(Literal(30.toShort), IntegerType) >= v + val e2 = Cast(Literal(30), LongType) >= v assertEquivalent(e2, e2, evaluate = false) }) } From 9898c56f0bb57aaf276fe8b236009f379cbdc38c Mon Sep 17 00:00:00 2001 From: "tanel.kiis@gmail.com" Date: Sun, 20 Sep 2020 08:10:03 +0300 Subject: [PATCH 10/17] Fix test, that required non-deterministic aggregator --- .../org/apache/spark/sql/SQLQuerySuite.scala | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index b86df4db816b3..a2afc943862ac 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -30,13 +30,14 @@ import org.apache.spark.sql.catalyst.optimizer.{ConvertToLocalRelation, NestedCo import org.apache.spark.sql.catalyst.plans.logical.Project import org.apache.spark.sql.catalyst.util.StringUtils import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper -import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, ObjectHashAggregateExec, SortAggregateExec} +import org.apache.spark.sql.execution.aggregate._ import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec import org.apache.spark.sql.execution.command.FunctionsCommand import org.apache.spark.sql.execution.datasources.v2.BatchScanExec import org.apache.spark.sql.execution.datasources.v2.orc.OrcScan import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, CartesianProductExec, SortMergeJoinExec} +import org.apache.spark.sql.expressions.Aggregator import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.{SharedSparkSession, TestSQLContext} @@ -2864,12 +2865,20 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark } test("Non-deterministic aggregate functions should not be deduplicated") { - val query = "SELECT a, first_value(b), first_value(b) + 1 FROM testData2 GROUP BY a" + spark.udf.register("countND", udaf(new Aggregator[Long, Long, Long] { + def zero: Long = 0L + def reduce(b: Long, a: Long): Long = b + a + def merge(b1: Long, b2: Long): Long = b1 + b2 + def finish(r: Long): Long = r + def bufferEncoder: Encoder[Long] = Encoders.scalaLong + def outputEncoder: Encoder[Long] = Encoders.scalaLong + }).asNondeterministic()) + + val query = "SELECT a, countND(b), countND(b) + 1 FROM testData2 GROUP BY a" val df = sql(query) val physical = df.queryExecution.sparkPlan val aggregateExpressions = physical.collectFirst { - case agg : HashAggregateExec => agg.aggregateExpressions - case agg : SortAggregateExec => agg.aggregateExpressions + case agg : BaseAggregateExec => agg.aggregateExpressions } assert (aggregateExpressions.isDefined) assert (aggregateExpressions.get.size == 2) From b0919a2b54c69d6028cbe0708574687cf6252ece Mon Sep 17 00:00:00 2001 From: "tanel.kiis@gmail.com" Date: Sun, 20 Sep 2020 08:32:22 +0300 Subject: [PATCH 11/17] Improve docstrings --- R/pkg/R/functions.R | 16 +++---- python/pyspark/sql/functions.py | 16 +++---- .../expressions/aggregate/First.scala | 4 +- .../catalyst/expressions/aggregate/Last.scala | 4 +- .../expressions/aggregate/collect.scala | 8 ++-- .../org/apache/spark/sql/functions.scala | 48 +++++++++---------- 6 files changed, 48 insertions(+), 48 deletions(-) diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R index 5d9c8e8124d9a..8155d95bfce4d 100644 --- a/R/pkg/R/functions.R +++ b/R/pkg/R/functions.R @@ -934,8 +934,8 @@ setMethod("factorial", #' #' The function by default returns the first values it sees. It will return the first non-missing #' value it sees when na.rm is set to true. If all values are missing, then NA is returned. -#' Note: the function is non-deterministic because its results depends on the order of the rows -#' which may be non-deterministic after a shuffle. +#' Note: The function can be non-deterministic because its results depend on the order of +#' input rows which are usually non-deterministic after a shuffle. #' #' @param na.rm a logical value indicating whether NA values should be stripped #' before the computation proceeds. @@ -1079,8 +1079,8 @@ setMethod("kurtosis", #' #' The function by default returns the last values it sees. It will return the last non-missing #' value it sees when na.rm is set to true. If all values are missing, then NA is returned. -#' Note: the function is non-deterministic because its results depends on the order of the rows -#' which may be non-deterministic after a shuffle. +#' Note: The function can be non-deterministic because its results depend on the order of +#' input rows which are usually non-deterministic after a shuffle. #' #' @param x column to compute on. #' @param na.rm a logical value indicating whether NA values should be stripped @@ -4096,8 +4096,8 @@ setMethod("create_map", #' @details #' \code{collect_list}: Creates a list of objects with duplicates. -#' Note: the function is non-deterministic because the order of collected results depends -#' on the order of the rows which may be non-deterministic after a shuffle. +#' Note: The function can be non-deterministic because its results depend on the order of +#' input rows which are usually non-deterministic after a shuffle. #' #' @rdname column_aggregate_functions #' @aliases collect_list collect_list,Column-method @@ -4117,8 +4117,8 @@ setMethod("collect_list", #' @details #' \code{collect_set}: Creates a list of objects with duplicate elements eliminated. -#' Note: the function is non-deterministic because the order of collected results depends -#' on the order of the rows which may be non-deterministic after a shuffle. +#' Note: The function can be non-deterministic because its results depend on the order of +#' input rows which are usually non-deterministic after a shuffle. #' #' @rdname column_aggregate_functions #' @aliases collect_set collect_set,Column-method diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index f01bdb0165f8c..3bbbbf3911371 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -192,8 +192,8 @@ def _options_to_str(options): _collect_list_doc = """ Aggregate function: returns a list of objects with duplicates. - .. note:: The function is non-deterministic because the order of collected results depends - on the order of the rows which may be non-deterministic after a shuffle. + .. note:: The function can be non-deterministic because its results depend on the order of + input rows which are usually non-deterministic after a shuffle. >>> df2 = spark.createDataFrame([(2,), (5,), (5,)], ('age',)) >>> df2.agg(collect_list('age')).collect() @@ -202,8 +202,8 @@ def _options_to_str(options): _collect_set_doc = """ Aggregate function: returns a set of objects with duplicate elements eliminated. - .. note:: The function is non-deterministic because the order of collected results depends - on the order of the rows which may be non-deterministic after a shuffle. + .. note:: The function can be non-deterministic because its results depend on the order of + input rows which are usually non-deterministic after a shuffle. >>> df2 = spark.createDataFrame([(2,), (5,), (5,)], ('age',)) >>> df2.agg(collect_set('age')).collect() @@ -454,8 +454,8 @@ def first(col, ignorenulls=False): The function by default returns the first values it sees. It will return the first non-null value it sees when ignoreNulls is set to true. If all values are null, then null is returned. - .. note:: The function is non-deterministic because its results depends on the order of the - rows which may be non-deterministic after a shuffle. + .. note:: The function can be non-deterministic because its results depend on the order of + input rows which are usually non-deterministic after a shuffle. """ sc = SparkContext._active_spark_context jc = sc._jvm.functions.first(_to_java_column(col), ignorenulls) @@ -545,8 +545,8 @@ def last(col, ignorenulls=False): The function by default returns the last values it sees. It will return the last non-null value it sees when ignoreNulls is set to true. If all values are null, then null is returned. - .. note:: The function is non-deterministic because its results depends on the order of the - rows which may be non-deterministic after a shuffle. + .. note:: The function can be non-deterministic because its results depend on the order of + input rows which are usually non-deterministic after a shuffle. """ sc = SparkContext._active_spark_context jc = sc._jvm.functions.last(_to_java_column(col), ignorenulls) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/First.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/First.scala index f785caed4d858..fc8c9990ab639 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/First.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/First.scala @@ -45,8 +45,8 @@ import org.apache.spark.sql.types._ 5 """, note = """ - The function is non-deterministic because its results depends on the order of the rows - which may be non-deterministic after a shuffle. + The function can be non-deterministic because its results depend on the order of input rows + which are usually non-deterministic after a shuffle. """, group = "agg_funcs", since = "2.0.0") diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Last.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Last.scala index 368787a72d167..91049affcd13e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Last.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Last.scala @@ -44,8 +44,8 @@ import org.apache.spark.sql.types._ 5 """, note = """ - The function is non-deterministic because its results depends on the order of the rows - which may be non-deterministic after a shuffle. + The function can be non-deterministic because its results depend on the order of input rows + which are usually non-deterministic after a shuffle. """, group = "agg_funcs", since = "2.0.0") diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala index 17461fe6b4cb1..ef84c77019e30 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala @@ -90,8 +90,8 @@ abstract class Collect[T <: Growable[Any] with Iterable[Any]] extends TypedImper [1,2,1] """, note = """ - The function is non-deterministic because the order of collected results depends - on the order of the rows which may be non-deterministic after a shuffle. + The function can be non-deterministic because its results depend on the order of input rows + which are usually non-deterministic after a shuffle. """, group = "agg_funcs", since = "2.0.0") @@ -132,8 +132,8 @@ case class CollectList( [1,2] """, note = """ - The function is non-deterministic because the order of collected results depends - on the order of the rows which may be non-deterministic after a shuffle. + The function can be non-deterministic because its results depend on the order of input rows + which are usually non-deterministic after a shuffle. """, group = "agg_funcs", since = "2.0.0") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala index b20e8c241ef9d..a61660e269ee5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala @@ -302,8 +302,8 @@ object functions { /** * Aggregate function: returns a list of objects with duplicates. * - * @note The function is non-deterministic because the order of collected results depends - * on the order of the rows which may be non-deterministic after a shuffle. + * @note The function can be non-deterministic because its results depend on the order of + * input rows which are usually non-deterministic after a shuffle. * * @group agg_funcs * @since 1.6.0 @@ -313,8 +313,8 @@ object functions { /** * Aggregate function: returns a list of objects with duplicates. * - * @note The function is non-deterministic because the order of collected results depends - * on the order of the rows which may be non-deterministic after a shuffle. + * @note The function can be non-deterministic because its results depend on the order of + * input rows which are usually non-deterministic after a shuffle. * * @group agg_funcs * @since 1.6.0 @@ -324,8 +324,8 @@ object functions { /** * Aggregate function: returns a set of objects with duplicate elements eliminated. * - * @note The function is non-deterministic because the order of collected results depends - * on the order of the rows which may be non-deterministic after a shuffle. + * @note The function can be non-deterministic because its results depend on the order of + * input rows which are usually non-deterministic after a shuffle. * * @group agg_funcs * @since 1.6.0 @@ -335,8 +335,8 @@ object functions { /** * Aggregate function: returns a set of objects with duplicate elements eliminated. * - * @note The function is non-deterministic because the order of collected results depends - * on the order of the rows which may be non-deterministic after a shuffle. + * @note The function can be non-deterministic because its results depend on the order of + * input rows which are usually non-deterministic after a shuffle. * * @group agg_funcs * @since 1.6.0 @@ -454,8 +454,8 @@ object functions { * The function by default returns the first values it sees. It will return the first non-null * value it sees when ignoreNulls is set to true. If all values are null, then null is returned. * - * @note The function is non-deterministic because its results depends on the order of the rows - * which may be non-deterministic after a shuffle. + * @note The function can be non-deterministic because its results depend on the order of + * input rows which are usually non-deterministic after a shuffle. * * @group agg_funcs * @since 2.0.0 @@ -470,8 +470,8 @@ object functions { * The function by default returns the first values it sees. It will return the first non-null * value it sees when ignoreNulls is set to true. If all values are null, then null is returned. * - * @note The function is non-deterministic because its results depends on the order of the rows - * which may be non-deterministic after a shuffle. + * @note The function can be non-deterministic because its results depend on the order of + * input rows which are usually non-deterministic after a shuffle. * * @group agg_funcs * @since 2.0.0 @@ -486,8 +486,8 @@ object functions { * The function by default returns the first values it sees. It will return the first non-null * value it sees when ignoreNulls is set to true. If all values are null, then null is returned. * - * @note The function is non-deterministic because its results depends on the order of the rows - * which may be non-deterministic after a shuffle. + * @note The function can be non-deterministic because its results depend on the order of + * input rows which are usually non-deterministic after a shuffle. * * @group agg_funcs * @since 1.3.0 @@ -500,8 +500,8 @@ object functions { * The function by default returns the first values it sees. It will return the first non-null * value it sees when ignoreNulls is set to true. If all values are null, then null is returned. * - * @note The function is non-deterministic because its results depends on the order of the rows - * which may be non-deterministic after a shuffle. + * @note The function can be non-deterministic because its results depend on the order of + * input rows which are usually non-deterministic after a shuffle. * * @group agg_funcs * @since 1.3.0 @@ -579,8 +579,8 @@ object functions { * The function by default returns the last values it sees. It will return the last non-null * value it sees when ignoreNulls is set to true. If all values are null, then null is returned. * - * @note The function is non-deterministic because its results depends on the order of the rows - * which may be non-deterministic after a shuffle. + * @note The function can be non-deterministic because its results depend on the order of + * input rows which are usually non-deterministic after a shuffle. * * @group agg_funcs * @since 2.0.0 @@ -595,8 +595,8 @@ object functions { * The function by default returns the last values it sees. It will return the last non-null * value it sees when ignoreNulls is set to true. If all values are null, then null is returned. * - * @note The function is non-deterministic because its results depends on the order of the rows - * which may be non-deterministic after a shuffle. + * @note The function can be non-deterministic because its results depend on the order of + * input rows which are usually non-deterministic after a shuffle. * * @group agg_funcs * @since 2.0.0 @@ -611,8 +611,8 @@ object functions { * The function by default returns the last values it sees. It will return the last non-null * value it sees when ignoreNulls is set to true. If all values are null, then null is returned. * - * @note The function is non-deterministic because its results depends on the order of the rows - * which may be non-deterministic after a shuffle. + * @note The function can be non-deterministic because its results depend on the order of + * input rows which are usually non-deterministic after a shuffle. * * @group agg_funcs * @since 1.3.0 @@ -625,8 +625,8 @@ object functions { * The function by default returns the last values it sees. It will return the last non-null * value it sees when ignoreNulls is set to true. If all values are null, then null is returned. * - * @note The function is non-deterministic because its results depends on the order of the rows - * which may be non-deterministic after a shuffle. + * @note The function can be non-deterministic because its results depend on the order of + * input rows which are usually non-deterministic after a shuffle. * * @group agg_funcs * @since 1.3.0 From dc6e7c06060a8eeb13dfaf8d55dcde415e07e1e6 Mon Sep 17 00:00:00 2001 From: "tanel.kiis@gmail.com" Date: Thu, 31 Dec 2020 00:10:59 +0200 Subject: [PATCH 12/17] Fix merge --- .../catalyst/optimizer/UnwrapCastInBinaryComparisonSuite.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/UnwrapCastInBinaryComparisonSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/UnwrapCastInBinaryComparisonSuite.scala index ecf3e1fc18cc9..07da99b58dba6 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/UnwrapCastInBinaryComparisonSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/UnwrapCastInBinaryComparisonSuite.scala @@ -21,7 +21,6 @@ import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans.DslLogicalPlan import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.IntegralLiteralTestUtils._ -import org.apache.spark.sql.catalyst.expressions.aggregate.First import org.apache.spark.sql.catalyst.optimizer.UnwrapCastInBinaryComparison._ import org.apache.spark.sql.catalyst.plans.PlanTest import org.apache.spark.sql.catalyst.plans.logical._ From 422925192900fe7956d17ab23f1c56d81951c78c Mon Sep 17 00:00:00 2001 From: "tanel.kiis@gmail.com" Date: Sat, 20 Mar 2021 07:35:02 +0200 Subject: [PATCH 13/17] Revert doc changes --- R/pkg/R/functions.R | 16 +++---- python/pyspark/sql/functions.py | 16 +++---- .../expressions/aggregate/First.scala | 6 ++- .../catalyst/expressions/aggregate/Last.scala | 6 ++- .../expressions/aggregate/collect.scala | 10 ++-- .../org/apache/spark/sql/functions.scala | 48 +++++++++---------- 6 files changed, 54 insertions(+), 48 deletions(-) diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R index 28567a32af9b0..ff20c9696099c 100644 --- a/R/pkg/R/functions.R +++ b/R/pkg/R/functions.R @@ -1104,8 +1104,8 @@ setMethod("factorial", #' #' The function by default returns the first values it sees. It will return the first non-missing #' value it sees when na.rm is set to true. If all values are missing, then NA is returned. -#' Note: The function can be non-deterministic because its results depend on the order of -#' input rows which are usually non-deterministic after a shuffle. +#' Note: the function is non-deterministic because its results depends on the order of the rows +#' which may be non-deterministic after a shuffle. #' #' @param na.rm a logical value indicating whether NA values should be stripped #' before the computation proceeds. @@ -1249,8 +1249,8 @@ setMethod("kurtosis", #' #' The function by default returns the last values it sees. It will return the last non-missing #' value it sees when na.rm is set to true. If all values are missing, then NA is returned. -#' Note: The function can be non-deterministic because its results depend on the order of -#' input rows which are usually non-deterministic after a shuffle. +#' Note: the function is non-deterministic because its results depends on the order of the rows +#' which may be non-deterministic after a shuffle. #' #' @param x column to compute on. #' @param na.rm a logical value indicating whether NA values should be stripped @@ -4378,8 +4378,8 @@ setMethod("create_map", #' @details #' \code{collect_list}: Creates a list of objects with duplicates. -#' Note: The function can be non-deterministic because its results depend on the order of -#' input rows which are usually non-deterministic after a shuffle. +#' Note: the function is non-deterministic because the order of collected results depends +#' on the order of the rows which may be non-deterministic after a shuffle. #' #' @rdname column_aggregate_functions #' @aliases collect_list collect_list,Column-method @@ -4399,8 +4399,8 @@ setMethod("collect_list", #' @details #' \code{collect_set}: Creates a list of objects with duplicate elements eliminated. -#' Note: The function can be non-deterministic because its results depend on the order of -#' input rows which are usually non-deterministic after a shuffle. +#' Note: the function is non-deterministic because the order of collected results depends +#' on the order of the rows which may be non-deterministic after a shuffle. #' #' @rdname column_aggregate_functions #' @aliases collect_set collect_set,Column-method diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index c5c308e176247..9a5977b7025e6 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -673,8 +673,8 @@ def collect_list(col): Notes ----- - The function can be non-deterministic because its results depend on the order of - input rows which are usually non-deterministic after a shuffle. + The function is non-deterministic because the order of collected results depends + on the order of the rows which may be non-deterministic after a shuffle. Examples -------- @@ -693,8 +693,8 @@ def collect_set(col): Notes ----- - The function can be non-deterministic because its results depend on the order of - input rows which are usually non-deterministic after a shuffle. + The function is non-deterministic because the order of collected results depends + on the order of the rows which may be non-deterministic after a shuffle. Examples -------- @@ -1018,8 +1018,8 @@ def first(col, ignorenulls=False): Notes ----- - The function can be non-deterministic because its results depend on the order of - input rows which are usually non-deterministic after a shuffle. + The function is non-deterministic because its results depends on the order of the + rows which may be non-deterministic after a shuffle. """ sc = SparkContext._active_spark_context jc = sc._jvm.functions.first(_to_java_column(col), ignorenulls) @@ -1126,8 +1126,8 @@ def last(col, ignorenulls=False): Notes ----- - The function can be non-deterministic because its results depend on the order of - input rows which are usually non-deterministic after a shuffle. + The function is non-deterministic because its results depends on the order of the + rows which may be non-deterministic after a shuffle. """ sc = SparkContext._active_spark_context jc = sc._jvm.functions.last(_to_java_column(col), ignorenulls) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/First.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/First.scala index fc8c9990ab639..e8b400d9b6561 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/First.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/First.scala @@ -45,8 +45,8 @@ import org.apache.spark.sql.types._ 5 """, note = """ - The function can be non-deterministic because its results depend on the order of input rows - which are usually non-deterministic after a shuffle. + The function is non-deterministic because its results depends on the order of the rows + which may be non-deterministic after a shuffle. """, group = "agg_funcs", since = "2.0.0") @@ -63,6 +63,8 @@ case class First(child: Expression, ignoreNulls: Boolean) override def nullable: Boolean = true + override lazy val deterministic: Boolean = true + // Return data type. override def dataType: DataType = child.dataType diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Last.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Last.scala index 91049affcd13e..4fe59d4d90a39 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Last.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Last.scala @@ -44,8 +44,8 @@ import org.apache.spark.sql.types._ 5 """, note = """ - The function can be non-deterministic because its results depend on the order of input rows - which are usually non-deterministic after a shuffle. + The function is non-deterministic because its results depends on the order of the rows + which may be non-deterministic after a shuffle. """, group = "agg_funcs", since = "2.0.0") @@ -62,6 +62,8 @@ case class Last(child: Expression, ignoreNulls: Boolean) override def nullable: Boolean = true + override lazy val deterministic: Boolean = true + // Return data type. override def dataType: DataType = child.dataType diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala index 9065a909b16ac..4c70e9b51722d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala @@ -43,6 +43,8 @@ abstract class Collect[T <: Growable[Any] with Iterable[Any]] extends TypedImper override def dataType: DataType = ArrayType(child.dataType, false) + override lazy val deterministic: Boolean = true + protected def convertToBufferElement(value: Any): Any override def update(buffer: T, input: InternalRow): T = { @@ -90,8 +92,8 @@ abstract class Collect[T <: Growable[Any] with Iterable[Any]] extends TypedImper [1,2,1] """, note = """ - The function can be non-deterministic because its results depend on the order of input rows - which are usually non-deterministic after a shuffle. + The function is non-deterministic because the order of collected results depends + on the order of the rows which may be non-deterministic after a shuffle. """, group = "agg_funcs", since = "2.0.0") @@ -132,8 +134,8 @@ case class CollectList( [1,2] """, note = """ - The function can be non-deterministic because its results depend on the order of input rows - which are usually non-deterministic after a shuffle. + The function is non-deterministic because the order of collected results depends + on the order of the rows which may be non-deterministic after a shuffle. """, group = "agg_funcs", since = "2.0.0") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala index 5d5f657150cb9..2c4b81c5df908 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala @@ -311,8 +311,8 @@ object functions { /** * Aggregate function: returns a list of objects with duplicates. * - * @note The function can be non-deterministic because its results depend on the order of - * input rows which are usually non-deterministic after a shuffle. + * @note The function is non-deterministic because the order of collected results depends + * on the order of the rows which may be non-deterministic after a shuffle. * * @group agg_funcs * @since 1.6.0 @@ -322,8 +322,8 @@ object functions { /** * Aggregate function: returns a list of objects with duplicates. * - * @note The function can be non-deterministic because its results depend on the order of - * input rows which are usually non-deterministic after a shuffle. + * @note The function is non-deterministic because the order of collected results depends + * on the order of the rows which may be non-deterministic after a shuffle. * * @group agg_funcs * @since 1.6.0 @@ -333,8 +333,8 @@ object functions { /** * Aggregate function: returns a set of objects with duplicate elements eliminated. * - * @note The function can be non-deterministic because its results depend on the order of - * input rows which are usually non-deterministic after a shuffle. + * @note The function is non-deterministic because the order of collected results depends + * on the order of the rows which may be non-deterministic after a shuffle. * * @group agg_funcs * @since 1.6.0 @@ -344,8 +344,8 @@ object functions { /** * Aggregate function: returns a set of objects with duplicate elements eliminated. * - * @note The function can be non-deterministic because its results depend on the order of - * input rows which are usually non-deterministic after a shuffle. + * @note The function is non-deterministic because the order of collected results depends + * on the order of the rows which may be non-deterministic after a shuffle. * * @group agg_funcs * @since 1.6.0 @@ -476,8 +476,8 @@ object functions { * The function by default returns the first values it sees. It will return the first non-null * value it sees when ignoreNulls is set to true. If all values are null, then null is returned. * - * @note The function can be non-deterministic because its results depend on the order of - * input rows which are usually non-deterministic after a shuffle. + * @note The function is non-deterministic because its results depends on the order of the rows + * which may be non-deterministic after a shuffle. * * @group agg_funcs * @since 2.0.0 @@ -492,8 +492,8 @@ object functions { * The function by default returns the first values it sees. It will return the first non-null * value it sees when ignoreNulls is set to true. If all values are null, then null is returned. * - * @note The function can be non-deterministic because its results depend on the order of - * input rows which are usually non-deterministic after a shuffle. + * @note The function is non-deterministic because its results depends on the order of the rows + * which may be non-deterministic after a shuffle. * * @group agg_funcs * @since 2.0.0 @@ -508,8 +508,8 @@ object functions { * The function by default returns the first values it sees. It will return the first non-null * value it sees when ignoreNulls is set to true. If all values are null, then null is returned. * - * @note The function can be non-deterministic because its results depend on the order of - * input rows which are usually non-deterministic after a shuffle. + * @note The function is non-deterministic because its results depends on the order of the rows + * which may be non-deterministic after a shuffle. * * @group agg_funcs * @since 1.3.0 @@ -522,8 +522,8 @@ object functions { * The function by default returns the first values it sees. It will return the first non-null * value it sees when ignoreNulls is set to true. If all values are null, then null is returned. * - * @note The function can be non-deterministic because its results depend on the order of - * input rows which are usually non-deterministic after a shuffle. + * @note The function is non-deterministic because its results depends on the order of the rows + * which may be non-deterministic after a shuffle. * * @group agg_funcs * @since 1.3.0 @@ -601,8 +601,8 @@ object functions { * The function by default returns the last values it sees. It will return the last non-null * value it sees when ignoreNulls is set to true. If all values are null, then null is returned. * - * @note The function can be non-deterministic because its results depend on the order of - * input rows which are usually non-deterministic after a shuffle. + * @note The function is non-deterministic because its results depends on the order of the rows + * which may be non-deterministic after a shuffle. * * @group agg_funcs * @since 2.0.0 @@ -617,8 +617,8 @@ object functions { * The function by default returns the last values it sees. It will return the last non-null * value it sees when ignoreNulls is set to true. If all values are null, then null is returned. * - * @note The function can be non-deterministic because its results depend on the order of - * input rows which are usually non-deterministic after a shuffle. + * @note The function is non-deterministic because its results depends on the order of the rows + * which may be non-deterministic after a shuffle. * * @group agg_funcs * @since 2.0.0 @@ -633,8 +633,8 @@ object functions { * The function by default returns the last values it sees. It will return the last non-null * value it sees when ignoreNulls is set to true. If all values are null, then null is returned. * - * @note The function can be non-deterministic because its results depend on the order of - * input rows which are usually non-deterministic after a shuffle. + * @note The function is non-deterministic because its results depends on the order of the rows + * which may be non-deterministic after a shuffle. * * @group agg_funcs * @since 1.3.0 @@ -647,8 +647,8 @@ object functions { * The function by default returns the last values it sees. It will return the last non-null * value it sees when ignoreNulls is set to true. If all values are null, then null is returned. * - * @note The function can be non-deterministic because its results depend on the order of - * input rows which are usually non-deterministic after a shuffle. + * @note The function is non-deterministic because its results depends on the order of the rows + * which may be non-deterministic after a shuffle. * * @group agg_funcs * @since 1.3.0 From 13c92f34ea69ef767822ac8dd3875efb46589ad6 Mon Sep 17 00:00:00 2001 From: "tanel.kiis@gmail.com" Date: Sun, 21 Mar 2021 14:52:02 +0200 Subject: [PATCH 14/17] Remove distinct from first and last --- .../org/apache/spark/sql/catalyst/optimizer/Optimizer.scala | 2 ++ .../sql/catalyst/optimizer/EliminateDistinctSuite.scala | 6 +++++- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 3e3550d5da89b..42384c6e3ed7b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -364,6 +364,8 @@ object EliminateDistinct extends Rule[LogicalPlan] { case _: BitAndAgg => true case _: BitOrAgg => true case _: CollectSet => true + case _: First => true + case _: Last => true case _ => false } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateDistinctSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateDistinctSuite.scala index 0848d5609ff02..3666fc928cfc3 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateDistinctSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateDistinctSuite.scala @@ -39,11 +39,15 @@ class EliminateDistinctSuite extends PlanTest { Min(_), BitAndAgg(_), BitOrAgg(_), + First(_, ignoreNulls = true), + First(_, ignoreNulls = false), + Last(_, ignoreNulls = true), + Last(_, ignoreNulls = false), CollectSet(_: Expression) ).foreach { aggBuilder => val agg = aggBuilder('a) - test(s"Eliminate Distinct in ${agg.prettyName}") { + test(s"Eliminate Distinct in ${agg.toString}") { val query = testRelation .select(agg.toAggregateExpression(isDistinct = true).as('result)) .analyze From 3f945f048fa35fa77dd04ea62500f42730845871 Mon Sep 17 00:00:00 2001 From: "tanel.kiis@gmail.com" Date: Tue, 13 Apr 2021 22:25:43 +0300 Subject: [PATCH 15/17] Remove the deterministic flag --- .../apache/spark/sql/catalyst/expressions/aggregate/First.scala | 2 -- .../apache/spark/sql/catalyst/expressions/aggregate/Last.scala | 2 -- .../spark/sql/catalyst/expressions/aggregate/collect.scala | 2 -- 3 files changed, 6 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/First.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/First.scala index e8b400d9b6561..f785caed4d858 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/First.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/First.scala @@ -63,8 +63,6 @@ case class First(child: Expression, ignoreNulls: Boolean) override def nullable: Boolean = true - override lazy val deterministic: Boolean = true - // Return data type. override def dataType: DataType = child.dataType diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Last.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Last.scala index 4fe59d4d90a39..368787a72d167 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Last.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Last.scala @@ -62,8 +62,6 @@ case class Last(child: Expression, ignoreNulls: Boolean) override def nullable: Boolean = true - override lazy val deterministic: Boolean = true - // Return data type. override def dataType: DataType = child.dataType diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala index 4c70e9b51722d..ff3ed97655c2a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala @@ -43,8 +43,6 @@ abstract class Collect[T <: Growable[Any] with Iterable[Any]] extends TypedImper override def dataType: DataType = ArrayType(child.dataType, false) - override lazy val deterministic: Boolean = true - protected def convertToBufferElement(value: Any): Any override def update(buffer: T, input: InternalRow): T = { From e5e9a041f519723bb16fd4cce01c8a0e3ce5f054 Mon Sep 17 00:00:00 2001 From: "tanel.kiis@gmail.com" Date: Thu, 24 Jun 2021 13:49:27 +0300 Subject: [PATCH 16/17] Use withUserDefinedFunction --- .../org/apache/spark/sql/SQLQuerySuite.scala | 34 ++++++++++--------- 1 file changed, 18 insertions(+), 16 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index c354647cbae97..97e49a4a94e23 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -2786,23 +2786,25 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark } test("Non-deterministic aggregate functions should not be deduplicated") { - spark.udf.register("countND", udaf(new Aggregator[Long, Long, Long] { - def zero: Long = 0L - def reduce(b: Long, a: Long): Long = b + a - def merge(b1: Long, b2: Long): Long = b1 + b2 - def finish(r: Long): Long = r - def bufferEncoder: Encoder[Long] = Encoders.scalaLong - def outputEncoder: Encoder[Long] = Encoders.scalaLong - }).asNondeterministic()) - - val query = "SELECT a, countND(b), countND(b) + 1 FROM testData2 GROUP BY a" - val df = sql(query) - val physical = df.queryExecution.sparkPlan - val aggregateExpressions = physical.collectFirst { - case agg : BaseAggregateExec => agg.aggregateExpressions + withUserDefinedFunction("sumND" -> true) { + spark.udf.register("sumND", udaf(new Aggregator[Long, Long, Long] { + def zero: Long = 0L + def reduce(b: Long, a: Long): Long = b + a + def merge(b1: Long, b2: Long): Long = b1 + b2 + def finish(r: Long): Long = r + def bufferEncoder: Encoder[Long] = Encoders.scalaLong + def outputEncoder: Encoder[Long] = Encoders.scalaLong + }).asNondeterministic()) + + val query = "SELECT a, sumND(b), sumND(b) + 1 FROM testData2 GROUP BY a" + val df = sql(query) + val physical = df.queryExecution.sparkPlan + val aggregateExpressions = physical.collectFirst { + case agg: BaseAggregateExec => agg.aggregateExpressions + } + assert(aggregateExpressions.isDefined) + assert(aggregateExpressions.get.size == 2) } - assert (aggregateExpressions.isDefined) - assert (aggregateExpressions.get.size == 2) } test("SPARK-22356: overlapped columns between data and partition schema in data source tables") { From e4ed57c3b819bca2476940b8c6fc2fcfc0c1bb3a Mon Sep 17 00:00:00 2001 From: Tanel Kiis Date: Fri, 5 Nov 2021 08:54:49 +0200 Subject: [PATCH 17/17] Address comments --- .../spark/sql/catalyst/optimizer/EliminateDistinctSuite.scala | 2 +- .../catalyst/optimizer/UnwrapCastInBinaryComparisonSuite.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateDistinctSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateDistinctSuite.scala index 3666fc928cfc3..08773720d717b 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateDistinctSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateDistinctSuite.scala @@ -47,7 +47,7 @@ class EliminateDistinctSuite extends PlanTest { ).foreach { aggBuilder => val agg = aggBuilder('a) - test(s"Eliminate Distinct in ${agg.toString}") { + test(s"Eliminate Distinct in $agg") { val query = testRelation .select(agg.toAggregateExpression(isDistinct = true).as('result)) .analyze diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/UnwrapCastInBinaryComparisonSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/UnwrapCastInBinaryComparisonSuite.scala index 7cd9ee8dc3c55..bdbb51bf31c83 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/UnwrapCastInBinaryComparisonSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/UnwrapCastInBinaryComparisonSuite.scala @@ -183,7 +183,7 @@ class UnwrapCastInBinaryComparisonSuite extends PlanTest with ExpressionEvalHelp test("unwrap cast should skip when expression is non-deterministic or foldable") { Seq(positiveLong, negativeLong).foreach (v => { - val e = Cast(SparkPartitionID(), LongType) <=> v + val e = Cast(Rand(0), LongType) <=> v assertEquivalent(e, e, evaluate = false) val e2 = Cast(Literal(30), LongType) >= v assertEquivalent(e2, e2, evaluate = false)