Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1289,7 +1289,7 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
split(joinCondition.map(splitConjunctivePredicates).getOrElse(Nil), left, right)

joinType match {
case Inner | LeftExistence(_) =>
case Inner | LeftSemi | ExistenceJoin(_) =>
// push down the single side only join filter for both sides sub queries
val newLeft = leftJoinConditions.
reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
Expand All @@ -1306,14 +1306,14 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
val newJoinCond = (rightJoinConditions ++ commonJoinCondition).reduceLeftOption(And)

Join(newLeft, newRight, RightOuter, newJoinCond)
case LeftOuter =>
case LeftOuter | LeftAnti =>
// push down the right side only join filter for right sub query
val newLeft = left
val newRight = rightJoinConditions.
reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
val newJoinCond = (leftJoinConditions ++ commonJoinCondition).reduceLeftOption(And)

Join(newLeft, newRight, LeftOuter, newJoinCond)
Join(newLeft, newRight, joinType, newJoinCond)
case FullOuter => j
case NaturalJoin(_) => sys.error("Untransformed NaturalJoin node")
case UsingJoin(_, _) => sys.error("Untransformed Using join node")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,39 @@ class FilterPushdownSuite extends PlanTest {
comparePlans(optimized, analysis.EliminateSubqueryAliases(correctAnswer))
}

test("joins: push down where clause into left anti join") {
val x = testRelation.subquery('x)
val y = testRelation.subquery('y)
val originalQuery =
x.join(y, LeftAnti, Some("x.b".attr === "y.b".attr))
.where("x.a".attr > 10)
.analyze
val optimized = Optimize.execute(originalQuery)
val correctAnswer =
x.where("x.a".attr > 10)
.join(y, LeftAnti, Some("x.b".attr === "y.b".attr))
.analyze
comparePlans(optimized, analysis.EliminateSubqueryAliases(correctAnswer))
}

test("joins: only push down join conditions to the right of a left anti join") {
val x = testRelation.subquery('x)
val y = testRelation.subquery('y)
val originalQuery =
x.join(y,
LeftAnti,
Some("x.b".attr === "y.b".attr && "y.a".attr > 10 && "x.a".attr > 10)).analyze
val optimized = Optimize.execute(originalQuery)
val correctAnswer =
x.join(
y.where("y.a".attr > 10),
LeftAnti,
Some("x.b".attr === "y.b".attr && "x.a".attr > 10))
.analyze
comparePlans(optimized, analysis.EliminateSubqueryAliases(correctAnswer))
}


val testRelationWithArrayType = LocalRelation('a.int, 'b.int, 'c_arr.array(IntegerType))

test("generate: predicate referenced no generated column") {
Expand Down
7 changes: 7 additions & 0 deletions sql/core/src/test/resources/sql-tests/inputs/anti-join.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- SPARK-18597: Do not push down predicates to left hand side in an anti-join
CREATE OR REPLACE TEMPORARY VIEW tbl_a AS VALUES (1, 1), (2, 1), (3, 6) AS T(c1, c2);
CREATE OR REPLACE TEMPORARY VIEW tbl_b AS VALUES 1 AS T(c1);

SELECT *
FROM tbl_a
LEFT ANTI JOIN tbl_b ON ((tbl_a.c1 = tbl_a.c2) IS NULL OR tbl_a.c1 = tbl_a.c2);
29 changes: 29 additions & 0 deletions sql/core/src/test/resources/sql-tests/results/anti-join.sql.out
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
-- Automatically generated by SQLQueryTestSuite
-- Number of queries: 3


-- !query 0
CREATE OR REPLACE TEMPORARY VIEW tbl_a AS VALUES (1, 1), (2, 1), (3, 6) AS T(c1, c2)
-- !query 0 schema
struct<>
-- !query 0 output



-- !query 1
CREATE OR REPLACE TEMPORARY VIEW tbl_b AS VALUES 1 AS T(c1)
-- !query 1 schema
struct<>
-- !query 1 output



-- !query 2
SELECT *
FROM tbl_a
LEFT ANTI JOIN tbl_b ON ((tbl_a.c1 = tbl_a.c2) IS NULL OR tbl_a.c1 = tbl_a.c2)
-- !query 2 schema
struct<c1:int,c2:int>
-- !query 2 output
2 1
3 6