From b0877705072dadc1d16da061db2b226cc47b2165 Mon Sep 17 00:00:00 2001 From: Edoardo Vacchi Date: Fri, 22 May 2015 15:50:46 +0200 Subject: [PATCH 1/5] Refactor out QueryExecution, SparkPlanner * process in a lighter-weight, backwards-compatible way --- .../org/apache/spark/sql/DataFrame.scala | 2 +- .../org/apache/spark/sql/SQLContext.scala | 268 +++++++++--------- .../spark/sql/execution/SparkStrategies.scala | 4 +- 3 files changed, 144 insertions(+), 130 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala index 3ec1c4a2f1027..e85ebb4ddc2cb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala @@ -118,7 +118,7 @@ private[sql] object DataFrame { @Experimental class DataFrame private[sql]( @transient val sqlContext: SQLContext, - @DeveloperApi @transient val queryExecution: SQLContext#QueryExecution) + @DeveloperApi @transient val queryExecution: QueryExecution) extends RDDApi[Row] with Serializable { /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 1ea596dddff02..be922e5288ace 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -166,9 +166,9 @@ class SQLContext(@transient val sparkContext: SparkContext) protected[sql] def parseSql(sql: String): LogicalPlan = ddlParser.parse(sql, false) - protected[sql] def executeSql(sql: String): this.QueryExecution = executePlan(parseSql(sql)) + protected[sql] def executeSql(sql: String): org.apache.spark.sql.QueryExecution = executePlan(parseSql(sql)) - protected[sql] def executePlan(plan: LogicalPlan) = new this.QueryExecution(plan) + protected[sql] def executePlan(plan: LogicalPlan) = new org.apache.spark.sql.QueryExecution(this, plan) @transient 
protected[sql] val tlSession = new ThreadLocal[SQLSession]() { @@ -784,77 +784,12 @@ class SQLContext(@transient val sparkContext: SparkContext) }.toArray } - protected[sql] class SparkPlanner extends SparkStrategies { - val sparkContext: SparkContext = self.sparkContext + @deprecated("use org.apache.spark.sql.SparkPlanner") + protected[sql] class SparkPlanner extends org.apache.spark.sql.SparkPlanner(this) - val sqlContext: SQLContext = self - - def codegenEnabled: Boolean = self.conf.codegenEnabled - - def unsafeEnabled: Boolean = self.conf.unsafeEnabled - - def numPartitions: Int = self.conf.numShufflePartitions - - def strategies: Seq[Strategy] = - experimental.extraStrategies ++ ( - DataSourceStrategy :: - DDLStrategy :: - TakeOrdered :: - HashAggregation :: - LeftSemiJoin :: - HashJoin :: - InMemoryScans :: - ParquetOperations :: - BasicOperators :: - CartesianProduct :: - BroadcastNestedLoopJoin :: Nil) - - /** - * Used to build table scan operators where complex projection and filtering are done using - * separate physical operators. This function returns the given scan operator with Project and - * Filter nodes added only when needed. For example, a Project operator is only used when the - * final desired output requires complex expressions to be evaluated or when columns can be - * further eliminated out after filtering has been done. - * - * The `prunePushedDownFilters` parameter is used to remove those filters that can be optimized - * away by the filter pushdown optimization. - * - * The required attributes for both filtering and expression evaluation are passed to the - * provided `scanBuilder` function so that it can avoid unnecessary column materialization. 
- */ - def pruneFilterProject( - projectList: Seq[NamedExpression], - filterPredicates: Seq[Expression], - prunePushedDownFilters: Seq[Expression] => Seq[Expression], - scanBuilder: Seq[Attribute] => SparkPlan): SparkPlan = { - - val projectSet = AttributeSet(projectList.flatMap(_.references)) - val filterSet = AttributeSet(filterPredicates.flatMap(_.references)) - val filterCondition = - prunePushedDownFilters(filterPredicates).reduceLeftOption(catalyst.expressions.And) - - // Right now we still use a projection even if the only evaluation is applying an alias - // to a column. Since this is a no-op, it could be avoided. However, using this - // optimization with the current implementation would change the output schema. - // TODO: Decouple final output schema from expression evaluation so this copy can be - // avoided safely. - - if (AttributeSet(projectList.map(_.toAttribute)) == projectSet && - filterSet.subsetOf(projectSet)) { - // When it is possible to just use column pruning to get the right projection and - // when the columns of this projection are enough to evaluate all filter conditions, - // just do a scan followed by a filter, with no extra project. - val scan = scanBuilder(projectList.asInstanceOf[Seq[Attribute]]) - filterCondition.map(Filter(_, scan)).getOrElse(scan) - } else { - val scan = scanBuilder((projectSet ++ filterSet).toSeq) - Project(projectList, filterCondition.map(Filter(_, scan)).getOrElse(scan)) - } - } - } @transient - protected[sql] val planner = new SparkPlanner + protected[sql] val planner: org.apache.spark.sql.SparkPlanner = new org.apache.spark.sql.SparkPlanner(this) @transient protected[sql] lazy val emptyResult = sparkContext.parallelize(Seq.empty[Row], 1) @@ -893,63 +828,8 @@ class SQLContext(@transient val sparkContext: SparkContext) protected[sql] lazy val conf: SQLConf = new SQLConf } - /** - * :: DeveloperApi :: - * The primary workflow for executing relational queries using Spark. 
Designed to allow easy - * access to the intermediate phases of query execution for developers. - */ - @DeveloperApi - protected[sql] class QueryExecution(val logical: LogicalPlan) { - def assertAnalyzed(): Unit = analyzer.checkAnalysis(analyzed) - - lazy val analyzed: LogicalPlan = analyzer.execute(logical) - lazy val withCachedData: LogicalPlan = { - assertAnalyzed() - cacheManager.useCachedData(analyzed) - } - lazy val optimizedPlan: LogicalPlan = optimizer.execute(withCachedData) - - // TODO: Don't just pick the first one... - lazy val sparkPlan: SparkPlan = { - SparkPlan.currentContext.set(self) - planner.plan(optimizedPlan).next() - } - // executedPlan should not be used to initialize any SparkPlan. It should be - // only used for execution. - lazy val executedPlan: SparkPlan = prepareForExecution.execute(sparkPlan) - - /** Internal version of the RDD. Avoids copies and has no schema */ - lazy val toRdd: RDD[Row] = executedPlan.execute() - - protected def stringOrError[A](f: => A): String = - try f.toString catch { case e: Throwable => e.toString } - - def simpleString: String = - s"""== Physical Plan == - |${stringOrError(executedPlan)} - """.stripMargin.trim - - override def toString: String = { - def output = - analyzed.output.map(o => s"${o.name}: ${o.dataType.simpleString}").mkString(", ") - - // TODO previously will output RDD details by run (${stringOrError(toRdd.toDebugString)}) - // however, the `toRdd` will cause the real execution, which is not what we want. - // We need to think about how to avoid the side effect. 
- s"""== Parsed Logical Plan == - |${stringOrError(logical)} - |== Analyzed Logical Plan == - |${stringOrError(output)} - |${stringOrError(analyzed)} - |== Optimized Logical Plan == - |${stringOrError(optimizedPlan)} - |== Physical Plan == - |${stringOrError(executedPlan)} - |Code Generation: ${stringOrError(executedPlan.codegenEnabled)} - |== RDD == - """.stripMargin.trim - } - } + @deprecated("use org.apache.spark.sql.QueryExecution") + protected[sql] class QueryExecution(logical: LogicalPlan) extends org.apache.spark.sql.QueryExecution(this, logical) /** * Parses the data type in our internal string representation. The data type string should @@ -1321,3 +1201,137 @@ object SQLContext { } } } + + + +/** + * :: DeveloperApi :: + * The primary workflow for executing relational queries using Spark. Designed to allow easy + * access to the intermediate phases of query execution for developers. + */ +@DeveloperApi +class QueryExecution(val sqlContext: SQLContext, val logical: LogicalPlan) { + val analyzer = sqlContext.analyzer + val optimizer = sqlContext.optimizer + val planner = sqlContext.planner + val cacheManager = sqlContext.cacheManager + val prepareForExecution = sqlContext.prepareForExecution + + def assertAnalyzed(): Unit = analyzer.checkAnalysis(analyzed) + + lazy val analyzed: LogicalPlan = analyzer.execute(logical) + lazy val withCachedData: LogicalPlan = { + assertAnalyzed() + cacheManager.useCachedData(analyzed) + } + lazy val optimizedPlan: LogicalPlan = optimizer.execute(withCachedData) + + // TODO: Don't just pick the first one... + lazy val sparkPlan: SparkPlan = { + SparkPlan.currentContext.set(sqlContext) + planner.plan(optimizedPlan).next() + } + // executedPlan should not be used to initialize any SparkPlan. It should be + // only used for execution. + lazy val executedPlan: SparkPlan = prepareForExecution.execute(sparkPlan) + + /** Internal version of the RDD. 
Avoids copies and has no schema */ + lazy val toRdd: RDD[Row] = executedPlan.execute() + + protected def stringOrError[A](f: => A): String = + try f.toString catch { case e: Throwable => e.toString } + + def simpleString: String = + s"""== Physical Plan == + |${stringOrError(executedPlan)} + """.stripMargin.trim + + override def toString: String = { + def output = + analyzed.output.map(o => s"${o.name}: ${o.dataType.simpleString}").mkString(", ") + + // TODO previously will output RDD details by run (${stringOrError(toRdd.toDebugString)}) + // however, the `toRdd` will cause the real execution, which is not what we want. + // We need to think about how to avoid the side effect. + s"""== Parsed Logical Plan == + |${stringOrError(logical)} + |== Analyzed Logical Plan == + |${stringOrError(output)} + |${stringOrError(analyzed)} + |== Optimized Logical Plan == + |${stringOrError(optimizedPlan)} + |== Physical Plan == + |${stringOrError(executedPlan)} + |Code Generation: ${stringOrError(executedPlan.codegenEnabled)} + |== RDD == + """.stripMargin.trim + } +} + + +class SparkPlanner(val sqlContext: SQLContext) extends org.apache.spark.sql.execution.SparkStrategies { + val sparkContext: SparkContext = sqlContext.sparkContext + + def codegenEnabled: Boolean = sqlContext.conf.codegenEnabled + + def unsafeEnabled: Boolean = sqlContext.conf.unsafeEnabled + + def numPartitions: Int = sqlContext.conf.numShufflePartitions + + def strategies: Seq[Strategy] = + sqlContext.experimental.extraStrategies ++ ( + DataSourceStrategy :: + DDLStrategy :: + TakeOrdered :: + HashAggregation :: + LeftSemiJoin :: + HashJoin :: + InMemoryScans :: + ParquetOperations :: + BasicOperators :: + CartesianProduct :: + BroadcastNestedLoopJoin :: Nil) + + /** + * Used to build table scan operators where complex projection and filtering are done using + * separate physical operators. This function returns the given scan operator with Project and + * Filter nodes added only when needed. 
For example, a Project operator is only used when the + * final desired output requires complex expressions to be evaluated or when columns can be + * further eliminated out after filtering has been done. + * + * The `prunePushedDownFilters` parameter is used to remove those filters that can be optimized + * away by the filter pushdown optimization. + * + * The required attributes for both filtering and expression evaluation are passed to the + * provided `scanBuilder` function so that it can avoid unnecessary column materialization. + */ + def pruneFilterProject( + projectList: Seq[NamedExpression], + filterPredicates: Seq[Expression], + prunePushedDownFilters: Seq[Expression] => Seq[Expression], + scanBuilder: Seq[Attribute] => SparkPlan): SparkPlan = { + + val projectSet = AttributeSet(projectList.flatMap(_.references)) + val filterSet = AttributeSet(filterPredicates.flatMap(_.references)) + val filterCondition = + prunePushedDownFilters(filterPredicates).reduceLeftOption(catalyst.expressions.And) + + // Right now we still use a projection even if the only evaluation is applying an alias + // to a column. Since this is a no-op, it could be avoided. However, using this + // optimization with the current implementation would change the output schema. + // TODO: Decouple final output schema from expression evaluation so this copy can be + // avoided safely. + + if (AttributeSet(projectList.map(_.toAttribute)) == projectSet && + filterSet.subsetOf(projectSet)) { + // When it is possible to just use column pruning to get the right projection and + // when the columns of this projection are enough to evaluate all filter conditions, + // just do a scan followed by a filter, with no extra project. 
+ val scan = scanBuilder(projectList.asInstanceOf[Seq[Attribute]]) + filterCondition.map(Filter(_, scan)).getOrElse(scan) + } else { + val scan = scanBuilder((projectSet ++ filterSet).toSeq) + Project(projectList, filterCondition.map(Filter(_, scan)).getOrElse(scan)) + } + } +} \ No newline at end of file diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 3f6a0345bc17d..4b96df974a24f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -27,10 +27,10 @@ import org.apache.spark.sql.execution.{DescribeCommand => RunnableDescribeComman import org.apache.spark.sql.parquet._ import org.apache.spark.sql.sources.{CreateTableUsing, CreateTempTableUsing, DescribeCommand => LogicalDescribeCommand, _} import org.apache.spark.sql.types._ -import org.apache.spark.sql.{SQLContext, Strategy, execution} +import org.apache.spark.sql.{SparkPlanner, SQLContext, Strategy, execution} private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { - self: SQLContext#SparkPlanner => + self: SparkPlanner => object LeftSemiJoin extends Strategy with PredicateHelper { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { From c22aa54d18db7efa6b054cd364594e66b81a25ee Mon Sep 17 00:00:00 2001 From: Edoardo Vacchi Date: Mon, 25 May 2015 10:02:44 +0200 Subject: [PATCH 2/5] Move QueryExecution, SparkPlanner to execution package --- .../org/apache/spark/sql/DataFrame.scala | 2 +- .../org/apache/spark/sql/SQLContext.scala | 147 +----------------- .../spark/sql/execution/QueryExecution.scala | 87 +++++++++++ .../spark/sql/execution/SparkPlanner.scala | 92 +++++++++++ .../spark/sql/execution/SparkStrategies.scala | 2 +- 5 files changed, 188 insertions(+), 142 deletions(-) create mode 100644 
sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala index e85ebb4ddc2cb..199f2ec76ddb8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala @@ -38,7 +38,7 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.logical.{Filter, _} import org.apache.spark.sql.catalyst.plans.{Inner, JoinType} import org.apache.spark.sql.catalyst.{CatalystTypeConverters, ScalaReflection, SqlParser} -import org.apache.spark.sql.execution.{EvaluatePython, ExplainCommand, LogicalRDD} +import org.apache.spark.sql.execution.{EvaluatePython, ExplainCommand, LogicalRDD, QueryExecution} import org.apache.spark.sql.json.JacksonGenerator import org.apache.spark.sql.sources.CreateTableUsingAsSelect import org.apache.spark.sql.types._ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index be922e5288ace..e806407a44d20 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -41,7 +41,8 @@ import org.apache.spark.sql.catalyst.optimizer.{DefaultOptimizer, Optimizer} import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan} import org.apache.spark.sql.catalyst.rules.RuleExecutor import org.apache.spark.sql.catalyst.ParserDialect -import org.apache.spark.sql.execution.{Filter, _} +import org.apache.spark.sql.{execution => sparkexecution} +import org.apache.spark.sql.execution._ import org.apache.spark.sql.sources._ import org.apache.spark.sql.types._ import org.apache.spark.util.Utils @@ -166,9 +167,9 @@ class 
SQLContext(@transient val sparkContext: SparkContext) protected[sql] def parseSql(sql: String): LogicalPlan = ddlParser.parse(sql, false) - protected[sql] def executeSql(sql: String): org.apache.spark.sql.QueryExecution = executePlan(parseSql(sql)) + protected[sql] def executeSql(sql: String): sparkexecution.QueryExecution = executePlan(parseSql(sql)) - protected[sql] def executePlan(plan: LogicalPlan) = new org.apache.spark.sql.QueryExecution(this, plan) + protected[sql] def executePlan(plan: LogicalPlan) = new sparkexecution.QueryExecution(this, plan) @transient protected[sql] val tlSession = new ThreadLocal[SQLSession]() { @@ -785,11 +786,11 @@ class SQLContext(@transient val sparkContext: SparkContext) } @deprecated("use org.apache.spark.sql.SparkPlanner") - protected[sql] class SparkPlanner extends org.apache.spark.sql.SparkPlanner(this) + protected[sql] class SparkPlanner extends sparkexecution.SparkPlanner(this) @transient - protected[sql] val planner: org.apache.spark.sql.SparkPlanner = new org.apache.spark.sql.SparkPlanner(this) + protected[sql] val planner: sparkexecution.SparkPlanner = new sparkexecution.SparkPlanner(this) @transient protected[sql] lazy val emptyResult = sparkContext.parallelize(Seq.empty[Row], 1) @@ -829,7 +830,7 @@ class SQLContext(@transient val sparkContext: SparkContext) } @deprecated("use org.apache.spark.sql.QueryExecution") - protected[sql] class QueryExecution(logical: LogicalPlan) extends org.apache.spark.sql.QueryExecution(this, logical) + protected[sql] class QueryExecution(logical: LogicalPlan) extends sparkexecution.QueryExecution(this, logical) /** * Parses the data type in our internal string representation. The data type string should @@ -1201,137 +1202,3 @@ object SQLContext { } } } - - - -/** - * :: DeveloperApi :: - * The primary workflow for executing relational queries using Spark. Designed to allow easy - * access to the intermediate phases of query execution for developers. 
- */ -@DeveloperApi -class QueryExecution(val sqlContext: SQLContext, val logical: LogicalPlan) { - val analyzer = sqlContext.analyzer - val optimizer = sqlContext.optimizer - val planner = sqlContext.planner - val cacheManager = sqlContext.cacheManager - val prepareForExecution = sqlContext.prepareForExecution - - def assertAnalyzed(): Unit = analyzer.checkAnalysis(analyzed) - - lazy val analyzed: LogicalPlan = analyzer.execute(logical) - lazy val withCachedData: LogicalPlan = { - assertAnalyzed() - cacheManager.useCachedData(analyzed) - } - lazy val optimizedPlan: LogicalPlan = optimizer.execute(withCachedData) - - // TODO: Don't just pick the first one... - lazy val sparkPlan: SparkPlan = { - SparkPlan.currentContext.set(sqlContext) - planner.plan(optimizedPlan).next() - } - // executedPlan should not be used to initialize any SparkPlan. It should be - // only used for execution. - lazy val executedPlan: SparkPlan = prepareForExecution.execute(sparkPlan) - - /** Internal version of the RDD. Avoids copies and has no schema */ - lazy val toRdd: RDD[Row] = executedPlan.execute() - - protected def stringOrError[A](f: => A): String = - try f.toString catch { case e: Throwable => e.toString } - - def simpleString: String = - s"""== Physical Plan == - |${stringOrError(executedPlan)} - """.stripMargin.trim - - override def toString: String = { - def output = - analyzed.output.map(o => s"${o.name}: ${o.dataType.simpleString}").mkString(", ") - - // TODO previously will output RDD details by run (${stringOrError(toRdd.toDebugString)}) - // however, the `toRdd` will cause the real execution, which is not what we want. - // We need to think about how to avoid the side effect. 
- s"""== Parsed Logical Plan == - |${stringOrError(logical)} - |== Analyzed Logical Plan == - |${stringOrError(output)} - |${stringOrError(analyzed)} - |== Optimized Logical Plan == - |${stringOrError(optimizedPlan)} - |== Physical Plan == - |${stringOrError(executedPlan)} - |Code Generation: ${stringOrError(executedPlan.codegenEnabled)} - |== RDD == - """.stripMargin.trim - } -} - - -class SparkPlanner(val sqlContext: SQLContext) extends org.apache.spark.sql.execution.SparkStrategies { - val sparkContext: SparkContext = sqlContext.sparkContext - - def codegenEnabled: Boolean = sqlContext.conf.codegenEnabled - - def unsafeEnabled: Boolean = sqlContext.conf.unsafeEnabled - - def numPartitions: Int = sqlContext.conf.numShufflePartitions - - def strategies: Seq[Strategy] = - sqlContext.experimental.extraStrategies ++ ( - DataSourceStrategy :: - DDLStrategy :: - TakeOrdered :: - HashAggregation :: - LeftSemiJoin :: - HashJoin :: - InMemoryScans :: - ParquetOperations :: - BasicOperators :: - CartesianProduct :: - BroadcastNestedLoopJoin :: Nil) - - /** - * Used to build table scan operators where complex projection and filtering are done using - * separate physical operators. This function returns the given scan operator with Project and - * Filter nodes added only when needed. For example, a Project operator is only used when the - * final desired output requires complex expressions to be evaluated or when columns can be - * further eliminated out after filtering has been done. - * - * The `prunePushedDownFilters` parameter is used to remove those filters that can be optimized - * away by the filter pushdown optimization. - * - * The required attributes for both filtering and expression evaluation are passed to the - * provided `scanBuilder` function so that it can avoid unnecessary column materialization. 
- */ - def pruneFilterProject( - projectList: Seq[NamedExpression], - filterPredicates: Seq[Expression], - prunePushedDownFilters: Seq[Expression] => Seq[Expression], - scanBuilder: Seq[Attribute] => SparkPlan): SparkPlan = { - - val projectSet = AttributeSet(projectList.flatMap(_.references)) - val filterSet = AttributeSet(filterPredicates.flatMap(_.references)) - val filterCondition = - prunePushedDownFilters(filterPredicates).reduceLeftOption(catalyst.expressions.And) - - // Right now we still use a projection even if the only evaluation is applying an alias - // to a column. Since this is a no-op, it could be avoided. However, using this - // optimization with the current implementation would change the output schema. - // TODO: Decouple final output schema from expression evaluation so this copy can be - // avoided safely. - - if (AttributeSet(projectList.map(_.toAttribute)) == projectSet && - filterSet.subsetOf(projectSet)) { - // When it is possible to just use column pruning to get the right projection and - // when the columns of this projection are enough to evaluate all filter conditions, - // just do a scan followed by a filter, with no extra project. - val scan = scanBuilder(projectList.asInstanceOf[Seq[Attribute]]) - filterCondition.map(Filter(_, scan)).getOrElse(scan) - } else { - val scan = scanBuilder((projectSet ++ filterSet).toSeq) - Project(projectList, filterCondition.map(Filter(_, scan)).getOrElse(scan)) - } - } -} \ No newline at end of file diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala new file mode 100644 index 0000000000000..ec61f4a252b92 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution + +import org.apache.spark.annotation.{Experimental, DeveloperApi} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{SQLContext, Row} +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan + +/** + * :: Experimental :: + * The primary workflow for executing relational queries using Spark. Designed to allow easy + * access to the intermediate phases of query execution for developers. + */ +@Experimental +class QueryExecution(val sqlContext: SQLContext, val logical: LogicalPlan) { + val analyzer = sqlContext.analyzer + val optimizer = sqlContext.optimizer + val planner = sqlContext.planner + val cacheManager = sqlContext.cacheManager + val prepareForExecution = sqlContext.prepareForExecution + + def assertAnalyzed(): Unit = analyzer.checkAnalysis(analyzed) + + lazy val analyzed: LogicalPlan = analyzer.execute(logical) + lazy val withCachedData: LogicalPlan = { + assertAnalyzed() + cacheManager.useCachedData(analyzed) + } + lazy val optimizedPlan: LogicalPlan = optimizer.execute(withCachedData) + + // TODO: Don't just pick the first one... 
+ lazy val sparkPlan: SparkPlan = { + SparkPlan.currentContext.set(sqlContext) + planner.plan(optimizedPlan).next() + } + // executedPlan should not be used to initialize any SparkPlan. It should be + // only used for execution. + lazy val executedPlan: SparkPlan = prepareForExecution.execute(sparkPlan) + + /** Internal version of the RDD. Avoids copies and has no schema */ + lazy val toRdd: RDD[Row] = executedPlan.execute() + + protected def stringOrError[A](f: => A): String = + try f.toString catch { case e: Throwable => e.toString } + + def simpleString: String = + s"""== Physical Plan == + |${stringOrError(executedPlan)} + """.stripMargin.trim + + override def toString: String = { + def output = + analyzed.output.map(o => s"${o.name}: ${o.dataType.simpleString}").mkString(", ") + + // TODO previously will output RDD details by run (${stringOrError(toRdd.toDebugString)}) + // however, the `toRdd` will cause the real execution, which is not what we want. + // We need to think about how to avoid the side effect. + s"""== Parsed Logical Plan == + |${stringOrError(logical)} + |== Analyzed Logical Plan == + |${stringOrError(output)} + |${stringOrError(analyzed)} + |== Optimized Logical Plan == + |${stringOrError(optimizedPlan)} + |== Physical Plan == + |${stringOrError(executedPlan)} + |Code Generation: ${stringOrError(executedPlan.codegenEnabled)} + |== RDD == + """.stripMargin.trim + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala new file mode 100644 index 0000000000000..16134c44496e6 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution + +import org.apache.spark.SparkContext +import org.apache.spark.annotation.Experimental +import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.sources.DataSourceStrategy + +@Experimental +class SparkPlanner(val sqlContext: SQLContext) extends org.apache.spark.sql.execution.SparkStrategies { + val sparkContext: SparkContext = sqlContext.sparkContext + + def codegenEnabled: Boolean = sqlContext.conf.codegenEnabled + + def unsafeEnabled: Boolean = sqlContext.conf.unsafeEnabled + + def numPartitions: Int = sqlContext.conf.numShufflePartitions + + def strategies: Seq[Strategy] = + sqlContext.experimental.extraStrategies ++ ( + DataSourceStrategy :: + DDLStrategy :: + TakeOrdered :: + HashAggregation :: + LeftSemiJoin :: + HashJoin :: + InMemoryScans :: + ParquetOperations :: + BasicOperators :: + CartesianProduct :: + BroadcastNestedLoopJoin :: Nil) + + /** + * Used to build table scan operators where complex projection and filtering are done using + * separate physical operators. This function returns the given scan operator with Project and + * Filter nodes added only when needed. For example, a Project operator is only used when the + * final desired output requires complex expressions to be evaluated or when columns can be + * further eliminated out after filtering has been done. 
+ * + * The `prunePushedDownFilters` parameter is used to remove those filters that can be optimized + * away by the filter pushdown optimization. + * + * The required attributes for both filtering and expression evaluation are passed to the + * provided `scanBuilder` function so that it can avoid unnecessary column materialization. + */ + def pruneFilterProject( + projectList: Seq[NamedExpression], + filterPredicates: Seq[Expression], + prunePushedDownFilters: Seq[Expression] => Seq[Expression], + scanBuilder: Seq[Attribute] => SparkPlan): SparkPlan = { + + val projectSet = AttributeSet(projectList.flatMap(_.references)) + val filterSet = AttributeSet(filterPredicates.flatMap(_.references)) + val filterCondition = + prunePushedDownFilters(filterPredicates).reduceLeftOption(catalyst.expressions.And) + + // Right now we still use a projection even if the only evaluation is applying an alias + // to a column. Since this is a no-op, it could be avoided. However, using this + // optimization with the current implementation would change the output schema. + // TODO: Decouple final output schema from expression evaluation so this copy can be + // avoided safely. + + if (AttributeSet(projectList.map(_.toAttribute)) == projectSet && + filterSet.subsetOf(projectSet)) { + // When it is possible to just use column pruning to get the right projection and + // when the columns of this projection are enough to evaluate all filter conditions, + // just do a scan followed by a filter, with no extra project. 
+ val scan = scanBuilder(projectList.asInstanceOf[Seq[Attribute]]) + filterCondition.map(Filter(_, scan)).getOrElse(scan) + } else { + val scan = scanBuilder((projectSet ++ filterSet).toSeq) + Project(projectList, filterCondition.map(Filter(_, scan)).getOrElse(scan)) + } + } +} \ No newline at end of file diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 4b96df974a24f..4218337926bbf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -27,7 +27,7 @@ import org.apache.spark.sql.execution.{DescribeCommand => RunnableDescribeComman import org.apache.spark.sql.parquet._ import org.apache.spark.sql.sources.{CreateTableUsing, CreateTempTableUsing, DescribeCommand => LogicalDescribeCommand, _} import org.apache.spark.sql.types._ -import org.apache.spark.sql.{SparkPlanner, SQLContext, Strategy, execution} +import org.apache.spark.sql.{SQLContext, Strategy, execution} private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { self: SparkPlanner => From 253c15e1b7ee8762ffb74f118fbf478197b21664 Mon Sep 17 00:00:00 2001 From: Edoardo Vacchi Date: Tue, 21 Jul 2015 09:33:14 +0200 Subject: [PATCH 3/5] Fix scala style --- .../src/main/scala/org/apache/spark/sql/SQLContext.scala | 9 ++++++--- .../org/apache/spark/sql/execution/QueryExecution.scala | 6 +++--- .../org/apache/spark/sql/execution/SparkPlanner.scala | 5 ++--- 3 files changed, 11 insertions(+), 9 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index f8e61d9c12b26..a22ac503a5d0e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -185,9 +185,11 @@ class 
SQLContext(@transient val sparkContext: SparkContext) protected[sql] def parseSql(sql: String): LogicalPlan = ddlParser.parse(sql, false) - protected[sql] def executeSql(sql: String): sparkexecution.QueryExecution = executePlan(parseSql(sql)) + protected[sql] def executeSql(sql: String): sparkexecution.QueryExecution = + executePlan(parseSql(sql)) - protected[sql] def executePlan(plan: LogicalPlan) = new sparkexecution.QueryExecution(this, plan) + protected[sql] def executePlan(plan: LogicalPlan) = + new sparkexecution.QueryExecution(this, plan) @transient protected[sql] val tlSession = new ThreadLocal[SQLSession]() { @@ -900,7 +902,8 @@ class SQLContext(@transient val sparkContext: SparkContext) } @deprecated("use org.apache.spark.sql.QueryExecution") - protected[sql] class QueryExecution(logical: LogicalPlan) extends sparkexecution.QueryExecution(this, logical) + protected[sql] class QueryExecution(logical: LogicalPlan) + extends sparkexecution.QueryExecution(this, logical) /** * Parses the data type in our internal string representation. 
The data type string should diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala index b652f29de682d..4197b28b1ff2b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala @@ -30,9 +30,9 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan */ @Experimental class QueryExecution(val sqlContext: SQLContext, val logical: LogicalPlan) { - val analyzer = sqlContext.analyzer + val analyzer = sqlContext.analyzer val optimizer = sqlContext.optimizer - val planner = sqlContext.planner + val planner = sqlContext.planner val cacheManager = sqlContext.cacheManager val prepareForExecution = sqlContext.prepareForExecution @@ -85,4 +85,4 @@ class QueryExecution(val sqlContext: SQLContext, val logical: LogicalPlan) { |== RDD == """.stripMargin.trim } -} \ No newline at end of file +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala index 81f2b32d21fde..afb30ec3a6e76 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala @@ -21,11 +21,10 @@ import org.apache.spark.SparkContext import org.apache.spark.annotation.Experimental import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.execution.TakeOrderedAndProject import org.apache.spark.sql.sources.DataSourceStrategy @Experimental -class SparkPlanner(val sqlContext: SQLContext) extends org.apache.spark.sql.execution.SparkStrategies { +class SparkPlanner(val sqlContext: SQLContext) extends SparkStrategies { val sparkContext: SparkContext = sqlContext.sparkContext def codegenEnabled: Boolean = sqlContext.conf.codegenEnabled 
@@ -90,4 +89,4 @@ class SparkPlanner(val sqlContext: SQLContext) extends org.apache.spark.sql.exec Project(projectList, filterCondition.map(Filter(_, scan)).getOrElse(scan)) } } -} \ No newline at end of file +} From 47685ccb20170de9d77bcc431491a52f62cfa7cb Mon Sep 17 00:00:00 2001 From: Edoardo Vacchi Date: Thu, 10 Sep 2015 09:02:36 +0200 Subject: [PATCH 4/5] Update SQLContext.scala --- .../src/main/scala/org/apache/spark/sql/SQLContext.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index a1206164d3484..76083616469e6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -192,8 +192,8 @@ class SQLContext(@transient val sparkContext: SparkContext) protected[sql] def parseSql(sql: String): LogicalPlan = ddlParser.parse(sql, false) - protected[sql] def executeSql(sql: String): sparkexecution.QueryExecution = - executePlan(parseSql(sql)) + protected[sql] def executeSql(sql: String): + org.apache.spark.sql.execution.QueryExecution = executePlan(parseSql(sql)) protected[sql] def executePlan(plan: LogicalPlan) = new sparkexecution.QueryExecution(this, plan) @@ -787,7 +787,7 @@ class SQLContext(@transient val sparkContext: SparkContext) }.toArray } - @deprecated("use org.apache.spark.sql.SparkPlanner", "1.5.0") + @deprecated("use org.apache.spark.sql.execution.SparkPlanner", "1.6.0") protected[sql] class SparkPlanner extends sparkexecution.SparkPlanner(this) @transient @@ -838,7 +838,7 @@ class SQLContext(@transient val sparkContext: SparkContext) protected[sql] lazy val conf: SQLConf = new SQLConf } - @deprecated("use org.apache.spark.sql.QueryExecution", "1.5.0") + @deprecated("use org.apache.spark.sql.execution.QueryExecution", "1.6.0") protected[sql] class QueryExecution(logical: LogicalPlan) extends sparkexecution.QueryExecution(this,
logical) From 22a01ae0fb9ee3c1b974acf3cbdca3abaf5e227e Mon Sep 17 00:00:00 2001 From: Edoardo Vacchi Date: Thu, 10 Sep 2015 19:08:06 +0200 Subject: [PATCH 5/5] Scala style --- sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 76083616469e6..e3fdd782e6ff6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -192,7 +192,7 @@ class SQLContext(@transient val sparkContext: SparkContext) protected[sql] def parseSql(sql: String): LogicalPlan = ddlParser.parse(sql, false) - protected[sql] def executeSql(sql: String): + protected[sql] def executeSql(sql: String): org.apache.spark.sql.execution.QueryExecution = executePlan(parseSql(sql)) protected[sql] def executePlan(plan: LogicalPlan) =