From b4e514ade7ea478055db448bbf66f7a88caf3a86 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Fri, 3 Feb 2017 07:08:47 +0000 Subject: [PATCH 1/3] Improve the code to generate constraints. --- .../catalyst/plans/logical/LogicalPlan.scala | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala index 93550e1fc32ab..380409c27e9f9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala @@ -314,19 +314,19 @@ abstract class UnaryNode extends LogicalPlan { * expressions with the corresponding alias */ protected def getAliasedConstraints(projectList: Seq[NamedExpression]): Set[Expression] = { - var allConstraints = child.constraints.asInstanceOf[Set[Expression]] - projectList.foreach { - case a @ Alias(e, _) => - // For every alias in `projectList`, replace the reference in constraints by its attribute. - allConstraints ++= allConstraints.map(_ transform { - case expr: Expression if expr.semanticEquals(e) => - a.toAttribute - }) - allConstraints += EqualNullSafe(e, a.toAttribute) - case _ => // Don't change. + var additionalConstraints = Set.empty[Expression] + val attrs = projectList.collect { + case a @ Alias(e, _) => (e, a.toAttribute) } - allConstraints -- child.constraints + // For every alias in `projectList`, replace the reference in constraints by its attribute. + child.constraints.asInstanceOf[Set[Expression]].map(_ transform { + case expr: Expression => + attrs.find(attrPair => attrPair._1.semanticEquals(expr)).map { attrPair => + additionalConstraints += EqualNullSafe(expr, attrPair._2) + attrPair._2 + }.getOrElse(expr) + }) ++ additionalConstraints -- child.constraints } override protected def validConstraints: Set[Expression] = child.constraints From 8c98a5c3ab1477408988c8cb682733e65dd554fc Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Thu, 9 Feb 2017 04:07:09 +0000 Subject: [PATCH 2/3] Use parallel collection to improve the function. --- .../catalyst/plans/logical/LogicalPlan.scala | 44 +++++++++++++------ 1 file changed, 31 insertions(+), 13 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala index 380409c27e9f9..fae34075d58f1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala @@ -17,6 +17,9 @@ package org.apache.spark.sql.catalyst.plans.logical +import scala.collection.parallel.ForkJoinTaskSupport +import scala.concurrent.forkjoin.ForkJoinPool + import org.apache.spark.internal.Logging import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.CatalystConf @@ -301,6 +304,11 @@ abstract class LeafNode extends LogicalPlan { override def producedAttributes: AttributeSet = outputSet } +object UnaryNode { + private[spark] lazy val taskSupport = + new ForkJoinTaskSupport(new ForkJoinPool(8)) +} + /** * A logical plan node with single child. */ @@ -314,19 +322,29 @@ abstract class UnaryNode extends LogicalPlan { * expressions with the corresponding alias */ protected def getAliasedConstraints(projectList: Seq[NamedExpression]): Set[Expression] = { - var additionalConstraints = Set.empty[Expression] - val attrs = projectList.collect { - case a @ Alias(e, _) => (e, a.toAttribute) - } - - // For every alias in `projectList`, replace the reference in constraints by its attribute. - child.constraints.asInstanceOf[Set[Expression]].map(_ transform { - case expr: Expression => - attrs.find(attrPair => attrPair._1.semanticEquals(expr)).map { attrPair => - additionalConstraints += EqualNullSafe(expr, attrPair._2) - attrPair._2 - }.getOrElse(expr) - }) ++ additionalConstraints -- child.constraints + val relativeReferences = AttributeSet(projectList.collect { + case a: Alias => a + }.flatMap(_.references)) + val parAllConstraints = child.constraints.asInstanceOf[Set[Expression]].filter { constraint => + constraint.references.intersect(relativeReferences).nonEmpty + }.par + parAllConstraints.tasksupport = UnaryNode.taskSupport + + parAllConstraints.flatMap { constraint => + var partConstraints = Set(constraint) + projectList.foreach { + case a @ Alias(e, _) => + // For every alias in `projectList`, replace the reference in constraints + // by its attribute. + partConstraints ++= partConstraints.map(_ transform { + case expr: Expression if expr.semanticEquals(e) => + a.toAttribute + }) + partConstraints += EqualNullSafe(e, a.toAttribute) + case _ => // Don't change. + } + partConstraints + }.seq -- child.constraints } override protected def validConstraints: Set[Expression] = child.constraints From 278c31cf8aa27c71e0f5178bebcb426ec5fba6ce Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Fri, 17 Feb 2017 03:23:39 +0000 Subject: [PATCH 3/3] Revert parallel collection approach. Reduce aliased constraints. --- .../catalyst/plans/logical/LogicalPlan.scala | 50 ++++++++----------- 1 file changed, 21 insertions(+), 29 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala index 67bb2cff986d6..1ad4b565b0c50 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala @@ -17,9 +17,6 @@ package org.apache.spark.sql.catalyst.plans.logical -import scala.collection.parallel.ForkJoinTaskSupport -import scala.concurrent.forkjoin.ForkJoinPool - import org.apache.spark.internal.Logging import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.CatalystConf @@ -304,11 +301,6 @@ abstract class LeafNode extends LogicalPlan { override def producedAttributes: AttributeSet = outputSet } -object UnaryNode { - private[spark] lazy val taskSupport = - new ForkJoinTaskSupport(new ForkJoinPool(8)) -} - /** * A logical plan node with single child. */ @@ -324,27 +316,27 @@ abstract class UnaryNode extends LogicalPlan { protected def getAliasedConstraints(projectList: Seq[NamedExpression]): Set[Expression] = { val relativeReferences = AttributeSet(projectList.collect { case a: Alias => a - }.flatMap(_.references)) - val parAllConstraints = child.constraints.asInstanceOf[Set[Expression]].filter { constraint => - constraint.references.intersect(relativeReferences).nonEmpty - }.par - parAllConstraints.tasksupport = UnaryNode.taskSupport - - parAllConstraints.flatMap { constraint => - var partConstraints = Set(constraint) - projectList.foreach { - case a @ Alias(e, _) => - // For every alias in `projectList`, replace the reference in constraints - // by its attribute. - partConstraints ++= partConstraints.map(_ transform { - case expr: Expression if expr.semanticEquals(e) => - a.toAttribute - }) - partConstraints += EqualNullSafe(e, a.toAttribute) - case _ => // Don't change. - } - partConstraints - }.seq -- child.constraints + }.flatMap(_.references)) ++ outputSet + + // We only care about the constraints which refer to attributes in output and aliases. + // For example, for a constraint 'a > b', if 'a' is aliased to 'c', we need to get aliased + // constraint 'c > b' only if 'b' is in output. + var allConstraints = child.constraints.filter { constraint => + constraint.references.subsetOf(relativeReferences) + }.asInstanceOf[Set[Expression]] + + projectList.foreach { + case a @ Alias(e, _) => + // For every alias in `projectList`, replace the reference in constraints by its attribute. + allConstraints ++= allConstraints.map(_ transform { + case expr: Expression if expr.semanticEquals(e) => + a.toAttribute + }) + allConstraints += EqualNullSafe(e, a.toAttribute) + case _ => // Don't change. + } + + allConstraints -- child.constraints } override protected def validConstraints: Set[Expression] = child.constraints