From cca3ae69663ee85470bfe7e05d18b6e702e7c6ce Mon Sep 17 00:00:00 2001 From: Sameer Agarwal Date: Wed, 2 Mar 2016 15:57:57 -0800 Subject: [PATCH 1/4] Reorder conditions in join and filters --- .../sql/catalyst/planning/QueryPlanner.scala | 24 +++++++++++- .../spark/sql/execution/SparkStrategies.scala | 37 ++++++++++++------- 2 files changed, 47 insertions(+), 14 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala index 56a3dd02f9ba3..fd82da7ed80f2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala @@ -18,6 +18,8 @@ package org.apache.spark.sql.catalyst.planning import org.apache.spark.Logging +import org.apache.spark.sql.catalyst.expressions.{And, Expression, IsNotNull, PredicateHelper} +import org.apache.spark.sql.catalyst.plans import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.trees.TreeNode @@ -26,8 +28,28 @@ import org.apache.spark.sql.catalyst.trees.TreeNode * be used for execution. If this strategy does not apply to the give logical operation then an * empty list should be returned. */ -abstract class GenericStrategy[PhysicalPlan <: TreeNode[PhysicalPlan]] extends Logging { +abstract class GenericStrategy[PhysicalPlan <: TreeNode[PhysicalPlan]] + extends PredicateHelper with Logging { + def apply(plan: LogicalPlan): Seq[PhysicalPlan] + + // Attempts to re-order the individual conjunctive predicates in an expression to short circuit + // the evaluation of relatively cheaper checks (e.g., checking for nullability) before others. + protected def getReorderedExpression(expr: Expression): Expression = { + splitConjunctivePredicates(expr) + .sortWith((x, _) => x.isInstanceOf[IsNotNull]) + .reduce(And) + } + + // Wrapper around getReorderedExpression(expr: Expression) to reorder optional conditions in joins + protected def getReorderedExpression(exprOpt: Option[Expression]): Option[Expression] = { + exprOpt match { + case Some(expr) => + Option(getReorderedExpression(expr)) + case None => + exprOpt + } + } } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index debd04aa95b9c..ea2bf666bee48 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -66,11 +66,13 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { case ExtractEquiJoinKeys( LeftSemi, leftKeys, rightKeys, condition, left, CanBroadcast(right)) => joins.BroadcastLeftSemiJoinHash( - leftKeys, rightKeys, planLater(left), planLater(right), condition) :: Nil + leftKeys, rightKeys, planLater(left), planLater(right), + getReorderedExpression(condition)) :: Nil // Find left semi joins where at least some predicates can be evaluated by matching join keys case ExtractEquiJoinKeys(LeftSemi, leftKeys, rightKeys, condition, left, right) => joins.LeftSemiJoinHash( - leftKeys, rightKeys, planLater(left), planLater(right), condition) :: Nil + leftKeys, rightKeys, planLater(left), planLater(right), + getReorderedExpression(condition)) :: Nil case _ => Nil } } @@ -111,33 +113,39 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, CanBroadcast(right)) => Seq(joins.BroadcastHashJoin( - leftKeys, rightKeys, Inner, BuildRight, condition, planLater(left), planLater(right))) + leftKeys, rightKeys, Inner, BuildRight, getReorderedExpression(condition), + planLater(left), planLater(right))) case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, CanBroadcast(left), right) => Seq(joins.BroadcastHashJoin( - leftKeys, rightKeys, Inner, BuildLeft, condition, planLater(left), planLater(right))) + leftKeys, rightKeys, Inner, BuildLeft, getReorderedExpression(condition), planLater(left), + planLater(right))) case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right) if RowOrdering.isOrderable(leftKeys) => joins.SortMergeJoin( - leftKeys, rightKeys, condition, planLater(left), planLater(right)) :: Nil + leftKeys, rightKeys, getReorderedExpression(condition), planLater(left), + planLater(right)) :: Nil // --- Outer joins -------------------------------------------------------------------------- case ExtractEquiJoinKeys( LeftOuter, leftKeys, rightKeys, condition, left, CanBroadcast(right)) => Seq(joins.BroadcastHashJoin( - leftKeys, rightKeys, LeftOuter, BuildRight, condition, planLater(left), planLater(right))) + leftKeys, rightKeys, LeftOuter, BuildRight, getReorderedExpression(condition), + planLater(left), planLater(right))) case ExtractEquiJoinKeys( RightOuter, leftKeys, rightKeys, condition, CanBroadcast(left), right) => Seq(joins.BroadcastHashJoin( - leftKeys, rightKeys, RightOuter, BuildLeft, condition, planLater(left), planLater(right))) + leftKeys, rightKeys, RightOuter, BuildLeft, getReorderedExpression(condition), + planLater(left), planLater(right))) case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right) if RowOrdering.isOrderable(leftKeys) => joins.SortMergeOuterJoin( - leftKeys, rightKeys, joinType, condition, planLater(left), planLater(right)) :: Nil + leftKeys, rightKeys, joinType, getReorderedExpression(condition), planLater(left), + planLater(right)) :: Nil // --- Cases where this strategy does not apply --------------------------------------------- @@ -252,10 +260,12 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case j @ logical.Join(CanBroadcast(left), right, Inner | RightOuter, condition) => execution.joins.BroadcastNestedLoopJoin( - planLater(left), planLater(right), joins.BuildLeft, j.joinType, condition) :: Nil + planLater(left), planLater(right), joins.BuildLeft, j.joinType, + getReorderedExpression(condition)) :: Nil case j @ logical.Join(left, CanBroadcast(right), Inner | LeftOuter | LeftSemi, condition) => execution.joins.BroadcastNestedLoopJoin( - planLater(left), planLater(right), joins.BuildRight, j.joinType, condition) :: Nil + planLater(left), planLater(right), joins.BuildRight, j.joinType, + getReorderedExpression(condition)) :: Nil case _ => Nil } } @@ -265,7 +275,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { case logical.Join(left, right, Inner, None) => execution.joins.CartesianProduct(planLater(left), planLater(right)) :: Nil case logical.Join(left, right, Inner, Some(condition)) => - execution.Filter(condition, + execution.Filter(getReorderedExpression(condition), execution.joins.CartesianProduct(planLater(left), planLater(right))) :: Nil case _ => Nil } @@ -282,7 +292,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { } // This join could be very slow or even hang forever joins.BroadcastNestedLoopJoin( - planLater(left), planLater(right), buildSide, joinType, condition) :: Nil + planLater(left), planLater(right), buildSide, joinType, + getReorderedExpression(condition)) :: Nil case _ => Nil } } @@ -341,7 +352,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { case logical.Project(projectList, child) => execution.Project(projectList, planLater(child)) :: Nil case logical.Filter(condition, child) => - execution.Filter(condition, planLater(child)) :: Nil + execution.Filter(getReorderedExpression(condition), planLater(child)) :: Nil case e @ logical.Expand(_, _, child) => execution.Expand(e.projections, e.output, planLater(child)) :: Nil case logical.Window(projectList, windowExprs, partitionSpec, orderSpec, child) => From 74413860d0f847b8756f52aa6e1697129bacf37c Mon Sep 17 00:00:00 2001 From: Sameer Agarwal Date: Wed, 2 Mar 2016 17:20:18 -0800 Subject: [PATCH 2/4] unit tests: ReorderedPredicateSuite --- .../execution/ReorderedPredicateSuite.scala | 68 +++++++++++++++++++ 1 file changed, 68 insertions(+) create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/ReorderedPredicateSuite.scala diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ReorderedPredicateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ReorderedPredicateSuite.scala new file mode 100644 index 0000000000000..f68b8ac3f6ef2 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ReorderedPredicateSuite.scala @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution + +import org.apache.spark.sql.QueryTest +import org.apache.spark.sql.catalyst.expressions.{Expression, IsNotNull, PredicateHelper} +import org.apache.spark.sql.execution.joins._ +import org.apache.spark.sql.test.SharedSQLContext + + +class ReorderedPredicateSuite extends QueryTest with SharedSQLContext with PredicateHelper { + + setupTestData() + + // Verifies that the IsNotNull operators precede rest of the operators + private def verifyOrder(condition: Expression): Unit = { + splitConjunctivePredicates(condition).sliding(2).foreach { case Seq(x, y) => + assert(x.isInstanceOf[IsNotNull] || !y.isInstanceOf[IsNotNull]) + } + } + + test("null ordering in filter predicates") { + val physicalPlan = sql( + """ + |SELECT * from testData + |WHERE value != '5' AND value IS NOT NULL + """.stripMargin) + .queryExecution.sparkPlan + assert(physicalPlan.find(_.isInstanceOf[Filter]).isDefined) + physicalPlan.collect { + case Filter(condition, _) => + verifyOrder(condition) + } + } + + test("null ordering in join predicates") { + sqlContext.cacheManager.clearCache() + val physicalPlan = sql( + """ + |SELECT * FROM testData t1 + |LEFT SEMI JOIN testData t2 + |ON t1.key = t2.key + |AND t1.key + t2.key != 5 + |AND CONCAT(t1.value, t2.value) IS NOT NULL + """.stripMargin) + .queryExecution.sparkPlan + assert(physicalPlan.find(_.isInstanceOf[LeftSemiJoinHash]).isDefined) + physicalPlan.collect { + case LeftSemiJoinHash(_, _, _, _, conditionOpt) => + verifyOrder(conditionOpt.get) + } + } +} From d3c4739e44686727480aeb1103febcff315e3af1 Mon Sep 17 00:00:00 2001 From: Sameer Agarwal Date: Fri, 4 Mar 2016 13:50:34 -0800 Subject: [PATCH 3/4] Nong's comment --- .../sql/catalyst/planning/QueryPlanner.scala | 8 +++--- .../spark/sql/execution/SparkStrategies.scala | 26 +++++++++---------- 2 files changed, 17 insertions(+), 17 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala index fd82da7ed80f2..1e4523e2d8cf9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala @@ -35,17 +35,17 @@ abstract class GenericStrategy[PhysicalPlan <: TreeNode[PhysicalPlan]] // Attempts to re-order the individual conjunctive predicates in an expression to short circuit // the evaluation of relatively cheaper checks (e.g., checking for nullability) before others. - protected def getReorderedExpression(expr: Expression): Expression = { + protected def reorderPredicates(expr: Expression): Expression = { splitConjunctivePredicates(expr) .sortWith((x, _) => x.isInstanceOf[IsNotNull]) .reduce(And) } - // Wrapper around getReorderedExpression(expr: Expression) to reorder optional conditions in joins - protected def getReorderedExpression(exprOpt: Option[Expression]): Option[Expression] = { + // Wrapper around reorderPredicates(expr: Expression) to reorder optional conditions in joins + protected def reorderPredicates(exprOpt: Option[Expression]): Option[Expression] = { exprOpt match { case Some(expr) => - Option(getReorderedExpression(expr)) + Option(reorderPredicates(expr)) case None => exprOpt } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index ea2bf666bee48..36fea4d20360a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -67,12 +67,12 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { LeftSemi, leftKeys, rightKeys, condition, left, CanBroadcast(right)) => joins.BroadcastLeftSemiJoinHash( leftKeys, rightKeys, planLater(left), planLater(right), - getReorderedExpression(condition)) :: Nil + reorderPredicates(condition)) :: Nil // Find left semi joins where at least some predicates can be evaluated by matching join keys case ExtractEquiJoinKeys(LeftSemi, leftKeys, rightKeys, condition, left, right) => joins.LeftSemiJoinHash( leftKeys, rightKeys, planLater(left), planLater(right), - getReorderedExpression(condition)) :: Nil + reorderPredicates(condition)) :: Nil case _ => Nil } } @@ -113,18 +113,18 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, CanBroadcast(right)) => Seq(joins.BroadcastHashJoin( - leftKeys, rightKeys, Inner, BuildRight, getReorderedExpression(condition), + leftKeys, rightKeys, Inner, BuildRight, reorderPredicates(condition), planLater(left), planLater(right))) case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, CanBroadcast(left), right) => Seq(joins.BroadcastHashJoin( - leftKeys, rightKeys, Inner, BuildLeft, getReorderedExpression(condition), planLater(left), + leftKeys, rightKeys, Inner, BuildLeft, reorderPredicates(condition), planLater(left), planLater(right))) case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right) if RowOrdering.isOrderable(leftKeys) => joins.SortMergeJoin( - leftKeys, rightKeys, getReorderedExpression(condition), planLater(left), + leftKeys, rightKeys, reorderPredicates(condition), planLater(left), planLater(right)) :: Nil // --- Outer joins -------------------------------------------------------------------------- @@ -132,19 +132,19 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { case ExtractEquiJoinKeys( LeftOuter, leftKeys, rightKeys, condition, left, CanBroadcast(right)) => Seq(joins.BroadcastHashJoin( - leftKeys, rightKeys, LeftOuter, BuildRight, getReorderedExpression(condition), + leftKeys, rightKeys, LeftOuter, BuildRight, reorderPredicates(condition), planLater(left), planLater(right))) case ExtractEquiJoinKeys( RightOuter, leftKeys, rightKeys, condition, CanBroadcast(left), right) => Seq(joins.BroadcastHashJoin( - leftKeys, rightKeys, RightOuter, BuildLeft, getReorderedExpression(condition), + leftKeys, rightKeys, RightOuter, BuildLeft, reorderPredicates(condition), planLater(left), planLater(right))) case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right) if RowOrdering.isOrderable(leftKeys) => joins.SortMergeOuterJoin( - leftKeys, rightKeys, joinType, getReorderedExpression(condition), planLater(left), + leftKeys, rightKeys, joinType, reorderPredicates(condition), planLater(left), planLater(right)) :: Nil // --- Cases where this strategy does not apply --------------------------------------------- @@ -261,11 +261,11 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { case j @ logical.Join(CanBroadcast(left), right, Inner | RightOuter, condition) => execution.joins.BroadcastNestedLoopJoin( planLater(left), planLater(right), joins.BuildLeft, j.joinType, - getReorderedExpression(condition)) :: Nil + reorderPredicates(condition)) :: Nil case j @ logical.Join(left, CanBroadcast(right), Inner | LeftOuter | LeftSemi, condition) => execution.joins.BroadcastNestedLoopJoin( planLater(left), planLater(right), joins.BuildRight, j.joinType, - getReorderedExpression(condition)) :: Nil + reorderPredicates(condition)) :: Nil case _ => Nil } } @@ -275,7 +275,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { case logical.Join(left, right, Inner, None) => execution.joins.CartesianProduct(planLater(left), planLater(right)) :: Nil case logical.Join(left, right, Inner, Some(condition)) => - execution.Filter(getReorderedExpression(condition), + execution.Filter(reorderPredicates(condition), execution.joins.CartesianProduct(planLater(left), planLater(right))) :: Nil case _ => Nil } @@ -293,7 +293,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { // This join could be very slow or even hang forever joins.BroadcastNestedLoopJoin( planLater(left), planLater(right), buildSide, joinType, - getReorderedExpression(condition)) :: Nil + reorderPredicates(condition)) :: Nil case _ => Nil } } @@ -352,7 +352,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { case logical.Project(projectList, child) => execution.Project(projectList, planLater(child)) :: Nil case logical.Filter(condition, child) => - execution.Filter(getReorderedExpression(condition), planLater(child)) :: Nil + execution.Filter(reorderPredicates(condition), planLater(child)) :: Nil case e @ logical.Expand(_, _, child) => execution.Expand(e.projections, e.output, planLater(child)) :: Nil case logical.Window(projectList, windowExprs, partitionSpec, orderSpec, child) => From 21d889706052bb4fc3655b9150b00900f88a7388 Mon Sep 17 00:00:00 2001 From: Sameer Agarwal Date: Tue, 8 Mar 2016 13:44:41 -0800 Subject: [PATCH 4/4] Yin's comments --- .../execution/ReorderedPredicateSuite.scala | 71 ++++++++++++++----- 1 file changed, 53 insertions(+), 18 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ReorderedPredicateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ReorderedPredicateSuite.scala index f68b8ac3f6ef2..dd0e43866b80d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ReorderedPredicateSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ReorderedPredicateSuite.scala @@ -17,40 +17,63 @@ package org.apache.spark.sql.execution -import org.apache.spark.sql.QueryTest import org.apache.spark.sql.catalyst.expressions.{Expression, IsNotNull, PredicateHelper} -import org.apache.spark.sql.execution.joins._ +import org.apache.spark.sql.catalyst.plans.logical +import org.apache.spark.sql.catalyst.plans.logical.Join +import org.apache.spark.sql.execution +import org.apache.spark.sql.execution.joins.LeftSemiJoinHash import org.apache.spark.sql.test.SharedSQLContext -class ReorderedPredicateSuite extends QueryTest with SharedSQLContext with PredicateHelper { +class ReorderedPredicateSuite extends SharedSQLContext with PredicateHelper { setupTestData() - // Verifies that the IsNotNull operators precede rest of the operators - private def verifyOrder(condition: Expression): Unit = { - splitConjunctivePredicates(condition).sliding(2).foreach { case Seq(x, y) => + // Verifies that (a) In the new condition, the IsNotNull operators precede rest of the operators + // and (b) The relative sort order of IsNotNull and !IsNotNull operators is still maintained + private def verifyStableOrder(before: Expression, after: Expression): Unit = { + val oldPredicates = splitConjunctivePredicates(before) + splitConjunctivePredicates(after).sliding(2).foreach { case Seq(x, y) => + // Verify IsNotNull operator ordering assert(x.isInstanceOf[IsNotNull] || !y.isInstanceOf[IsNotNull]) + + // Verify stable sort order + if ((x.isInstanceOf[IsNotNull] && y.isInstanceOf[IsNotNull]) || + (!x.isInstanceOf[IsNotNull] && !y.isInstanceOf[IsNotNull])) { + assert(oldPredicates.indexOf(x) <= oldPredicates.indexOf(y)) + } } } test("null ordering in filter predicates") { - val physicalPlan = sql( + val query = sql( """ |SELECT * from testData - |WHERE value != '5' AND value IS NOT NULL + |WHERE value != '5' AND value != '4' AND value IS NOT NULL AND key != 5 """.stripMargin) - .queryExecution.sparkPlan - assert(physicalPlan.find(_.isInstanceOf[Filter]).isDefined) - physicalPlan.collect { + .queryExecution + + val logicalPlan = query.optimizedPlan + val physicalPlan = query.sparkPlan + assert(logicalPlan.find(_.isInstanceOf[logical.Filter]).isDefined) + assert(physicalPlan.find(_.isInstanceOf[execution.Filter]).isDefined) + + val logicalCondition = logicalPlan.collect { + case logical.Filter(condition, _) => + condition + }.head + + val physicalCondition = physicalPlan.collect { case Filter(condition, _) => - verifyOrder(condition) - } + condition + }.head + + verifyStableOrder(logicalCondition, physicalCondition) } test("null ordering in join predicates") { sqlContext.cacheManager.clearCache() - val physicalPlan = sql( + val query = sql( """ |SELECT * FROM testData t1 |LEFT SEMI JOIN testData t2 @@ -58,11 +81,23 @@ class ReorderedPredicateSuite extends QueryTest with SharedSQLContext with Predi |AND t1.key + t2.key != 5 |AND CONCAT(t1.value, t2.value) IS NOT NULL """.stripMargin) - .queryExecution.sparkPlan + .queryExecution + + val logicalPlan = query.optimizedPlan + val physicalPlan = query.sparkPlan + assert(logicalPlan.find(_.isInstanceOf[Join]).isDefined) assert(physicalPlan.find(_.isInstanceOf[LeftSemiJoinHash]).isDefined) - physicalPlan.collect { + + val logicalCondition = logicalPlan.collect { + case Join(_, _, _, condition) => + condition.get + }.head + + val physicalCondition = physicalPlan.collect { case LeftSemiJoinHash(_, _, _, _, conditionOpt) => - verifyOrder(conditionOpt.get) - } + conditionOpt.get + }.head + + verifyStableOrder(logicalCondition, physicalCondition) } }