Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,7 @@ class Analyzer(catalog: Catalog,
UnresolvedHavingClauseAttributes ::
TrimGroupingAliases ::
typeCoercionRules ++
extendedResolutionRules : _*),
Batch("Remove SubQueries", fixedPoint,
EliminateSubQueries)
extendedResolutionRules : _*)
)

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import java.sql.{Date, Timestamp}
import scala.language.implicitConversions
import scala.reflect.runtime.universe.{TypeTag, typeTag}

import org.apache.spark.sql.catalyst.analysis.{UnresolvedGetField, UnresolvedAttribute}
import org.apache.spark.sql.catalyst.analysis.{EliminateSubQueries, UnresolvedGetField, UnresolvedAttribute}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans.{Inner, JoinType}
Expand Down Expand Up @@ -289,7 +289,7 @@ package object dsl {
InsertIntoTable(
analysis.UnresolvedRelation(Seq(tableName)), Map.empty, logicalPlan, overwrite)

def analyze = analysis.SimpleAnalyzer(logicalPlan)
def analyze: LogicalPlan = EliminateSubQueries(analysis.SimpleAnalyzer(logicalPlan))
}

object plans { // scalastyle:ignore
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.spark.sql.catalyst.optimizer

import scala.collection.immutable.HashSet
import org.apache.spark.sql.catalyst.analysis.EliminateSubQueries
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.Inner
import org.apache.spark.sql.catalyst.plans.FullOuter
Expand All @@ -32,6 +33,9 @@ abstract class Optimizer extends RuleExecutor[LogicalPlan]

object DefaultOptimizer extends Optimizer {
val batches =
// SubQueries are only needed for analysis and can be removed before execution.
Batch("Remove SubQueries", FixedPoint(100),
EliminateSubQueries) ::

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a comment at here to let others know that the first step in Optimizer is to remove SubQueries (which are helper wrappers for query analysis)?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added

Batch("Combine Limits", FixedPoint(100),
CombineLimits) ::
Batch("ConstantFolding", FixedPoint(100),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.plans.logical

import org.apache.spark.Logging
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.{UnresolvedGetField, Resolver}
import org.apache.spark.sql.catalyst.analysis.{EliminateSubQueries, UnresolvedGetField, Resolver}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.trees.TreeNode
Expand Down Expand Up @@ -73,12 +73,16 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
* can do better should override this function.
*/
def sameResult(plan: LogicalPlan): Boolean = {
plan.getClass == this.getClass &&
plan.children.size == children.size && {
logDebug(s"[${cleanArgs.mkString(", ")}] == [${plan.cleanArgs.mkString(", ")}]")
cleanArgs == plan.cleanArgs
val cleanLeft = EliminateSubQueries(this)
val cleanRight = EliminateSubQueries(plan)

cleanLeft.getClass == cleanRight.getClass &&
cleanLeft.children.size == cleanRight.children.size && {
logDebug(
s"[${cleanRight.cleanArgs.mkString(", ")}] == [${cleanLeft.cleanArgs.mkString(", ")}]")
cleanRight.cleanArgs == cleanLeft.cleanArgs
} &&
(plan.children, children).zipped.forall(_ sameResult _)
(cleanLeft.children, cleanRight.children).zipped.forall(_ sameResult _)
}

/** Args that have cleaned such that differences in expression id should not affect equality */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,13 @@ class AnalysisSuite extends FunSuite with BeforeAndAfter {
val caseInsensitiveCatalog = new SimpleCatalog(false)

val caseSensitiveAnalyzer =
new Analyzer(caseSensitiveCatalog, EmptyFunctionRegistry, caseSensitive = true)
new Analyzer(caseSensitiveCatalog, EmptyFunctionRegistry, caseSensitive = true) {
override val extendedResolutionRules = EliminateSubQueries :: Nil
}
val caseInsensitiveAnalyzer =
new Analyzer(caseInsensitiveCatalog, EmptyFunctionRegistry, caseSensitive = false)
new Analyzer(caseInsensitiveCatalog, EmptyFunctionRegistry, caseSensitive = false) {
override val extendedResolutionRules = EliminateSubQueries :: Nil
}

val checkAnalysis = new CheckAnalysis

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,13 @@ class DataFrameSuite extends QueryTest {
)
}

test("self join with aliases") {
val df = Seq(1,2,3).map(i => (i, i.toString)).toDF("int", "str")
checkAnswer(
df.as('x).join(df.as('y), $"x.str" === $"y.str").groupBy("x.str").count(),
Row("1", 1) :: Row("2", 1) :: Row("3", 1) :: Nil)
}

test("explode") {
val df = Seq((1, "a b c"), (2, "a b"), (3, "a")).toDF("number", "letters")
val df2 =
Expand Down
4 changes: 2 additions & 2 deletions sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class JoinSuite extends QueryTest with BeforeAndAfterEach {
test("equi-join is hash-join") {
val x = testData2.as("x")
val y = testData2.as("y")
val join = x.join(y, $"x.a" === $"y.a", "inner").queryExecution.analyzed
val join = x.join(y, $"x.a" === $"y.a", "inner").queryExecution.optimizedPlan
val planned = planner.HashJoin(join)
assert(planned.size === 1)
}
Expand Down Expand Up @@ -109,7 +109,7 @@ class JoinSuite extends QueryTest with BeforeAndAfterEach {
test("multiple-key equi-join is hash-join") {
val x = testData2.as("x")
val y = testData2.as("y")
val join = x.join(y, ($"x.a" === $"y.a") && ($"x.b" === $"y.b")).queryExecution.analyzed
val join = x.join(y, ($"x.a" === $"y.a") && ($"x.b" === $"y.b")).queryExecution.optimizedPlan
val planned = planner.HashJoin(join)
assert(planned.size === 1)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -579,7 +579,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
Row(3) :: Row(4) :: Nil
)

table("test_parquet_ctas").queryExecution.analyzed match {
table("test_parquet_ctas").queryExecution.optimizedPlan match {
case LogicalRelation(p: ParquetRelation2) => // OK
case _ =>
fail(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
Seq(Row(1, "str1"))
)

table("test_parquet_ctas").queryExecution.analyzed match {
table("test_parquet_ctas").queryExecution.optimizedPlan match {
case LogicalRelation(p: ParquetRelation2) => // OK
case _ =>
fail(
Expand Down