From f9a164e23b0370f0d7bbcaeaaf4a2a47b1fe9862 Mon Sep 17 00:00:00 2001 From: devvj Date: Sat, 30 Jul 2022 19:57:45 +0200 Subject: [PATCH 1/2] fix and test case --- .../optimizer/NestedColumnAliasing.scala | 17 ++++- .../spark/sql/execution/SparkPlanSuite.scala | 70 +++++++++++++++++-- 2 files changed, 77 insertions(+), 10 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala index 977e9b1ab1329..b8ab8b78ccbc2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala @@ -184,9 +184,20 @@ object NestedColumnAliasing { plan: LogicalPlan, nestedFieldToAlias: Map[Expression, Alias], attrToAliases: AttributeMap[Seq[Alias]]): LogicalPlan = { - plan.withNewChildren(plan.children.map { plan => - Project(plan.output.flatMap(a => attrToAliases.getOrElse(a, Seq(a))), plan) - }).transformExpressions { + val newChildPlan = plan match { + case g: Generate => + g.withNewChildren(g.children.map { childPlan => + val origOutput = childPlan.output + val fromAlias = childPlan.output.flatMap(a => attrToAliases.getOrElse(a, Nil)) + Project(origOutput ++ fromAlias, childPlan) + }) + case p => + p.withNewChildren(p.children.map { childPlan => + Project(childPlan.output.flatMap(a => attrToAliases.getOrElse(a, Seq(a))), childPlan) + }) + } + + newChildPlan.transformExpressions { case f: ExtractValue if nestedFieldToAlias.contains(f.canonicalized) => nestedFieldToAlias(f.canonicalized).toAttribute } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala index 12d311d6835b7..155816e46516b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala @@ -34,13 +34,27 @@ class SparkPlanSuite extends QueryTest with SharedSparkSession { test("SPARK-21619 execution of a canonicalized plan should fail") { val plan = spark.range(10).queryExecution.executedPlan.canonicalized - intercept[IllegalStateException] { plan.execute() } - intercept[IllegalStateException] { plan.executeCollect() } - intercept[IllegalStateException] { plan.executeCollectPublic() } - intercept[IllegalStateException] { plan.executeToIterator() } - intercept[IllegalStateException] { plan.executeBroadcast() } - intercept[IllegalStateException] { plan.executeTake(1) } - intercept[IllegalStateException] { plan.executeTail(1) } + intercept[IllegalStateException] { + plan.execute() + } + intercept[IllegalStateException] { + plan.executeCollect() + } + intercept[IllegalStateException] { + plan.executeCollectPublic() + } + intercept[IllegalStateException] { + plan.executeToIterator() + } + intercept[IllegalStateException] { + plan.executeBroadcast() + } + intercept[IllegalStateException] { + plan.executeTake(1) + } + intercept[IllegalStateException] { + plan.executeTail(1) + } } test("SPARK-23731 plans should be canonicalizable after being (de)serialized") { @@ -143,6 +157,48 @@ class SparkPlanSuite extends QueryTest with SharedSparkSession { } } } + + test("SPARK-39854: replaceWithAliases should keep the order of Generate children") { + import org.apache.spark.sql.functions.{explode, struct} + import org.apache.spark.sql.SparkSession + val ss: SparkSession = spark + import ss.implicits._ + val testJson = + """{ + | "b": { + | "id": "id00", + | "data": [{ + | "b1": "vb1", + | "b2": 101, + | "ex2": [ + | { "fb1": false, "fb2": 11, "fb3": "t1" }, + | { "fb1": true, "fb2": 12, "fb3": "t2" } + | ]}, { + | "b1": "vb2", + | "b2": 102, + | "ex2": [ + | { "fb1": false, "fb2": 13, "fb3": "t3" }, + | { "fb1": true, "fb2": 14, "fb3": "t4" } + | ]} + | ], + | "fa": "tes", + | "v": "1.5" + | } + |} + |""".stripMargin + val df = spark.read.json((testJson :: Nil).toDS()) + .withColumn("ex_b", explode($"b.data.ex2")) + .withColumn("ex_b2", explode($"ex_b")) + val df1 = df + .withColumn("rt", struct( + $"b.fa".alias("rt_fa"), + $"b.v".alias("rt_v") + )) + .drop("b", "ex_b") + + val result = df1.collect() + assert(result.length == 4) + } } case class ColumnarOp(child: SparkPlan) extends UnaryExecNode { From 3b37325e4b4ac8897f94ddb1fc782078127c1b11 Mon Sep 17 00:00:00 2001 From: devvj Date: Sat, 30 Jul 2022 20:16:17 +0200 Subject: [PATCH 2/2] revert unintended changes --- .../spark/sql/execution/SparkPlanSuite.scala | 28 +++++-------------- 1 file changed, 7 insertions(+), 21 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala index 155816e46516b..d91da0488cdbc 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala @@ -34,27 +34,13 @@ class SparkPlanSuite extends QueryTest with SharedSparkSession { test("SPARK-21619 execution of a canonicalized plan should fail") { val plan = spark.range(10).queryExecution.executedPlan.canonicalized - intercept[IllegalStateException] { - plan.execute() - } - intercept[IllegalStateException] { - plan.executeCollect() - } - intercept[IllegalStateException] { - plan.executeCollectPublic() - } - intercept[IllegalStateException] { - plan.executeToIterator() - } - intercept[IllegalStateException] { - plan.executeBroadcast() - } - intercept[IllegalStateException] { - plan.executeTake(1) - } - intercept[IllegalStateException] { - plan.executeTail(1) - } + intercept[IllegalStateException] { plan.execute() } + intercept[IllegalStateException] { plan.executeCollect() } + intercept[IllegalStateException] { plan.executeCollectPublic() } + intercept[IllegalStateException] { plan.executeToIterator() } + intercept[IllegalStateException] { plan.executeBroadcast() } + intercept[IllegalStateException] { plan.executeTake(1) } + intercept[IllegalStateException] { plan.executeTail(1) } } test("SPARK-23731 plans should be canonicalizable after being (de)serialized") {