From 7937d2be2e163736ce90857bf6eb4209001e32e5 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Mon, 1 Feb 2016 23:35:06 -0800 Subject: [PATCH 01/11] turn on the test. --- .../scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala index 1f731db26f387..4f5a3a3de7ba2 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala @@ -97,7 +97,7 @@ class LogicalPlanToSQLSuite extends SQLBuilderTest with SQLTestUtils { // Aggregate operator and aliased to the same name "aggOrder". This is OK for normal query // execution since these aliases have different expression ID. But this introduces name collision // when converting resolved plans back to SQL query strings as expression IDs are stripped. - ignore("aggregate function in order by clause with multiple order keys") { + test("aggregate function in order by clause with multiple order keys") { checkHiveQl("SELECT COUNT(value) FROM t1 GROUP BY key ORDER BY key, MAX(key)") } From 82bb46fefd69a74f699ff97be5e63a866c318a80 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Wed, 3 Feb 2016 06:20:09 -0800 Subject: [PATCH 02/11] added a flag isGenerated to Alias and AttributeReference --- .../sql/catalyst/analysis/Analyzer.scala | 32 +++++++++++-------- .../DistinctAggregationRewriter.scala | 5 +-- .../expressions/codegen/package.scala | 2 +- .../expressions/namedExpressions.scala | 31 +++++++++++------- .../sql/catalyst/planning/patterns.scala | 8 ++--- .../catalyst/plans/logical/LogicalPlan.scala | 2 +- .../analysis/DecimalPrecisionSuite.scala | 2 +- .../encoders/ExpressionEncoderSuite.scala | 2 +- .../sql/hive/LogicalPlanToSQLSuite.scala | 1 - 9 files changed, 50 insertions(+), 35 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index a983dc1cdfebe..220587a96bdb3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -413,9 +413,10 @@ class Analyzer( case UnresolvedAlias(f @ UnresolvedFunction(_, args, _), _) if containsStar(args) => val newChildren = expandStarExpressions(args, child) UnresolvedAlias(child = f.copy(children = newChildren)) :: Nil - case Alias(f @ UnresolvedFunction(_, args, _), name) if containsStar(args) => + case Alias(f @ UnresolvedFunction(_, args, _), name, isGenerated) + if containsStar(args) => val newChildren = expandStarExpressions(args, child) - Alias(child = f.copy(children = newChildren), name)() :: Nil + Alias(child = f.copy(children = newChildren), name, isGenerated)() :: Nil case UnresolvedAlias(c @ CreateArray(args), _) if containsStar(args) => val expandedArgs = args.flatMap { case s: Star => s.expand(child, resolver) @@ -490,7 +491,7 @@ class Analyzer( def newAliases(expressions: Seq[NamedExpression]): Seq[NamedExpression] = { expressions.map { - case a: Alias => Alias(a.child, a.name)() + case a: Alias => Alias(a.child, a.name, a.isGenerated)() case other => other } } @@ -698,7 +699,10 @@ class Analyzer( // Try resolving the condition of the filter as though it is in the aggregate clause val aggregatedCondition = - Aggregate(grouping, Alias(havingCondition, "havingCondition")() :: Nil, child) + Aggregate( + grouping, + Alias(havingCondition, "havingCondition", isGenerated = true)() :: Nil, + child) val resolvedOperator = execute(aggregatedCondition) def resolvedAggregateFilter = resolvedOperator @@ -723,7 +727,8 @@ class Analyzer( // Try resolving the ordering as though it is in the aggregate clause. try { val unresolvedSortOrders = sortOrder.filter(s => !s.resolved || containsAggregate(s)) - val aliasedOrdering = unresolvedSortOrders.map(o => Alias(o.child, "aggOrder")()) + val aliasedOrdering = + unresolvedSortOrders.map(o => Alias(o.child, "aggOrder", isGenerated = true)()) val aggregatedOrdering = aggregate.copy(aggregateExpressions = aliasedOrdering) val resolvedAggregate: Aggregate = execute(aggregatedOrdering).asInstanceOf[Aggregate] val resolvedAliasedOrdering: Seq[Alias] = @@ -744,7 +749,7 @@ class Analyzer( val evaluatedOrderings = resolvedAliasedOrdering.zip(sortOrder).map { case (evaluated, order) => val index = originalAggExprs.indexWhere { - case Alias(child, _) => child semanticEquals evaluated.child + case Alias(child, _, _) => child semanticEquals evaluated.child case other => other semanticEquals evaluated.child } @@ -836,12 +841,12 @@ class Analyzer( /** Extracts a [[Generator]] expression and any names assigned by aliases to their output. */ private object AliasedGenerator { def unapply(e: Expression): Option[(Generator, Seq[String])] = e match { - case Alias(g: Generator, name) if g.resolved && g.elementTypes.size > 1 => + case Alias(g: Generator, name, _) if g.resolved && g.elementTypes.size > 1 => // If not given the default names, and the TGF with multiple output columns failAnalysis( s"""Expect multiple names given for ${g.getClass.getName}, |but only single name '${name}' specified""".stripMargin) - case Alias(g: Generator, name) if g.resolved => Some((g, name :: Nil)) + case Alias(g: Generator, name, _) if g.resolved => Some((g, name :: Nil)) case MultiAlias(g: Generator, names) if g.resolved => Some(g, names) case _ => None } @@ -1021,7 +1026,7 @@ class Analyzer( // We need to use transformDown because we want to trigger // "case alias @ Alias(window: WindowExpression, _)" first. _.transformDown { - case alias @ Alias(window: WindowExpression, _) => + case alias @ Alias(window: WindowExpression, _, _) => // If a WindowExpression has an assigned alias, just use it. extractedWindowExprBuffer += alias alias.toAttribute @@ -1154,7 +1159,7 @@ class Analyzer( leafNondeterministic.map { e => val ne = e match { case n: NamedExpression => n - case _ => Alias(e, "_nondeterministic")() + case _ => Alias(e, "_nondeterministic", isGenerated = true)() } new TreeNodeRef(e) -> ne } @@ -1269,13 +1274,14 @@ object CleanupAliases extends Rule[LogicalPlan] { case c: CreateStructUnsafe if !stop => stop = true c.copy(children = c.children.map(trimNonTopLevelAliases)) - case Alias(child, _) if !stop => child + case Alias(child, _, _) if !stop => child } } def trimNonTopLevelAliases(e: Expression): Expression = e match { case a: Alias => - Alias(trimAliases(a.child), a.name)(a.exprId, a.qualifiers, a.explicitMetadata) + Alias(trimAliases(a.child), a.name, a.isGenerated)( + a.exprId, a.qualifiers, a.explicitMetadata) case other => trimAliases(other) } @@ -1308,7 +1314,7 @@ object CleanupAliases extends Rule[LogicalPlan] { case c: CreateStructUnsafe if !stop => stop = true c.copy(children = c.children.map(trimNonTopLevelAliases)) - case Alias(child, _) if !stop => child + case Alias(child, _, _) if !stop => child } } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DistinctAggregationRewriter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DistinctAggregationRewriter.scala index 4e7d1341028ca..7bac0937c7d2f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DistinctAggregationRewriter.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DistinctAggregationRewriter.scala @@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, AggregateFunction, Complete} import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Expand, LogicalPlan} import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.types.IntegerType +import org.apache.spark.sql.types.{IntegerType, Metadata} /** * This rule rewrites an aggregate query with distinct aggregations into an expanded double @@ -126,7 +126,8 @@ case class DistinctAggregationRewriter(conf: CatalystConf) extends Rule[LogicalP // Aggregation strategy can handle the query with single distinct if (distinctAggGroups.size > 1) { // Create the attributes for the grouping id and the group by clause. - val gid = new AttributeReference("gid", IntegerType, false)() + val gid = + new AttributeReference("gid", IntegerType, false, Metadata.empty, isGenerated = true)() val groupByMap = a.groupingExpressions.collect { case ne: NamedExpression => ne -> ne.toAttribute case e => e -> new AttributeReference(e.prettyString, e.dataType, e.nullable)() diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/package.scala index 41128fe389d46..6abc02d8275bc 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/package.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/package.scala @@ -33,7 +33,7 @@ package object codegen { object CleanExpressions extends rules.Rule[Expression] { def apply(e: Expression): Expression = e transform { - case Alias(c, _) => c + case Alias(c, _, _) => c } } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala index 7983501ada9bd..60a5a83f84c12 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala @@ -116,11 +116,15 @@ abstract class Attribute extends LeafExpression with NamedExpression { * * @param child the computation being performed * @param name the name to be associated with the result of computing [[child]]. + * @param isGenerated a flag to indicate if this alias is generated by Catalyst * @param exprId A globally unique id used to check if an [[AttributeReference]] refers to this * alias. Auto-assigned if left blank. + * @param qualifiers a list of strings that can be used to referred to this attribute in a fully + * qualified way. Consider the examples tableName.name, subQueryAlias.name. + * tableName and subQueryAlias are possible qualifiers. * @param explicitMetadata Explicit metadata associated with this alias that overwrites child's. */ -case class Alias(child: Expression, name: String)( +case class Alias(child: Expression, name: String, isGenerated: Boolean = false)( val exprId: ExprId = NamedExpression.newExprId, val qualifiers: Seq[String] = Nil, val explicitMetadata: Option[Metadata] = None) @@ -148,11 +152,12 @@ case class Alias(child: Expression, name: String)( } def newInstance(): NamedExpression = - Alias(child, name)(qualifiers = qualifiers, explicitMetadata = explicitMetadata) + Alias(child, name, isGenerated)(qualifiers = qualifiers, explicitMetadata = explicitMetadata) override def toAttribute: Attribute = { if (resolved) { - AttributeReference(name, child.dataType, child.nullable, metadata)(exprId, qualifiers) + AttributeReference(name, child.dataType, child.nullable, metadata, isGenerated)( + exprId, qualifiers) } else { UnresolvedAttribute(name) } @@ -174,7 +179,8 @@ case class Alias(child: Expression, name: String)( override def sql: String = { val qualifiersString = if (qualifiers.isEmpty) "" else qualifiers.map("`" + _ + "`").mkString("", ".", ".") - s"${child.sql} AS $qualifiersString`$name`" + val aliasName = if (isGenerated) s"$name#${exprId.id}" else s"$name" + s"${child.sql} AS $qualifiersString`$aliasName`" } } @@ -185,6 +191,7 @@ case class Alias(child: Expression, name: String)( * @param dataType The [[DataType]] of this attribute. * @param nullable True if null is a valid value for this attribute. * @param metadata The metadata of this attribute. + * @param isGenerated a flag to indicate if this reference is generated by Catalyst * @param exprId A globally unique id used to check if different AttributeReferences refer to the * same attribute. * @param qualifiers a list of strings that can be used to referred to this attribute in a fully @@ -195,7 +202,8 @@ case class AttributeReference( name: String, dataType: DataType, nullable: Boolean = true, - override val metadata: Metadata = Metadata.empty)( + override val metadata: Metadata = Metadata.empty, + isGenerated: Boolean = false)( val exprId: ExprId = NamedExpression.newExprId, val qualifiers: Seq[String] = Nil) extends Attribute with Unevaluable { @@ -234,7 +242,7 @@ case class AttributeReference( } override def newInstance(): AttributeReference = - AttributeReference(name, dataType, nullable, metadata)(qualifiers = qualifiers) + AttributeReference(name, dataType, nullable, metadata, isGenerated)(qualifiers = qualifiers) /** * Returns a copy of this [[AttributeReference]] with changed nullability. @@ -243,7 +251,7 @@ case class AttributeReference( if (nullable == newNullability) { this } else { - AttributeReference(name, dataType, newNullability, metadata)(exprId, qualifiers) + AttributeReference(name, dataType, newNullability, metadata, isGenerated)(exprId, qualifiers) } } @@ -251,7 +259,7 @@ case class AttributeReference( if (name == newName) { this } else { - AttributeReference(newName, dataType, nullable)(exprId, qualifiers) + AttributeReference(newName, dataType, nullable, metadata, isGenerated)(exprId, qualifiers) } } @@ -262,7 +270,7 @@ case class AttributeReference( if (newQualifiers.toSet == qualifiers.toSet) { this } else { - AttributeReference(name, dataType, nullable, metadata)(exprId, newQualifiers) + AttributeReference(name, dataType, nullable, metadata, isGenerated)(exprId, newQualifiers) } } @@ -270,7 +278,7 @@ case class AttributeReference( if (exprId == newExprId) { this } else { - AttributeReference(name, dataType, nullable, metadata)(newExprId, qualifiers) + AttributeReference(name, dataType, nullable, metadata, isGenerated)(newExprId, qualifiers) } } @@ -287,7 +295,8 @@ case class AttributeReference( override def sql: String = { val qualifiersString = if (qualifiers.isEmpty) "" else qualifiers.map("`" + _ + "`").mkString("", ".", ".") - s"$qualifiersString`$name`" + val attrRefName = if (isGenerated) s"$name#${exprId.id}" else s"$name" + s"$qualifiersString`$attrRefName`" } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala index f0ee124e88a9f..c27b1d60605ff 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala @@ -72,16 +72,16 @@ object PhysicalOperation extends PredicateHelper { } private def collectAliases(fields: Seq[Expression]): Map[Attribute, Expression] = fields.collect { - case a @ Alias(child, _) => a.toAttribute -> child + case a @ Alias(child, _, _) => a.toAttribute -> child }.toMap private def substitute(aliases: Map[Attribute, Expression])(expr: Expression): Expression = { expr.transform { - case a @ Alias(ref: AttributeReference, name) => - aliases.get(ref).map(Alias(_, name)(a.exprId, a.qualifiers)).getOrElse(a) + case a @ Alias(ref: AttributeReference, name, isGenerated) => + aliases.get(ref).map(Alias(_, name, isGenerated)(a.exprId, a.qualifiers)).getOrElse(a) case a: AttributeReference => - aliases.get(a).map(Alias(_, a.name)(a.exprId, a.qualifiers)).getOrElse(a) + aliases.get(a).map(Alias(_, a.name, a.isGenerated)(a.exprId, a.qualifiers)).getOrElse(a) } } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala index 6d859551f8c52..030da1f01a097 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala @@ -139,7 +139,7 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging { case a: Alias => // As the root of the expression, Alias will always take an arbitrary exprId, we need // to erase that for equality testing. - val cleanedExprId = Alias(a.child, a.name)(ExprId(-1), a.qualifiers) + val cleanedExprId = Alias(a.child, a.name, a.isGenerated)(ExprId(-1), a.qualifiers) BindReferences.bindReference(cleanedExprId, input, allowFailures = true) case other => BindReferences.bindReference(other, input, allowFailures = true) } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala index b2613e4909288..44c6c368d01e2 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala @@ -62,7 +62,7 @@ class DecimalPrecisionSuite extends PlanTest with BeforeAndAfter { private def checkComparison(expression: Expression, expectedType: DataType): Unit = { val plan = Project(Alias(expression, "c")() :: Nil, relation) val comparison = analyzer.execute(plan).collect { - case Project(Alias(e: BinaryComparison, _) :: Nil, _) => e + case Project(Alias(e: BinaryComparison, _, _) :: Nil, _) => e }.head assert(comparison.left.dataType === expectedType) assert(comparison.right.dataType === expectedType) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala index 88c558d80a79a..127a64348a514 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala @@ -327,7 +327,7 @@ class ExpressionEncoderSuite extends SparkFunSuite { val encodedData = try { row.toSeq(encoder.schema).zip(schema).map { - case (a: ArrayData, AttributeReference(_, ArrayType(et, _), _, _)) => + case (a: ArrayData, AttributeReference(_, ArrayType(et, _), _, _, _)) => a.toArray[Any](et).toSeq case (other, _) => other diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala index 4f5a3a3de7ba2..129bfe0a7dfd8 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala @@ -92,7 +92,6 @@ class LogicalPlanToSQLSuite extends SQLBuilderTest with SQLTestUtils { checkHiveQl("SELECT COUNT(value) FROM t1 GROUP BY key ORDER BY MAX(key)") } - // TODO Fix name collision introduced by ResolveAggregateFunction analysis rule // When there are multiple aggregate functions in ORDER BY clause, all of them are extracted into // Aggregate operator and aliased to the same name "aggOrder". This is OK for normal query // execution since these aliases have different expression ID. But this introduces name collision From 808dc8a762708300a7eda5b08da933cb46a857dc Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Wed, 3 Feb 2016 11:42:27 -0800 Subject: [PATCH 03/11] changed isGenerated to the second list in attributeReferences. --- .../DistinctAggregationRewriter.scala | 4 ++-- .../expressions/namedExpressions.scala | 21 ++++++++++--------- .../encoders/ExpressionEncoderSuite.scala | 2 +- 3 files changed, 14 insertions(+), 13 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DistinctAggregationRewriter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DistinctAggregationRewriter.scala index 7bac0937c7d2f..5dfce89bd68a6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DistinctAggregationRewriter.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DistinctAggregationRewriter.scala @@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, AggregateFunction, Complete} import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Expand, LogicalPlan} import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.types.{IntegerType, Metadata} +import org.apache.spark.sql.types.IntegerType /** * This rule rewrites an aggregate query with distinct aggregations into an expanded double @@ -127,7 +127,7 @@ case class DistinctAggregationRewriter(conf: CatalystConf) extends Rule[LogicalP if (distinctAggGroups.size > 1) { // Create the attributes for the grouping id and the group by clause. val gid = - new AttributeReference("gid", IntegerType, false, Metadata.empty, isGenerated = true)() + new AttributeReference("gid", IntegerType, false)(isGenerated = true) val groupByMap = a.groupingExpressions.collect { case ne: NamedExpression => ne -> ne.toAttribute case e => e -> new AttributeReference(e.prettyString, e.dataType, e.nullable)() diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala index 60a5a83f84c12..87e06028e9ec5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala @@ -156,8 +156,8 @@ case class Alias(child: Expression, name: String, isGenerated: Boolean = false)( override def toAttribute: Attribute = { if (resolved) { - AttributeReference(name, child.dataType, child.nullable, metadata, isGenerated)( - exprId, qualifiers) + AttributeReference(name, child.dataType, child.nullable, metadata)( + exprId, qualifiers, isGenerated) } else { UnresolvedAttribute(name) } @@ -202,10 +202,10 @@ case class AttributeReference( name: String, dataType: DataType, nullable: Boolean = true, - override val metadata: Metadata = Metadata.empty, - isGenerated: Boolean = false)( + override val metadata: Metadata = Metadata.empty)( val exprId: ExprId = NamedExpression.newExprId, - val qualifiers: Seq[String] = Nil) + val qualifiers: Seq[String] = Nil, + val isGenerated: Boolean = false) extends Attribute with Unevaluable { /** @@ -242,7 +242,8 @@ case class AttributeReference( } override def newInstance(): AttributeReference = - AttributeReference(name, dataType, nullable, metadata, isGenerated)(qualifiers = qualifiers) + AttributeReference(name, dataType, nullable, metadata)( + qualifiers = qualifiers, isGenerated = isGenerated) /** * Returns a copy of this [[AttributeReference]] with changed nullability. @@ -251,7 +252,7 @@ case class AttributeReference( if (nullable == newNullability) { this } else { - AttributeReference(name, dataType, newNullability, metadata, isGenerated)(exprId, qualifiers) + AttributeReference(name, dataType, newNullability, metadata)(exprId, qualifiers, isGenerated) } } @@ -259,7 +260,7 @@ case class AttributeReference( if (name == newName) { this } else { - AttributeReference(newName, dataType, nullable, metadata, isGenerated)(exprId, qualifiers) + AttributeReference(newName, dataType, nullable, metadata)(exprId, qualifiers, isGenerated) } } @@ -270,7 +271,7 @@ case class AttributeReference( if (newQualifiers.toSet == qualifiers.toSet) { this } else { - AttributeReference(name, dataType, nullable, metadata, isGenerated)(exprId, newQualifiers) + AttributeReference(name, dataType, nullable, metadata)(exprId, newQualifiers, isGenerated) } } @@ -278,7 +279,7 @@ case class AttributeReference( if (exprId == newExprId) { this } else { - AttributeReference(name, dataType, nullable, metadata, isGenerated)(newExprId, qualifiers) + AttributeReference(name, dataType, nullable, metadata)(newExprId, qualifiers, isGenerated) } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala index 127a64348a514..88c558d80a79a 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala @@ -327,7 +327,7 @@ class ExpressionEncoderSuite extends SparkFunSuite { val encodedData = try { row.toSeq(encoder.schema).zip(schema).map { - case (a: ArrayData, AttributeReference(_, ArrayType(et, _), _, _, _)) => + case (a: ArrayData, AttributeReference(_, ArrayType(et, _), _, _)) => a.toArray[Any](et).toSeq case (other, _) => other From eae6e9f7eac6e1d2ce25e0d818de47160177fd3f Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Wed, 3 Feb 2016 12:57:28 -0800 Subject: [PATCH 04/11] changed isGenerated to the second list in Alias. --- .../sql/catalyst/analysis/Analyzer.scala | 29 +++++++++-------- .../DistinctAggregationRewriter.scala | 2 +- .../expressions/codegen/package.scala | 2 +- .../expressions/namedExpressions.scala | 32 +++++++++++-------- .../sql/catalyst/planning/patterns.scala | 11 ++++--- .../catalyst/plans/logical/LogicalPlan.scala | 3 +- .../analysis/DecimalPrecisionSuite.scala | 2 +- 7 files changed, 45 insertions(+), 36 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 220587a96bdb3..4f6c75e0bf2ab 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -413,10 +413,11 @@ class Analyzer( case UnresolvedAlias(f @ UnresolvedFunction(_, args, _), _) if containsStar(args) => val newChildren = expandStarExpressions(args, child) UnresolvedAlias(child = f.copy(children = newChildren)) :: Nil - case Alias(f @ UnresolvedFunction(_, args, _), name, isGenerated) + case a @ Alias(f @ UnresolvedFunction(_, args, _), name) if containsStar(args) => val newChildren = expandStarExpressions(args, child) - Alias(child = f.copy(children = newChildren), name, isGenerated)() :: Nil + Alias(child = f.copy(children = newChildren), name)( + isGenerated = a.isGenerated) :: Nil case UnresolvedAlias(c @ CreateArray(args), _) if containsStar(args) => val expandedArgs = args.flatMap { case s: Star => s.expand(child, resolver) @@ -491,7 +492,7 @@ class Analyzer( def newAliases(expressions: Seq[NamedExpression]): Seq[NamedExpression] = { expressions.map { - case a: Alias => Alias(a.child, a.name, a.isGenerated)() + case a: Alias => Alias(a.child, a.name)(isGenerated = a.isGenerated) case other => other } } @@ -701,7 +702,7 @@ class Analyzer( val aggregatedCondition = Aggregate( grouping, - Alias(havingCondition, "havingCondition", isGenerated = true)() :: Nil, + Alias(havingCondition, "havingCondition")(isGenerated = Some(true)) :: Nil, child) val resolvedOperator = execute(aggregatedCondition) def resolvedAggregateFilter = @@ -728,7 +729,7 @@ class Analyzer( try { val unresolvedSortOrders = sortOrder.filter(s => !s.resolved || containsAggregate(s)) val aliasedOrdering = - unresolvedSortOrders.map(o => Alias(o.child, "aggOrder", isGenerated = true)()) + unresolvedSortOrders.map(o => Alias(o.child, "aggOrder")(isGenerated = Some(true))) val aggregatedOrdering = aggregate.copy(aggregateExpressions = aliasedOrdering) val resolvedAggregate: Aggregate = execute(aggregatedOrdering).asInstanceOf[Aggregate] val resolvedAliasedOrdering: Seq[Alias] = @@ -749,7 +750,7 @@ class Analyzer( val evaluatedOrderings = resolvedAliasedOrdering.zip(sortOrder).map { case (evaluated, order) => val index = originalAggExprs.indexWhere { - case Alias(child, _, _) => child semanticEquals evaluated.child + case Alias(child, _) => child semanticEquals evaluated.child case other => other semanticEquals evaluated.child } @@ -841,12 +842,12 @@ class Analyzer( /** Extracts a [[Generator]] expression and any names assigned by aliases to their output. */ private object AliasedGenerator { def unapply(e: Expression): Option[(Generator, Seq[String])] = e match { - case Alias(g: Generator, name, _) if g.resolved && g.elementTypes.size > 1 => + case Alias(g: Generator, name) if g.resolved && g.elementTypes.size > 1 => // If not given the default names, and the TGF with multiple output columns failAnalysis( s"""Expect multiple names given for ${g.getClass.getName}, |but only single name '${name}' specified""".stripMargin) - case Alias(g: Generator, name, _) if g.resolved => Some((g, name :: Nil)) + case Alias(g: Generator, name) if g.resolved => Some((g, name :: Nil)) case MultiAlias(g: Generator, names) if g.resolved => Some(g, names) case _ => None } @@ -1026,7 +1027,7 @@ class Analyzer( // We need to use transformDown because we want to trigger // "case alias @ Alias(window: WindowExpression, _)" first. _.transformDown { - case alias @ Alias(window: WindowExpression, _, _) => + case alias @ Alias(window: WindowExpression, _) => // If a WindowExpression has an assigned alias, just use it. extractedWindowExprBuffer += alias alias.toAttribute @@ -1159,7 +1160,7 @@ class Analyzer( leafNondeterministic.map { e => val ne = e match { case n: NamedExpression => n - case _ => Alias(e, "_nondeterministic", isGenerated = true)() + case _ => Alias(e, "_nondeterministic")(isGenerated = Some(true)) } new TreeNodeRef(e) -> ne } @@ -1274,14 +1275,14 @@ object CleanupAliases extends Rule[LogicalPlan] { case c: CreateStructUnsafe if !stop => stop = true c.copy(children = c.children.map(trimNonTopLevelAliases)) - case Alias(child, _, _) if !stop => child + case Alias(child, _) if !stop => child } } def trimNonTopLevelAliases(e: Expression): Expression = e match { case a: Alias => - Alias(trimAliases(a.child), a.name, a.isGenerated)( - a.exprId, a.qualifiers, a.explicitMetadata) + Alias(trimAliases(a.child), a.name)( + a.exprId, a.qualifiers, a.explicitMetadata, a.isGenerated) case other => trimAliases(other) } @@ -1314,7 +1315,7 @@ object CleanupAliases extends Rule[LogicalPlan] { case c: CreateStructUnsafe if !stop => stop = true c.copy(children = c.children.map(trimNonTopLevelAliases)) - case Alias(child, _, _) if !stop => child + case Alias(child, _) if !stop => child } } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DistinctAggregationRewriter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DistinctAggregationRewriter.scala index 5dfce89bd68a6..ac8f78363cfa5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DistinctAggregationRewriter.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DistinctAggregationRewriter.scala @@ -127,7 +127,7 @@ case class DistinctAggregationRewriter(conf: CatalystConf) extends Rule[LogicalP if (distinctAggGroups.size > 1) { // Create the attributes for the grouping id and the group by clause. val gid = - new AttributeReference("gid", IntegerType, false)(isGenerated = true) + new AttributeReference("gid", IntegerType, false)(isGenerated = Some(true)) val groupByMap = a.groupingExpressions.collect { case ne: NamedExpression => ne -> ne.toAttribute case e => e -> new AttributeReference(e.prettyString, e.dataType, e.nullable)() diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/package.scala index 6abc02d8275bc..41128fe389d46 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/package.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/package.scala @@ -33,7 +33,7 @@ package object codegen { object CleanExpressions extends rules.Rule[Expression] { def apply(e: Expression): Expression = e transform { - case Alias(c, _, _) => c + case Alias(c, _) => c } } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala index 87e06028e9ec5..c3470b3871670 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala @@ -114,20 +114,21 @@ abstract class Attribute extends LeafExpression with NamedExpression { * Note that exprId and qualifiers are in a separate parameter list because * we only pattern match on child and name. * - * @param child the computation being performed - * @param name the name to be associated with the result of computing [[child]]. - * @param isGenerated a flag to indicate if this alias is generated by Catalyst + * @param child The computation being performed + * @param name The name to be associated with the result of computing [[child]]. * @param exprId A globally unique id used to check if an [[AttributeReference]] refers to this * alias. Auto-assigned if left blank. - * @param qualifiers a list of strings that can be used to referred to this attribute in a fully + * @param qualifiers A list of strings that can be used to referred to this attribute in a fully * qualified way. Consider the examples tableName.name, subQueryAlias.name. * tableName and subQueryAlias are possible qualifiers. * @param explicitMetadata Explicit metadata associated with this alias that overwrites child's. + * @param isGenerated A flag to indicate if this alias is generated by Catalyst */ -case class Alias(child: Expression, name: String, isGenerated: Boolean = false)( +case class Alias(child: Expression, name: String)( val exprId: ExprId = NamedExpression.newExprId, val qualifiers: Seq[String] = Nil, - val explicitMetadata: Option[Metadata] = None) + val explicitMetadata: Option[Metadata] = None, + val isGenerated: Option[Boolean] = None) extends UnaryExpression with NamedExpression { // Alias(Generator, xx) need to be transformed into Generate(generator, ...) @@ -152,7 +153,8 @@ case class Alias(child: Expression, name: String, isGenerated: Boolean = false)( } def newInstance(): NamedExpression = - Alias(child, name, isGenerated)(qualifiers = qualifiers, explicitMetadata = explicitMetadata) + Alias(child, name)( + qualifiers = qualifiers, explicitMetadata = explicitMetadata, isGenerated = isGenerated) override def toAttribute: Attribute = { if (resolved) { @@ -166,7 +168,7 @@ case class Alias(child: Expression, name: String, isGenerated: Boolean = false)( override def toString: String = s"$child AS $name#${exprId.id}$typeSuffix" override protected final def otherCopyArgs: Seq[AnyRef] = { - exprId :: qualifiers :: explicitMetadata :: Nil + exprId :: qualifiers :: explicitMetadata :: isGenerated :: Nil } override def equals(other: Any): Boolean = other match { @@ -179,7 +181,8 @@ case class Alias(child: Expression, name: String, isGenerated: Boolean = false)( override def sql: String = { val qualifiersString = if (qualifiers.isEmpty) "" else qualifiers.map("`" + _ + "`").mkString("", ".", ".") - val aliasName = if (isGenerated) s"$name#${exprId.id}" else s"$name" + val aliasName = + if (isGenerated.isDefined && isGenerated.get) s"$name#${exprId.id}" else s"$name" s"${child.sql} AS $qualifiersString`$aliasName`" } } @@ -191,12 +194,12 @@ case class Alias(child: Expression, name: String, isGenerated: Boolean = false)( * @param dataType The [[DataType]] of this attribute. * @param nullable True if null is a valid value for this attribute. * @param metadata The metadata of this attribute. - * @param isGenerated a flag to indicate if this reference is generated by Catalyst * @param exprId A globally unique id used to check if different AttributeReferences refer to the * same attribute. - * @param qualifiers a list of strings that can be used to referred to this attribute in a fully + * @param qualifiers A list of strings that can be used to referred to this attribute in a fully * qualified way. Consider the examples tableName.name, subQueryAlias.name. * tableName and subQueryAlias are possible qualifiers. + * @param isGenerated A flag to indicate if this reference is generated by Catalyst */ case class AttributeReference( name: String, @@ -205,7 +208,7 @@ case class AttributeReference( override val metadata: Metadata = Metadata.empty)( val exprId: ExprId = NamedExpression.newExprId, val qualifiers: Seq[String] = Nil, - val isGenerated: Boolean = false) + val isGenerated: Option[Boolean] = None) extends Attribute with Unevaluable { /** @@ -284,7 +287,7 @@ case class AttributeReference( } override protected final def otherCopyArgs: Seq[AnyRef] = { - exprId :: qualifiers :: Nil + exprId :: qualifiers :: isGenerated :: Nil } override def toString: String = s"$name#${exprId.id}$typeSuffix" @@ -296,7 +299,8 @@ case class AttributeReference( override def sql: String = { val qualifiersString = if (qualifiers.isEmpty) "" else qualifiers.map("`" + _ + "`").mkString("", ".", ".") - val attrRefName = if (isGenerated) s"$name#${exprId.id}" else s"$name" + val attrRefName = + if (isGenerated.isDefined && isGenerated.get) s"$name#${exprId.id}" else s"$name" s"$qualifiersString`$attrRefName`" } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala index c27b1d60605ff..7302b63646d66 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala @@ -72,16 +72,19 @@ object PhysicalOperation extends PredicateHelper { } private def collectAliases(fields: Seq[Expression]): Map[Attribute, Expression] = fields.collect { - case a @ Alias(child, _, _) => a.toAttribute -> child + case a @ Alias(child, _) => a.toAttribute -> child }.toMap private def substitute(aliases: Map[Attribute, Expression])(expr: Expression): Expression = { expr.transform { - case a @ Alias(ref: AttributeReference, name, isGenerated) => - aliases.get(ref).map(Alias(_, name, isGenerated)(a.exprId, a.qualifiers)).getOrElse(a) + case a @ Alias(ref: AttributeReference, name) => + aliases.get(ref) + .map(Alias(_, name)(a.exprId, a.qualifiers, isGenerated = a.isGenerated)) + .getOrElse(a) case a: AttributeReference => - aliases.get(a).map(Alias(_, a.name, a.isGenerated)(a.exprId, a.qualifiers)).getOrElse(a) + aliases.get(a) + .map(Alias(_, a.name)(a.exprId, a.qualifiers, isGenerated = a.isGenerated)).getOrElse(a) } } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala index 030da1f01a097..d8371744e895e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala @@ -139,7 +139,8 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging { case a: Alias => // As the root of the expression, Alias will always take an arbitrary exprId, we need // to erase that for equality testing. - val cleanedExprId = Alias(a.child, a.name, a.isGenerated)(ExprId(-1), a.qualifiers) + val cleanedExprId = + Alias(a.child, a.name)(ExprId(-1), a.qualifiers, isGenerated = a.isGenerated) BindReferences.bindReference(cleanedExprId, input, allowFailures = true) case other => BindReferences.bindReference(other, input, allowFailures = true) } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala index 44c6c368d01e2..b2613e4909288 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala @@ -62,7 +62,7 @@ class DecimalPrecisionSuite extends PlanTest with BeforeAndAfter { private def checkComparison(expression: Expression, expectedType: DataType): Unit = { val plan = Project(Alias(expression, "c")() :: Nil, relation) val comparison = analyzer.execute(plan).collect { - case Project(Alias(e: BinaryComparison, _, _) :: Nil, _) => e + case Project(Alias(e: BinaryComparison, _) :: Nil, _) => e }.head assert(comparison.left.dataType === expectedType) assert(comparison.right.dataType === expectedType) From 22cf88ac0d30e973189ce66fa348a6e84ae00521 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Wed, 3 Feb 2016 14:54:25 -0800 Subject: [PATCH 05/11] internally generated expression names are not resolvable --- .../catalyst/expressions/namedExpressions.scala | 13 +++++++------ .../sql/catalyst/plans/logical/LogicalPlan.scala | 2 +- .../catalyst/analysis/AnalysisErrorSuite.scala | 11 +++++++++-- .../org/apache/spark/sql/DataFrameSuite.scala | 16 ++++++++++++---- 4 files changed, 29 insertions(+), 13 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala index c3470b3871670..8009bdbbc2266 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala @@ -79,6 +79,9 @@ trait NamedExpression extends Expression { /** Returns the metadata when an expression is a reference to another expression with metadata. */ def metadata: Metadata = Metadata.empty + /** Returns true if the expression is generated by Catalyst */ + def isGenerated: Option[Boolean] = None + /** Returns a copy of this expression with a new `exprId`. */ def newInstance(): NamedExpression @@ -128,7 +131,7 @@ case class Alias(child: Expression, name: String)( val exprId: ExprId = NamedExpression.newExprId, val qualifiers: Seq[String] = Nil, val explicitMetadata: Option[Metadata] = None, - val isGenerated: Option[Boolean] = None) + override val isGenerated: Option[Boolean] = None) extends UnaryExpression with NamedExpression { // Alias(Generator, xx) need to be transformed into Generate(generator, ...) @@ -181,8 +184,7 @@ case class Alias(child: Expression, name: String)( override def sql: String = { val qualifiersString = if (qualifiers.isEmpty) "" else qualifiers.map("`" + _ + "`").mkString("", ".", ".") - val aliasName = - if (isGenerated.isDefined && isGenerated.get) s"$name#${exprId.id}" else s"$name" + val aliasName = if (isGenerated.contains(true)) s"$name#${exprId.id}" else s"$name" s"${child.sql} AS $qualifiersString`$aliasName`" } } @@ -208,7 +210,7 @@ case class AttributeReference( override val metadata: Metadata = Metadata.empty)( val exprId: ExprId = NamedExpression.newExprId, val qualifiers: Seq[String] = Nil, - val isGenerated: Option[Boolean] = None) + override val isGenerated: Option[Boolean] = None) extends Attribute with Unevaluable { /** @@ -299,8 +301,7 @@ case class AttributeReference( override def sql: String = { val qualifiersString = if (qualifiers.isEmpty) "" else qualifiers.map("`" + _ + "`").mkString("", ".", ".") - val attrRefName = - if (isGenerated.isDefined && isGenerated.get) s"$name#${exprId.id}" else s"$name" + val attrRefName = if (isGenerated.contains(true)) s"$name#${exprId.id}" else s"$name" s"$qualifiersString`$attrRefName`" } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala index d8371744e895e..8061c097c7bbf 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala @@ -223,7 +223,7 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging { nameParts: Seq[String], resolver: Resolver, attribute: Attribute): Option[(Attribute, List[String])] = { - if (resolver(attribute.name, nameParts.head)) { + if (!attribute.isGenerated.contains(true) && resolver(attribute.name, nameParts.head)) { Option((attribute.withName(nameParts.head), nameParts.tail.toList)) } else { None diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala index fc35959f20547..bc15b3499ed7d 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala @@ -23,10 +23,10 @@ import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Complete, Count, Sum} +import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Complete, Count} import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.plans.Inner -import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, GenericArrayData, MapData} +import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData, MapData} import org.apache.spark.sql.types._ @BeanInfo @@ -176,6 +176,13 @@ class AnalysisErrorSuite extends AnalysisTest { testRelation.select('abcd), "cannot resolve" :: "abcd" :: Nil) + errorTest( + "unresolved attributes with a generated name", + testRelation2.groupBy('a)(max('b)) + .where(sum('b) > 0) + .orderBy('havingCondition.asc), + "cannot resolve" :: "havingCondition" :: Nil) + errorTest( "bad casts", testRelation.select(Literal(1).cast(BinaryType).as('badCast)), diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index c02133ffc8540..3ea4adcaa6424 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -998,12 +998,20 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { } } - test("SPARK-10034: Sort on Aggregate with aggregation expression named 'aggOrdering'") { + test("Alias uses internally generated names 'aggOrder' and 'havingCondition'") { val df = Seq(1 -> 2).toDF("i", "j") - val query = df.groupBy('i) - .agg(max('j).as("aggOrdering")) + val query1 = df.groupBy('i) + .agg(max('j).as("aggOrder")) .orderBy(sum('j)) - checkAnswer(query, Row(1, 2)) + checkAnswer(query1, Row(1, 2)) + + // In the plan, there are two attributes having the same name 'havingCondition' + // One is a user-provided alias name; another is an internally generated one. + val query2 = df.groupBy('i) + .agg(max('j).as("havingCondition")) + .where(sum('j) > 0) + .orderBy('havingCondition.asc) + checkAnswer(query2, Row(1, 2)) } test("SPARK-10316: respect non-deterministic expressions in PhysicalOperation") { From cf8960356f0040d0fdbb302b48fdce8a1ee221ae Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Wed, 3 Feb 2016 15:29:12 -0800 Subject: [PATCH 06/11] style fix. --- .../spark/sql/catalyst/analysis/AnalysisErrorSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala index bc15b3499ed7d..e0cec09742eba 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala @@ -179,8 +179,8 @@ class AnalysisErrorSuite extends AnalysisTest { errorTest( "unresolved attributes with a generated name", testRelation2.groupBy('a)(max('b)) - .where(sum('b) > 0) - .orderBy('havingCondition.asc), + .where(sum('b) > 0) + .orderBy('havingCondition.asc), "cannot resolve" :: "havingCondition" :: Nil) errorTest( From dae231bb524cb4bf800635de9b20236ae8ee5af4 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Wed, 3 Feb 2016 16:46:06 -0800 Subject: [PATCH 07/11] groupingIdName is marked as 'generated=Some(true)'. --- .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 4f6c75e0bf2ab..e38ecdfac1c9e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -214,7 +214,9 @@ class Analyzer( case Aggregate(Seq(r @ Rollup(groupByExprs)), aggregateExpressions, child) => GroupingSets(bitmasks(r), groupByExprs, child, aggregateExpressions) case x: GroupingSets => - val gid = AttributeReference(VirtualColumn.groupingIdName, IntegerType, false)() + val gid = + AttributeReference(VirtualColumn.groupingIdName, IntegerType, false)( + isGenerated = Some(true)) // Expand works by setting grouping expressions to null as determined by the bitmasks. To // prevent these null values from being used in an aggregate instead of the original value From 4c0380922aaacf6cff515b14b7d4a2159c6b7736 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Wed, 3 Feb 2016 18:03:08 -0800 Subject: [PATCH 08/11] grouping__id is selectable. --- .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index e38ecdfac1c9e..a7c4340971315 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -215,8 +215,7 @@ class Analyzer( GroupingSets(bitmasks(r), groupByExprs, child, aggregateExpressions) case x: GroupingSets => val gid = - AttributeReference(VirtualColumn.groupingIdName, IntegerType, false)( - isGenerated = Some(true)) + AttributeReference(VirtualColumn.groupingIdName, IntegerType, false)() // Expand works by setting grouping expressions to null as determined by the bitmasks. To // prevent these null values from being used in an aggregate instead of the original value From fcb022dfe9389fe15873e392e35b37ed9625e11e Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Fri, 5 Feb 2016 19:23:44 -0800 Subject: [PATCH 09/11] address comments. --- .../apache/spark/sql/catalyst/analysis/Analyzer.scala | 9 ++++----- .../analysis/DistinctAggregationRewriter.scala | 2 +- .../sql/catalyst/expressions/namedExpressions.scala | 10 +++++----- .../spark/sql/catalyst/plans/logical/LogicalPlan.scala | 2 +- 4 files changed, 11 insertions(+), 12 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index a7c4340971315..b5a97115c2d0c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -414,8 +414,7 @@ class Analyzer( case UnresolvedAlias(f @ UnresolvedFunction(_, args, _), _) if containsStar(args) => val newChildren = expandStarExpressions(args, child) UnresolvedAlias(child = f.copy(children = newChildren)) :: Nil - case a @ Alias(f @ UnresolvedFunction(_, args, _), name) - if containsStar(args) => + case a @ Alias(f @ UnresolvedFunction(_, args, _), name) if containsStar(args) => val newChildren = expandStarExpressions(args, child) Alias(child = f.copy(children = newChildren), name)( isGenerated = a.isGenerated) :: Nil @@ -703,7 +702,7 @@ class Analyzer( val aggregatedCondition = Aggregate( grouping, - Alias(havingCondition, "havingCondition")(isGenerated = Some(true)) :: Nil, + Alias(havingCondition, "havingCondition")(isGenerated = true) :: Nil, child) val resolvedOperator = execute(aggregatedCondition) def resolvedAggregateFilter = @@ -730,7 +729,7 @@ class Analyzer( try { val unresolvedSortOrders = sortOrder.filter(s => !s.resolved || containsAggregate(s)) val aliasedOrdering = - unresolvedSortOrders.map(o => Alias(o.child, "aggOrder")(isGenerated = Some(true))) + unresolvedSortOrders.map(o => Alias(o.child, "aggOrder")(isGenerated = true)) val aggregatedOrdering = aggregate.copy(aggregateExpressions = aliasedOrdering) val resolvedAggregate: Aggregate = execute(aggregatedOrdering).asInstanceOf[Aggregate] val resolvedAliasedOrdering: Seq[Alias] = @@ -1161,7 +1160,7 @@ class Analyzer( leafNondeterministic.map { e => val ne = e match { case n: NamedExpression => n - case _ => Alias(e, "_nondeterministic")(isGenerated = Some(true)) + case _ => Alias(e, "_nondeterministic")(isGenerated = true) } new TreeNodeRef(e) -> ne } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DistinctAggregationRewriter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DistinctAggregationRewriter.scala index ac8f78363cfa5..5dfce89bd68a6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DistinctAggregationRewriter.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DistinctAggregationRewriter.scala @@ -127,7 +127,7 @@ case class DistinctAggregationRewriter(conf: CatalystConf) extends Rule[LogicalP if (distinctAggGroups.size > 1) { // Create the attributes for the grouping id and the group by clause. val gid = - new AttributeReference("gid", IntegerType, false)(isGenerated = Some(true)) + new AttributeReference("gid", IntegerType, false)(isGenerated = true) val groupByMap = a.groupingExpressions.collect { case ne: NamedExpression => ne -> ne.toAttribute case e => e -> new AttributeReference(e.prettyString, e.dataType, e.nullable)() diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala index 8009bdbbc2266..207b8a0a88556 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala @@ -80,7 +80,7 @@ trait NamedExpression extends Expression { def metadata: Metadata = Metadata.empty /** Returns true if the expression is generated by Catalyst */ - def isGenerated: Option[Boolean] = None + def isGenerated: java.lang.Boolean = false /** Returns a copy of this expression with a new `exprId`. */ def newInstance(): NamedExpression @@ -131,7 +131,7 @@ case class Alias(child: Expression, name: String)( val exprId: ExprId = NamedExpression.newExprId, val qualifiers: Seq[String] = Nil, val explicitMetadata: Option[Metadata] = None, - override val isGenerated: Option[Boolean] = None) + override val isGenerated: java.lang.Boolean = false) extends UnaryExpression with NamedExpression { // Alias(Generator, xx) need to be transformed into Generate(generator, ...) @@ -184,7 +184,7 @@ case class Alias(child: Expression, name: String)( override def sql: String = { val qualifiersString = if (qualifiers.isEmpty) "" else qualifiers.map("`" + _ + "`").mkString("", ".", ".") - val aliasName = if (isGenerated.contains(true)) s"$name#${exprId.id}" else s"$name" + val aliasName = if (isGenerated) s"$name#${exprId.id}" else s"$name" s"${child.sql} AS $qualifiersString`$aliasName`" } } @@ -210,7 +210,7 @@ case class AttributeReference( override val metadata: Metadata = Metadata.empty)( val exprId: ExprId = NamedExpression.newExprId, val qualifiers: Seq[String] = Nil, - override val isGenerated: Option[Boolean] = None) + override val isGenerated: java.lang.Boolean = false) extends Attribute with Unevaluable { /** @@ -301,7 +301,7 @@ case class AttributeReference( override def sql: String = { val qualifiersString = if (qualifiers.isEmpty) "" else qualifiers.map("`" + _ + "`").mkString("", ".", ".") - val attrRefName = if (isGenerated.contains(true)) s"$name#${exprId.id}" else s"$name" + val attrRefName = if (isGenerated) s"$name#${exprId.id}" else s"$name" s"$qualifiersString`$attrRefName`" } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala index 8061c097c7bbf..3fb6aab05d957 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala @@ -223,7 +223,7 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging { nameParts: Seq[String], resolver: Resolver, attribute: Attribute): Option[(Attribute, List[String])] = { - if (!attribute.isGenerated.contains(true) && resolver(attribute.name, nameParts.head)) { + if (!attribute.isGenerated && resolver(attribute.name, nameParts.head)) { Option((attribute.withName(nameParts.head), nameParts.tail.toList)) } else { None From 2b33715a38298711ea12c7cac7bd2de75b4ffd8e Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Sat, 6 Feb 2016 09:58:02 -0800 Subject: [PATCH 10/11] added a type --- .../scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala index 2df0683f9fa16..30df2a84f62c4 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala @@ -656,6 +656,8 @@ object TreeNode { case t if t <:< definitions.DoubleTpe => value.asInstanceOf[JDouble].num: java.lang.Double + case t if t <:< localTypeOf[java.lang.Boolean] => + value.asInstanceOf[JBool].value: java.lang.Boolean case t if t <:< localTypeOf[BigInt] => value.asInstanceOf[JInt].num case t if t <:< localTypeOf[java.lang.String] => value.asInstanceOf[JString].s case t if t <:< localTypeOf[UUID] => UUID.fromString(value.asInstanceOf[JString].s) From 4e003499d3b6f10f59a816f0f4dd6accf829255d Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Sat, 6 Feb 2016 10:02:49 -0800 Subject: [PATCH 11/11] style fix --- .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index b5a97115c2d0c..6844f7d9f07a7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -214,8 +214,7 @@ class Analyzer( case Aggregate(Seq(r @ Rollup(groupByExprs)), aggregateExpressions, child) => GroupingSets(bitmasks(r), groupByExprs, child, aggregateExpressions) case x: GroupingSets => - val gid = - AttributeReference(VirtualColumn.groupingIdName, IntegerType, false)() + val gid = AttributeReference(VirtualColumn.groupingIdName, IntegerType, false)() // Expand works by setting grouping expressions to null as determined by the bitmasks. To // prevent these null values from being used in an aggregate instead of the original value