From 3e18ade2de5c6a6afab8d383c958d0bfcb00d66f Mon Sep 17 00:00:00 2001 From: allisonwang-db <66282705+allisonwang-db@users.noreply.github.com> Date: Wed, 14 Apr 2021 17:15:03 -0700 Subject: [PATCH 1/7] fix --- .../sql/catalyst/analysis/CheckAnalysis.scala | 10 +++++++-- .../org/apache/spark/sql/SubquerySuite.scala | 21 +++++++++++++++++++ 2 files changed, 29 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index 76724e7bbdb76..d22a22cb5430b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -950,9 +950,15 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog { case f: Filter => val (correlated, _) = splitConjunctivePredicates(f.condition).partition(containsOuter) - // Find any non-equality correlated predicates + // Find any non-equality correlated predicates and equality predicates that do not + // guarantee one-on-one mapping between inner and outer attributes. E.G: + // a = outer(c) -> true + // a > outer(c) -> false + // a + b = outer(c) -> false (because there can be multiple combinations of a, b that + // satisfy the condition) foundNonEqualCorrelatedPred = foundNonEqualCorrelatedPred || correlated.exists { - case _: EqualTo | _: EqualNullSafe => false + case Equality(_: Attribute, b) => b.find(_.isInstanceOf[Attribute]).isDefined + case Equality(a, _: Attribute) => a.find(_.isInstanceOf[Attribute]).isDefined case _ => true } failOnInvalidOuterReference(f) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala index e30ca0cf309ce..854331f5066d8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala @@ -1827,4 +1827,25 @@ class SubquerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark Row(0, 1, 1) :: Row(1, 2, null) :: Nil) } } + + test("SPARK-35080: Unsupported correlated equality predicates in subquery") { + withTempView("t1", "t2") { + Seq(("a"), "b").toDF("c").createOrReplaceTempView("t1") + Seq("ab", "abc", "bc").toDF("c").createOrReplaceTempView("t2") + val e1 = intercept[AnalysisException] { + sql("select c, (select count(*) from t2 where t1.c = substring(t2.c, 1, 1)) from t1") + } + assert(e1.getMessage.contains( + "Correlated column is not allowed in a non-equality predicate")) + } + withTempView("t1", "t2") { + Seq((6)).toDF("c").createOrReplaceTempView("t1") + Seq((0, 6), (1, 5), (2, 4), (3, 3)).toDF("a", "b").createOrReplaceTempView("t2") + val e2 = intercept[AnalysisException] { + sql("select c, (select count(*) from t2 where a + b = c) from t1") + } + assert(e2.getMessage.contains( + "Correlated column is not allowed in a non-equality predicate")) + } + } } From dec271d59b1f70df01009d42fd5af6f0323b964b Mon Sep 17 00:00:00 2001 From: allisonwang-db <66282705+allisonwang-db@users.noreply.github.com> Date: Wed, 14 Apr 2021 17:44:48 -0700 Subject: [PATCH 2/7] move test --- .../analysis/AnalysisErrorSuite.scala | 15 +++++++++++++ .../org/apache/spark/sql/SubquerySuite.scala | 21 ------------------- 2 files changed, 15 insertions(+), 21 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala index aecbf241e3947..eba01fd010972 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala @@ -767,4 +767,19 @@ class AnalysisErrorSuite extends AnalysisTest { "using ordinal position or wrap it in first() (or first_value) if you don't care " + "which value you get." :: Nil) } + + test("SPARK-35080: Unsupported correlated equality predicates in subquery") { + val a = AttributeReference("a", IntegerType)() + val b = AttributeReference("b", IntegerType)() + val c = AttributeReference("c", IntegerType)() + val t1 = LocalRelation(a, b) + val t2 = LocalRelation(c) + val plan = Project( + ScalarSubquery( + Aggregate(Nil, count(Literal(1)).as("cnt") :: Nil, + Filter($"a" + $"b" === $"c", t1)) + ).as("sub") :: Nil, + t2) + assertAnalysisError(plan, "Correlated column is not allowed in a non-equality predicate" :: Nil) + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala index 854331f5066d8..e30ca0cf309ce 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala @@ -1827,25 +1827,4 @@ class SubquerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark Row(0, 1, 1) :: Row(1, 2, null) :: Nil) } } - - test("SPARK-35080: Unsupported correlated equality predicates in subquery") { - withTempView("t1", "t2") { - Seq(("a"), "b").toDF("c").createOrReplaceTempView("t1") - Seq("ab", "abc", "bc").toDF("c").createOrReplaceTempView("t2") - val e1 = intercept[AnalysisException] { - sql("select c, (select count(*) from t2 where t1.c = substring(t2.c, 1, 1)) from t1") - } - assert(e1.getMessage.contains( - "Correlated column is not allowed in a non-equality predicate")) - } - withTempView("t1", "t2") { - Seq((6)).toDF("c").createOrReplaceTempView("t1") - Seq((0, 6), (1, 5), (2, 4), (3, 3)).toDF("a", "b").createOrReplaceTempView("t2") - val e2 = intercept[AnalysisException] { - sql("select c, (select count(*) from t2 where a + b = c) from t1") - } - assert(e2.getMessage.contains( - "Correlated column is not allowed in a non-equality predicate")) - } - } } From b8bf47f2e6d0713a8077d970b8f08f54500828ae Mon Sep 17 00:00:00 2001 From: allisonwang-db <66282705+allisonwang-db@users.noreply.github.com> Date: Wed, 14 Apr 2021 23:23:40 -0700 Subject: [PATCH 3/7] address comments and update tests --- .../sql/catalyst/analysis/CheckAnalysis.scala | 76 ++++++++++++++----- .../analysis/AnalysisErrorSuite.scala | 23 ++++-- .../sql-tests/results/udf/udf-except.sql.out | 13 +++- .../org/apache/spark/sql/SubquerySuite.scala | 2 +- 4 files changed, 86 insertions(+), 28 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index d22a22cb5430b..7e74bd9396965 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -899,14 +899,65 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog { // +- SubqueryAlias t1, `t1` // +- Project [_1#73 AS c1#76, _2#74 AS c2#77] // +- LocalRelation [_1#73, _2#74] - def failOnNonEqualCorrelatedPredicate(found: Boolean, p: LogicalPlan): Unit = { - if (found) { + // SPARK-35080: The same issue can happen to correlated equality predicates when + // they do not guarantee one-to-one mapping between inner and outer attributes. + // For example: + // Table: + // t1(a, b): [(0, 6), (1, 5), (2, 4)] + // t2(c): [(6)] + // + // Query: + // SELECT c, (SELECT COUNT(*) FROM t1 WHERE a + b = c) FROM t2 + // + // Original subquery plan: + // Aggregate [count(1)] + // +- Filter ((a + b) = outer(c)) + // +- LocalRelation [a, b] + // + // Plan after pulling up correlated predicates: + // Aggregate [a, b] [count(1), a, b] + // +- LocalRelation [a, b] + // + // Plan after rewrite: + // Project [c1, count(1)] + // +- Join LeftOuter ((a + b) = c) + // :- LocalRelation [c] + // +- Aggregate [a, b] [count(1), a, b] + // +- LocalRelation [a, b] + // + // The right hand side of the join transformed from the subquery will output + // count(1) | a | b + // 1 | 0 | 6 + // 1 | 1 | 5 + // 1 | 2 | 4 + // and the plan after rewrite will give the original query incorrect results. + def failOnUnsupportedCorrelatedPredicate(predicates: Seq[Expression], p: LogicalPlan): Unit = { + if (predicates.nonEmpty) { // Report a non-supported case as an exception - failAnalysis(s"Correlated column is not allowed in a non-equality predicate:\n$p") + failAnalysis(s"Correlated column is not allowed in predicate " + + s"${predicates.map(_.sql).mkString}:\n$p") } } - var foundNonEqualCorrelatedPred: Boolean = false + // Given a correlated predicate, check if it is either a non-equality predicate or + // equality predicate that does not guarantee one-on-one mapping between inner and + // outer attributes. E.G.: + // (a = outer(c)) -> false + // (a > outer(c)) -> true + // (a + b = outer(c)) -> true + // The last one is true because there can be multiple combinations of (a, b) that + // satisfy the equality condition. For example, if outer(c) = 0, then both (0, 0) + // and (-1, 1) can make the predicate evaluate to true. + def isUnsupportedPredicate(condition: Expression): Boolean = condition match { + // Only allow equality condition with one side being an attribute and another + // side being an expression without attributes from the inner query. Note + // OuterReference is a leaf node and will not be found here. + case Equality(_: Attribute, b) => b.find(_.isInstanceOf[Attribute]).isDefined + case Equality(a, _: Attribute) => a.find(_.isInstanceOf[Attribute]).isDefined + case _ => true + } + + val unsupportedPredicates = mutable.ArrayBuffer.empty[Expression] // Simplify the predicates before validating any unsupported correlation patterns in the plan. AnalysisHelper.allowInvokingTransformsInAnalyzer { BooleanSimplification(sub).foreachUp { @@ -949,28 +1000,17 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog { // The other operator is Join. Filter can be anywhere in a correlated subquery. case f: Filter => val (correlated, _) = splitConjunctivePredicates(f.condition).partition(containsOuter) - - // Find any non-equality correlated predicates and equality predicates that do not - // guarantee one-on-one mapping between inner and outer attributes. E.G: - // a = outer(c) -> true - // a > outer(c) -> false - // a + b = outer(c) -> false (because there can be multiple combinations of a, b that - // satisfy the condition) - foundNonEqualCorrelatedPred = foundNonEqualCorrelatedPred || correlated.exists { - case Equality(_: Attribute, b) => b.find(_.isInstanceOf[Attribute]).isDefined - case Equality(a, _: Attribute) => a.find(_.isInstanceOf[Attribute]).isDefined - case _ => true - } + unsupportedPredicates ++= correlated.filter(isUnsupportedPredicate) failOnInvalidOuterReference(f) // Aggregate cannot host any correlated expressions // It can be on a correlation path if the correlation contains - // only equality correlated predicates. + // only supported correlated equality predicates. // It cannot be on a correlation path if the correlation has // non-equality correlated predicates. case a: Aggregate => failOnInvalidOuterReference(a) - failOnNonEqualCorrelatedPredicate(foundNonEqualCorrelatedPred, a) + failOnUnsupportedCorrelatedPredicate(unsupportedPredicates, a) // Join can host correlated expressions. case j @ Join(left, right, joinType, _, _) => diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala index eba01fd010972..8ea84a484d570 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala @@ -774,12 +774,21 @@ class AnalysisErrorSuite extends AnalysisTest { val c = AttributeReference("c", IntegerType)() val t1 = LocalRelation(a, b) val t2 = LocalRelation(c) - val plan = Project( - ScalarSubquery( - Aggregate(Nil, count(Literal(1)).as("cnt") :: Nil, - Filter($"a" + $"b" === $"c", t1)) - ).as("sub") :: Nil, - t2) - assertAnalysisError(plan, "Correlated column is not allowed in a non-equality predicate" :: Nil) + val conditions = Seq( + (abs($"a") === $"c", "abs(a) = outer(c)"), + (abs($"a") <=> $"c", "abs(a) <=> outer(c)"), + ($"a" + 1 === $"c", "(a + 1) = outer(c)"), + ($"a" + $"b" === $"c", "(a + b) = outer(c)"), + ($"a" + $"c" === $"b", "(a + outer(c)) = b"), + (And($"a" === $"c", Cast($"a", IntegerType) === $"c"), "CAST(a AS INT) = outer(c)")) + conditions.foreach { case (cond, msg) => + val plan = Project( + ScalarSubquery( + Aggregate(Nil, count(Literal(1)).as("cnt") :: Nil, + Filter(cond, t1)) + ).as("sub") :: Nil, + t2) + assertAnalysisError(plan, s"Correlated column is not allowed in predicate ($msg)" :: Nil) + } } } diff --git a/sql/core/src/test/resources/sql-tests/results/udf/udf-except.sql.out b/sql/core/src/test/resources/sql-tests/results/udf/udf-except.sql.out index 0a5958b2a7694..7d21715fbaa8a 100644 --- a/sql/core/src/test/resources/sql-tests/results/udf/udf-except.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/udf/udf-except.sql.out @@ -100,6 +100,15 @@ WHERE udf(t1.v) >= (SELECT min(udf(t2.v)) FROM t2 WHERE t2.k = t1.k) -- !query schema -struct +struct<> -- !query output -two +org.apache.spark.sql.AnalysisException +Correlated column is not allowed in predicate (CAST(udf(cast(k as string)) AS STRING) = CAST(udf(cast(outer(k#x) as string)) AS STRING)): +Aggregate [cast(udf(cast(max(cast(udf(cast(v#x as string)) as int)) as string)) as int) AS udf(max(udf(v)))#x] ++- Filter (cast(udf(cast(k#x as string)) as string) = cast(udf(cast(outer(k#x) as string)) as string)) + +- SubqueryAlias t2 + +- View (`t2`, [k#x,v#x]) + +- Project [cast(k#x as string) AS k#x, cast(v#x as int) AS v#x] + +- Project [k#x, v#x] + +- SubqueryAlias t2 + +- LocalRelation [k#x, v#x] diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala index e30ca0cf309ce..5991acdfd00e8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala @@ -554,7 +554,7 @@ class SubquerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark sql("select a, (select sum(b) from l l2 where l2.a < l1.a) sum_b from l l1") } assert(msg1.getMessage.contains( - "Correlated column is not allowed in a non-equality predicate:")) + "Correlated column is not allowed in predicate (l2.a < outer(l1.a))")) } test("disjunctive correlated scalar subquery") { From 2ad654834d404d3a89acc88548f58db5948be893 Mon Sep 17 00:00:00 2001 From: allisonwang-db <66282705+allisonwang-db@users.noreply.github.com> Date: Thu, 15 Apr 2021 11:21:55 -0700 Subject: [PATCH 4/7] update a special case --- .../sql/catalyst/analysis/CheckAnalysis.scala | 14 ++++++++++---- .../scala/org/apache/spark/sql/SubquerySuite.scala | 11 +++++++++++ 2 files changed, 21 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index 7e74bd9396965..1011983cbd086 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -939,10 +939,16 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog { } } + def containsAttribute(e: Expression): Boolean = { + e.find(_.isInstanceOf[Attribute]).isDefined + } + // Given a correlated predicate, check if it is either a non-equality predicate or // equality predicate that does not guarantee one-on-one mapping between inner and - // outer attributes. E.G.: + // outer attributes. When the correlated predicate does not contain any attribute + // (i.e. only has outer references), it is supported and should return false. E.G.: // (a = outer(c)) -> false + // (outer(c) = outer(d)) -> false // (a > outer(c)) -> true // (a + b = outer(c)) -> true // The last one is true because there can be multiple combinations of (a, b) that @@ -952,9 +958,9 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog { // Only allow equality condition with one side being an attribute and another // side being an expression without attributes from the inner query. Note // OuterReference is a leaf node and will not be found here. - case Equality(_: Attribute, b) => b.find(_.isInstanceOf[Attribute]).isDefined - case Equality(a, _: Attribute) => a.find(_.isInstanceOf[Attribute]).isDefined - case _ => true + case Equality(_: Attribute, b) => containsAttribute(b) + case Equality(a, _: Attribute) => containsAttribute(a) + case o => containsAttribute(o) } val unsupportedPredicates = mutable.ArrayBuffer.empty[Expression] diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala index 5991acdfd00e8..b6be78f503536 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala @@ -1827,4 +1827,15 @@ class SubquerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark Row(0, 1, 1) :: Row(1, 2, null) :: Nil) } } + + test("SPARK-35080: correlated predicates in subquery contain only outer references") { + // Equality + checkAnswer( + sql("select c, d, (select count(*) from l where c + 1 = d) from t"), + Row(2, 3.0, 8) :: Row(2, 3.0, 8) :: Row(3, 2.0, 0) :: Row(4, 1.0, 0) :: Nil) + // Non-equality + checkAnswer( + sql("select c, d, (select count(*) from l where c > d) from t"), + Row(2, 3.0, 0) :: Row(2, 3.0, 0) :: Row(3, 2.0, 8) :: Row(4, 1.0, 8) :: Nil) + } } From 03f44876007f11ea6b5c0440995594018f0b2f5d Mon Sep 17 00:00:00 2001 From: allisonwang-db <66282705+allisonwang-db@users.noreply.github.com> Date: Fri, 16 Apr 2021 15:27:16 -0700 Subject: [PATCH 5/7] rebase & update --- .../apache/spark/sql/catalyst/analysis/CheckAnalysis.scala | 3 ++- .../test/scala/org/apache/spark/sql/SubquerySuite.scala | 7 +------ 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index 1011983cbd086..bcc3fa325574e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -960,7 +960,8 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog { // OuterReference is a leaf node and will not be found here. case Equality(_: Attribute, b) => containsAttribute(b) case Equality(a, _: Attribute) => containsAttribute(a) - case o => containsAttribute(o) + case e @ Equality(_, _) => containsAttribute(e) + case _ => true } val unsupportedPredicates = mutable.ArrayBuffer.empty[Expression] diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala index b6be78f503536..a1c2b9a4a5668 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala @@ -1828,14 +1828,9 @@ class SubquerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark } } - test("SPARK-35080: correlated predicates in subquery contain only outer references") { - // Equality + test("SPARK-35080: correlated equality predicates contain only outer references") { checkAnswer( sql("select c, d, (select count(*) from l where c + 1 = d) from t"), Row(2, 3.0, 8) :: Row(2, 3.0, 8) :: Row(3, 2.0, 0) :: Row(4, 1.0, 0) :: Nil) - // Non-equality - checkAnswer( - sql("select c, d, (select count(*) from l where c > d) from t"), - Row(2, 3.0, 0) :: Row(2, 3.0, 0) :: Row(3, 2.0, 8) :: Row(4, 1.0, 8) :: Nil) } } From 652b854de761ce915118c92859e8ad07cc29e8da Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Mon, 19 Apr 2021 13:58:11 +0800 Subject: [PATCH 6/7] Update CheckAnalysis.scala --- .../apache/spark/sql/catalyst/analysis/CheckAnalysis.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index bcc3fa325574e..62567cc2bda60 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -934,7 +934,7 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog { def failOnUnsupportedCorrelatedPredicate(predicates: Seq[Expression], p: LogicalPlan): Unit = { if (predicates.nonEmpty) { // Report a non-supported case as an exception - failAnalysis(s"Correlated column is not allowed in predicate " + + failAnalysis("Correlated column is not allowed in predicate " + s"${predicates.map(_.sql).mkString}:\n$p") } } @@ -1017,7 +1017,7 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog { // non-equality correlated predicates. case a: Aggregate => failOnInvalidOuterReference(a) - failOnUnsupportedCorrelatedPredicate(unsupportedPredicates, a) + failOnUnsupportedCorrelatedPredicate(unsupportedPredicates.toSeq, a) // Join can host correlated expressions. case j @ Join(left, right, joinType, _, _) => From 84f91b23578ac4db525911820022a6db30f92adc Mon Sep 17 00:00:00 2001 From: allisonwang-db <66282705+allisonwang-db@users.noreply.github.com> Date: Mon, 19 Apr 2021 15:09:11 -0700 Subject: [PATCH 7/7] update test --- .../test/scala/org/apache/spark/sql/SubquerySuite.scala | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala index a1c2b9a4a5668..bb6b402e8156d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala @@ -1829,8 +1829,11 @@ class SubquerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark } test("SPARK-35080: correlated equality predicates contain only outer references") { - checkAnswer( - sql("select c, d, (select count(*) from l where c + 1 = d) from t"), - Row(2, 3.0, 8) :: Row(2, 3.0, 8) :: Row(3, 2.0, 0) :: Row(4, 1.0, 0) :: Nil) + withTempView("t") { + Seq((0, 1), (1, 1)).toDF("c1", "c2").createOrReplaceTempView("t") + checkAnswer( + sql("select c1, c2, (select count(*) from l where c1 = c2) from t"), + Row(0, 1, 0) :: Row(1, 1, 8) :: Nil) + } } }