From e128a0a3d23b7a6a37cdc77034a651d79e32c451 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Sat, 12 Mar 2016 18:00:04 -0800 Subject: [PATCH 1/8] Disable Project Push Down Through Filter --- .../apache/spark/sql/catalyst/optimizer/Optimizer.scala | 3 +++ .../sql/catalyst/optimizer/ColumnPruningSuite.scala | 9 ++++----- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 85776670e5c4e..cb3c284aa59d8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -355,6 +355,9 @@ object ColumnPruning extends Rule[LogicalPlan] { case j @ Join(left, right, LeftSemi, condition) => j.copy(right = prunedChild(right, j.references)) + // Project should not be pushed below Filter. See PushPredicateThroughProject + case p @ Project(_, _: Filter) => p + // all the columns will be used to compare, so we can't prune them case p @ Project(_, _: SetOperation) => p case p @ Project(_, _: Distinct) => p diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala index dd7d65ddc9e96..a9130413bb2e1 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala @@ -131,13 +131,12 @@ class ColumnPruningSuite extends PlanTest { comparePlans(optimized, expected) } - test("Column pruning on Filter") { + test("No column pruning on Filter") { val input = LocalRelation('a.int, 'b.string, 'c.double) val query = Project('a :: Nil, Filter('c > Literal(0.0), input)).analyze val expected = Project('a :: Nil, - Filter('c > Literal(0.0), - Project(Seq('a, 'c), input))).analyze + Filter('c > Literal(0.0), input)).analyze comparePlans(Optimize.execute(query), expected) } @@ -287,7 +286,7 @@ class ColumnPruningSuite extends PlanTest { AggregateExpression(Count('b), Complete, isDistinct = false), WindowSpecDefinition( 'a :: Nil, SortOrder('b, Ascending) :: Nil, - UnspecifiedFrame)).as('window)).where('window > 1).select('a, 'c) + UnspecifiedFrame)).as('window)).select('a, 'c, 'window) val correctAnswer = input.select('a, 'b, 'c) @@ -297,7 +296,7 @@ class ColumnPruningSuite extends PlanTest { SortOrder('b, Ascending) :: Nil, UnspecifiedFrame)).as('window) :: Nil, 'a :: Nil, 'b.asc :: Nil) - .select('a, 'c, 'window).where('window > 1).select('a, 'c).analyze + .select('a, 'c, 'window).analyze val optimized = Optimize.execute(originalQuery.analyze) From e5e00ae1d0f9885b05e3a81f8b084e1059151fba Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Sun, 13 Mar 2016 19:17:02 -0700 Subject: [PATCH 2/8] fixed test cases. --- .../scala/org/apache/spark/sql/hive/orc/OrcFilterSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcFilterSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcFilterSuite.scala index d76d0c44f51a0..1db0e3e0af3d5 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcFilterSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcFilterSuite.scala @@ -46,7 +46,7 @@ class OrcFilterSuite extends QueryTest with OrcTest { case PhysicalOperation(_, filters, LogicalRelation(orcRelation: HadoopFsRelation, _, _)) => maybeRelation = Some(orcRelation) filters - }.flatten.reduceLeftOption(_ && _) + }.flatten.distinct.reduceLeftOption(_ && _) assert(maybeAnalyzedPredicate.isDefined, "No filter is analyzed from the given query") val (_, selectedFilters) = From dd8c54273f3b4db2532aa86f095ea912b75661a1 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Mon, 14 Mar 2016 13:58:16 -0700 Subject: [PATCH 3/8] split columnpruning to two rules --- .../sql/catalyst/optimizer/Optimizer.scala | 32 +++++++++++-------- .../optimizer/ColumnPruningSuite.scala | 8 +++-- .../optimizer/CombiningLimitsSuite.scala | 3 +- .../optimizer/JoinOptimizationSuite.scala | 1 + 4 files changed, 27 insertions(+), 17 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index cb3c284aa59d8..b8210629adf07 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -71,6 +71,7 @@ abstract class Optimizer extends RuleExecutor[LogicalPlan] { PushPredicateThroughAggregate, LimitPushDown, ColumnPruning, + EliminateObjects, // Operator combine CollapseRepartition, CollapseProject, @@ -315,10 +316,6 @@ object SetOperationPushDown extends Rule[LogicalPlan] with PredicateHelper { * - LeftSemiJoin */ object ColumnPruning extends Rule[LogicalPlan] { - private def sameOutput(output1: Seq[Attribute], output2: Seq[Attribute]): Boolean = - output1.size == output2.size && - output1.zip(output2).forall(pair => pair._1.semanticEquals(pair._2)) - def apply(plan: LogicalPlan): LogicalPlan = plan transform { // Prunes the unused columns from project list of Project/Aggregate/Expand case p @ Project(_, p2: Project) if (p2.outputSet -- p.references).nonEmpty => @@ -355,9 +352,6 @@ object ColumnPruning extends Rule[LogicalPlan] { case j @ Join(left, right, LeftSemi, condition) => j.copy(right = prunedChild(right, j.references)) - // Project should not be pushed below Filter. See PushPredicateThroughProject - case p @ Project(_, _: Filter) => p - // all the columns will be used to compare, so we can't prune them case p @ Project(_, _: SetOperation) => p case p @ Project(_, _: Distinct) => p @@ -383,12 +377,6 @@ object ColumnPruning extends Rule[LogicalPlan] { p.copy(child = w.copy( windowExpressions = w.windowExpressions.filter(p.references.contains))) - // Eliminate no-op Window - case w: Window if w.windowExpressions.isEmpty => w.child - - // Eliminate no-op Projects - case p @ Project(projectList, child) if sameOutput(child.output, p.output) => child - // Can't prune the columns on LeafNode case p @ Project(_, l: LeafNode) => p @@ -412,6 +400,24 @@ object ColumnPruning extends Rule[LogicalPlan] { } } +/** + * Eliminate no-op Project and Window. + * + * Note: this rule should be executed just after ColumnPruning. + */ +object EliminateObjects extends Rule[LogicalPlan] { + def apply(plan: LogicalPlan): LogicalPlan = plan transformUp { + // Eliminate no-op Projects + case p @ Project(projectList, child) if sameOutput(child.output, p.output) => child + // Eliminate no-op Window + case w: Window if w.windowExpressions.isEmpty => w.child + } + + private def sameOutput(output1: Seq[Attribute], output2: Seq[Attribute]): Boolean = + output1.size == output2.size && + output1.zip(output2).forall(pair => pair._1.semanticEquals(pair._2)) +} + /** * Combines two adjacent [[Project]] operators into one and perform alias substitution, * merging the expressions into one single expression. diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala index a9130413bb2e1..35b31a60594f0 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala @@ -35,6 +35,7 @@ class ColumnPruningSuite extends PlanTest { object Optimize extends RuleExecutor[LogicalPlan] { val batches = Batch("Column pruning", FixedPoint(100), ColumnPruning, + EliminateObjects, CollapseProject) :: Nil } @@ -136,7 +137,8 @@ class ColumnPruningSuite extends PlanTest { val query = Project('a :: Nil, Filter('c > Literal(0.0), input)).analyze val expected = Project('a :: Nil, - Filter('c > Literal(0.0), input)).analyze + Filter('c > Literal(0.0), + Project(Seq('a, 'c), input))).analyze comparePlans(Optimize.execute(query), expected) } @@ -326,8 +328,8 @@ class ColumnPruningSuite extends PlanTest { val input2 = LocalRelation('c.int, 'd.string, 'e.double) val query = Project('b :: Nil, Union(input1 :: input2 :: Nil)).analyze - val expected = Project('b :: Nil, - Union(Project('b :: Nil, input1) :: Project('d :: Nil, input2) :: Nil)).analyze + val expected = + Union(Project('b :: Nil, input1) :: Project('d :: Nil, input2) :: Nil).analyze comparePlans(Optimize.execute(query), expected) } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala index 87ad81db11b64..57cab89e7254f 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala @@ -28,7 +28,8 @@ class CombiningLimitsSuite extends PlanTest { object Optimize extends RuleExecutor[LogicalPlan] { val batches = Batch("Filter Pushdown", FixedPoint(100), - ColumnPruning) :: + ColumnPruning, + EliminateObjects) :: Batch("Combine Limit", FixedPoint(10), CombineLimits) :: Batch("Constant Folding", FixedPoint(10), diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala index e2f8146beee7b..3eaab124e6588 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala @@ -43,6 +43,7 @@ class JoinOptimizationSuite extends PlanTest { PushPredicateThroughGenerate, PushPredicateThroughAggregate, ColumnPruning, + EliminateObjects, CollapseProject) :: Nil } From 608b9014813174e2227389c7919074ed743fde47 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Mon, 14 Mar 2016 14:36:04 -0700 Subject: [PATCH 4/8] fix stackoverflow --- .../org/apache/spark/sql/catalyst/optimizer/Optimizer.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index b8210629adf07..20cfc80a09bd7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -316,7 +316,7 @@ object SetOperationPushDown extends Rule[LogicalPlan] with PredicateHelper { * - LeftSemiJoin */ object ColumnPruning extends Rule[LogicalPlan] { - def apply(plan: LogicalPlan): LogicalPlan = plan transform { + def apply(plan: LogicalPlan): LogicalPlan = plan transformUp { // Prunes the unused columns from project list of Project/Aggregate/Expand case p @ Project(_, p2: Project) if (p2.outputSet -- p.references).nonEmpty => p.copy(child = p2.copy(projectList = p2.projectList.filter(p.references.contains))) From c7c5f438d000c2b140951d230f00b4bfd083e418 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Mon, 14 Mar 2016 14:44:14 -0700 Subject: [PATCH 5/8] revert the change in OrcFilterSuite back. --- .../scala/org/apache/spark/sql/hive/orc/OrcFilterSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcFilterSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcFilterSuite.scala index 1db0e3e0af3d5..d76d0c44f51a0 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcFilterSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcFilterSuite.scala @@ -46,7 +46,7 @@ class OrcFilterSuite extends QueryTest with OrcTest { case PhysicalOperation(_, filters, LogicalRelation(orcRelation: HadoopFsRelation, _, _)) => maybeRelation = Some(orcRelation) filters - }.flatten.distinct.reduceLeftOption(_ && _) + }.flatten.reduceLeftOption(_ && _) assert(maybeAnalyzedPredicate.isDefined, "No filter is analyzed from the given query") val (_, selectedFilters) = From f1eee03aa8d745044f18fd97c2dca543c421497f Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Mon, 14 Mar 2016 14:45:50 -0700 Subject: [PATCH 6/8] change the test case name back --- .../spark/sql/catalyst/optimizer/ColumnPruningSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala index 35b31a60594f0..063a703c55aba 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala @@ -132,7 +132,7 @@ class ColumnPruningSuite extends PlanTest { comparePlans(optimized, expected) } - test("No column pruning on Filter") { + test("Column pruning on Filter") { val input = LocalRelation('a.int, 'b.string, 'c.double) val query = Project('a :: Nil, Filter('c > Literal(0.0), input)).analyze val expected = From adf64da3c3f86e6addb6cb813deba36bb4c7debf Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Mon, 14 Mar 2016 14:50:03 -0700 Subject: [PATCH 7/8] revert the test case back --- .../spark/sql/catalyst/optimizer/ColumnPruningSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala index 063a703c55aba..e45e9a6ca73e3 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala @@ -288,7 +288,7 @@ class ColumnPruningSuite extends PlanTest { AggregateExpression(Count('b), Complete, isDistinct = false), WindowSpecDefinition( 'a :: Nil, SortOrder('b, Ascending) :: Nil, - UnspecifiedFrame)).as('window)).select('a, 'c, 'window) + UnspecifiedFrame)).as('window)).where('window > 1).select('a, 'c) val correctAnswer = input.select('a, 'b, 'c) @@ -298,7 +298,7 @@ class ColumnPruningSuite extends PlanTest { SortOrder('b, Ascending) :: Nil, UnspecifiedFrame)).as('window) :: Nil, 'a :: Nil, 'b.asc :: Nil) - .select('a, 'c, 'window).analyze + .select('a, 'c, 'window).where('window > 1).select('a, 'c).analyze val optimized = Optimize.execute(originalQuery.analyze) From bc4685aaabeeb4e58eb4aacccc4915d30349481e Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Mon, 14 Mar 2016 18:10:33 -0700 Subject: [PATCH 8/8] rename. --- .../org/apache/spark/sql/catalyst/optimizer/Optimizer.scala | 4 ++-- .../spark/sql/catalyst/optimizer/ColumnPruningSuite.scala | 2 +- .../spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala | 2 +- .../spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala | 2 +- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 20cfc80a09bd7..2de92d06ec836 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -71,7 +71,7 @@ abstract class Optimizer extends RuleExecutor[LogicalPlan] { PushPredicateThroughAggregate, LimitPushDown, ColumnPruning, - EliminateObjects, + EliminateOperators, // Operator combine CollapseRepartition, CollapseProject, @@ -405,7 +405,7 @@ object ColumnPruning extends Rule[LogicalPlan] { * * Note: this rule should be executed just after ColumnPruning. */ -object EliminateObjects extends Rule[LogicalPlan] { +object EliminateOperators extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan transformUp { // Eliminate no-op Projects case p @ Project(projectList, child) if sameOutput(child.output, p.output) => child diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala index e45e9a6ca73e3..6187fb9e2fb87 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala @@ -35,7 +35,7 @@ class ColumnPruningSuite extends PlanTest { object Optimize extends RuleExecutor[LogicalPlan] { val batches = Batch("Column pruning", FixedPoint(100), ColumnPruning, - EliminateObjects, + EliminateOperators, CollapseProject) :: Nil } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala index 57cab89e7254f..e0e9b6d93ec96 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala @@ -29,7 +29,7 @@ class CombiningLimitsSuite extends PlanTest { val batches = Batch("Filter Pushdown", FixedPoint(100), ColumnPruning, - EliminateObjects) :: + EliminateOperators) :: Batch("Combine Limit", FixedPoint(10), CombineLimits) :: Batch("Constant Folding", FixedPoint(10), diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala index 3eaab124e6588..51468fa5ced31 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala @@ -43,7 +43,7 @@ class JoinOptimizationSuite extends PlanTest { PushPredicateThroughGenerate, PushPredicateThroughAggregate, ColumnPruning, - EliminateObjects, + EliminateOperators, CollapseProject) :: Nil }