From dcc45841f44b652f680aea8057d5efd7379990e5 Mon Sep 17 00:00:00 2001 From: Cheng Hao Date: Tue, 24 Jun 2014 13:32:46 +0800 Subject: [PATCH 1/4] Extract the joinkeys from join condition --- .../sql/catalyst/optimizer/Optimizer.scala | 7 +++- .../sql/catalyst/planning/patterns.scala | 37 +++++++++++++++++++ .../spark/sql/execution/SparkStrategies.scala | 11 +++--- 3 files changed, 47 insertions(+), 8 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index b20b5de8c46eb..fb517e40677ec 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -257,8 +257,11 @@ object PushPredicateThroughProject extends Rule[LogicalPlan] { * Check https://cwiki.apache.org/confluence/display/Hive/OuterJoinBehavior for more details */ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { - // split the condition expression into 3 parts, - // (canEvaluateInLeftSide, canEvaluateInRightSide, haveToEvaluateWithBothSide) + /** + * Splits join condition expressions into three categories based on the attributes required + * to evaluate them. 
+ * @return (canEvaluateInLeft, canEvaluateInRight, haveToEvaluateInBoth) + */ private def split(condition: Seq[Expression], left: LogicalPlan, right: LogicalPlan) = { val (leftEvaluateCondition, rest) = condition.partition(_.references subsetOf left.outputSet) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala index a43bef389c4bf..7413f05c6fa2d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala @@ -159,6 +159,43 @@ object HashFilteredJoin extends Logging with PredicateHelper { } } +/** + * A pattern that finds joins with equality conditions that can be evaluated using equi-join. + */ +object ExtractEquiJoinKeys extends Logging with PredicateHelper { + /** (joinType, leftKeys, rightKeys, condition, leftChild, rightChild) */ + type ReturnType = + (JoinType, Seq[Expression], Seq[Expression], Option[Expression], LogicalPlan, LogicalPlan) + + def unapply(plan: LogicalPlan): Option[ReturnType] = plan match { + case join @ Join(left, right, joinType, condition) => + logger.debug(s"Considering join on: $condition") + // Find equi-join predicates that can be evaluated before the join, and thus can be used + // as join keys. + val (joinPredicates, otherPredicates) = condition.map(splitConjunctivePredicates). 
+ getOrElse(Nil).partition { + case EqualTo(l, r) if (canEvaluate(l, left) && canEvaluate(r, right)) || + (canEvaluate(l, right) && canEvaluate(r, left)) => true + case _ => false + } + + val joinKeys = joinPredicates.map { + case EqualTo(l, r) if canEvaluate(l, left) && canEvaluate(r, right) => (l, r) + case EqualTo(l, r) if canEvaluate(l, right) && canEvaluate(r, left) => (r, l) + } + val leftKeys = joinKeys.map(_._1) + val rightKeys = joinKeys.map(_._2) + + if(leftKeys.length > 0) { + logger.debug(s"leftKeys:${leftKeys} | rightKeys:${rightKeys}") + Some((joinType, leftKeys, rightKeys, otherPredicates.reduceOption(And), left, right)) + } else { + None + } + case _ => None + } +} + /** * A pattern that collects all adjacent unions and returns their children as a Seq. */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 3cd29967d1cd5..fe005d1d39137 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -31,9 +31,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { object LeftSemiJoin extends Strategy with PredicateHelper { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { - // Find left semi joins where at least some predicates can be evaluated by matching hash - // keys using the HashFilteredJoin pattern. 
- case HashFilteredJoin(LeftSemi, leftKeys, rightKeys, condition, left, right) => + // Find left semi joins where at least some predicates can be evaluated by matching join keys + case ExtractEquiJoinKeys(LeftSemi, leftKeys, rightKeys, condition, left, right) => val semiJoin = execution.LeftSemiJoinHash( leftKeys, rightKeys, planLater(left), planLater(right)) condition.map(Filter(_, semiJoin)).getOrElse(semiJoin) :: Nil @@ -65,7 +64,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { def broadcastTables: Seq[String] = sqlContext.joinBroadcastTables.split(",").toBuffer def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { - case HashFilteredJoin( + case ExtractEquiJoinKeys( Inner, leftKeys, rightKeys, @@ -75,7 +74,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { if broadcastTables.contains(b.tableName) => broadcastHashJoin(leftKeys, rightKeys, left, right, condition, BuildRight) - case HashFilteredJoin( + case ExtractEquiJoinKeys( Inner, leftKeys, rightKeys, @@ -85,7 +84,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { if broadcastTables.contains(b.tableName) => broadcastHashJoin(leftKeys, rightKeys, left, right, condition, BuildLeft) - case HashFilteredJoin(Inner, leftKeys, rightKeys, condition, left, right) => + case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right) => val hashJoin = execution.ShuffledHashJoin( leftKeys, rightKeys, BuildRight, planLater(left), planLater(right)) From cec34e883d8be966321059b9974e10fcedeed8e8 Mon Sep 17 00:00:00 2001 From: Cheng Hao Date: Tue, 24 Jun 2014 15:36:31 +0800 Subject: [PATCH 2/4] Update the code style issues --- .../org/apache/spark/sql/catalyst/planning/patterns.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala index 7413f05c6fa2d..cc8cd8cd8f59c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala @@ -172,8 +172,8 @@ object ExtractEquiJoinKeys extends Logging with PredicateHelper { logger.debug(s"Considering join on: $condition") // Find equi-join predicates that can be evaluated before the join, and thus can be used // as join keys. - val (joinPredicates, otherPredicates) = condition.map(splitConjunctivePredicates). - getOrElse(Nil).partition { + val (joinPredicates, otherPredicates) = + condition.map(splitConjunctivePredicates).getOrElse(Nil).partition { case EqualTo(l, r) if (canEvaluate(l, left) && canEvaluate(r, right)) || (canEvaluate(l, right) && canEvaluate(r, left)) => true case _ => false @@ -186,7 +186,7 @@ object ExtractEquiJoinKeys extends Logging with PredicateHelper { val leftKeys = joinKeys.map(_._1) val rightKeys = joinKeys.map(_._2) - if(leftKeys.length > 0) { + if (leftKeys.length > 0) { logger.debug(s"leftKeys:${leftKeys} | rightKeys:${rightKeys}") Some((joinType, leftKeys, rightKeys, otherPredicates.reduceOption(And), left, right)) } else { From ceb4924c2f7bad1529a540e0f19ac5c181e5668e Mon Sep 17 00:00:00 2001 From: Cheng Hao Date: Thu, 26 Jun 2014 10:04:57 +0800 Subject: [PATCH 3/4] Remove the redundant pattern of join keys extraction --- .../sql/catalyst/planning/patterns.scala | 55 ------------------- 1 file changed, 55 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala index cc8cd8cd8f59c..44b6286e181a5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala @@ -104,61 
+104,6 @@ object PhysicalOperation extends PredicateHelper { } } -/** - * A pattern that finds joins with equality conditions that can be evaluated using hashing - * techniques. For inner joins, any filters on top of the join operator are also matched. - */ -object HashFilteredJoin extends Logging with PredicateHelper { - /** (joinType, rightKeys, leftKeys, condition, leftChild, rightChild) */ - type ReturnType = - (JoinType, Seq[Expression], Seq[Expression], Option[Expression], LogicalPlan, LogicalPlan) - - def unapply(plan: LogicalPlan): Option[ReturnType] = plan match { - // All predicates can be evaluated for inner join (i.e., those that are in the ON - // clause and WHERE clause.) - case FilteredOperation(predicates, join @ Join(left, right, Inner, condition)) => - logger.debug(s"Considering hash inner join on: ${predicates ++ condition}") - splitPredicates(predicates ++ condition, join) - // All predicates can be evaluated for left semi join (those that are in the WHERE - // clause can only from left table, so they can all be pushed down.) - case FilteredOperation(predicates, join @ Join(left, right, LeftSemi, condition)) => - logger.debug(s"Considering hash left semi join on: ${predicates ++ condition}") - splitPredicates(predicates ++ condition, join) - case join @ Join(left, right, joinType, condition) => - logger.debug(s"Considering hash join on: $condition") - splitPredicates(condition.toSeq, join) - case _ => None - } - - // Find equi-join predicates that can be evaluated before the join, and thus can be used - // as join keys. 
- def splitPredicates(allPredicates: Seq[Expression], join: Join): Option[ReturnType] = { - val Join(left, right, joinType, _) = join - val (joinPredicates, otherPredicates) = - allPredicates.flatMap(splitConjunctivePredicates).partition { - case EqualTo(l, r) if (canEvaluate(l, left) && canEvaluate(r, right)) || - (canEvaluate(l, right) && canEvaluate(r, left)) => true - case _ => false - } - - val joinKeys = joinPredicates.map { - case EqualTo(l, r) if canEvaluate(l, left) && canEvaluate(r, right) => (l, r) - case EqualTo(l, r) if canEvaluate(l, right) && canEvaluate(r, left) => (r, l) - } - - // Do not consider this strategy if there are no join keys. - if (joinKeys.nonEmpty) { - val leftKeys = joinKeys.map(_._1) - val rightKeys = joinKeys.map(_._2) - - Some((joinType, leftKeys, rightKeys, otherPredicates.reduceOption(And), left, right)) - } else { - logger.debug(s"Avoiding hash join with no join keys.") - None - } - } -} - /** * A pattern that finds joins with equality conditions that can be evaluated using equi-join. 
*/ From 4a1060aa09666e2d3961dcc635cff6e5915df210 Mon Sep 17 00:00:00 2001 From: Cheng Hao Date: Fri, 27 Jun 2014 08:26:15 +0800 Subject: [PATCH 4/4] Fix some of the small issues --- .../scala/org/apache/spark/sql/catalyst/planning/patterns.scala | 2 +- .../scala/org/apache/spark/sql/execution/SparkStrategies.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala index 44b6286e181a5..026692abe067d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala @@ -131,7 +131,7 @@ object ExtractEquiJoinKeys extends Logging with PredicateHelper { val leftKeys = joinKeys.map(_._1) val rightKeys = joinKeys.map(_._2) - if (leftKeys.length > 0) { + if (joinKeys.nonEmpty) { logger.debug(s"leftKeys:${leftKeys} | rightKeys:${rightKeys}") Some((joinType, leftKeys, rightKeys, otherPredicates.reduceOption(And), left, right)) } else { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index fe005d1d39137..0925605b7c4d9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -45,7 +45,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { } /** - * Uses the HashFilteredJoin pattern to find joins where at least some of the predicates can be + * Uses the ExtractEquiJoinKeys pattern to find joins where at least some of the predicates can be * evaluated by matching hash keys. */ object HashJoin extends Strategy with PredicateHelper {