diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala index 977e9b1ab1329..b8ab8b78ccbc2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala @@ -184,9 +184,20 @@ object NestedColumnAliasing { plan: LogicalPlan, nestedFieldToAlias: Map[Expression, Alias], attrToAliases: AttributeMap[Seq[Alias]]): LogicalPlan = { - plan.withNewChildren(plan.children.map { plan => - Project(plan.output.flatMap(a => attrToAliases.getOrElse(a, Seq(a))), plan) - }).transformExpressions { + val newChildPlan = plan match { + case g: Generate => + g.withNewChildren(g.children.map { childPlan => + val origOutput = childPlan.output + val fromAlias = childPlan.output.flatMap(a => attrToAliases.getOrElse(a, Nil)) + Project(origOutput ++ fromAlias, childPlan) + }) + case p => + p.withNewChildren(p.children.map { childPlan => + Project(childPlan.output.flatMap(a => attrToAliases.getOrElse(a, Seq(a))), childPlan) + }) + } + + newChildPlan.transformExpressions { case f: ExtractValue if nestedFieldToAlias.contains(f.canonicalized) => nestedFieldToAlias(f.canonicalized).toAttribute } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala index 12d311d6835b7..d91da0488cdbc 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala @@ -143,6 +143,48 @@ class SparkPlanSuite extends QueryTest with SharedSparkSession { } } } + + test("SPARK-39854: replaceWithAliases should keep the order of Generate children") { + import org.apache.spark.sql.functions.{explode, struct} + import org.apache.spark.sql.SparkSession + val ss: SparkSession = spark + import ss.implicits._ + val testJson = + """{ + | "b": { + | "id": "id00", + | "data": [{ + | "b1": "vb1", + | "b2": 101, + | "ex2": [ + | { "fb1": false, "fb2": 11, "fb3": "t1" }, + | { "fb1": true, "fb2": 12, "fb3": "t2" } + | ]}, { + | "b1": "vb2", + | "b2": 102, + | "ex2": [ + | { "fb1": false, "fb2": 13, "fb3": "t3" }, + | { "fb1": true, "fb2": 14, "fb3": "t4" } + | ]} + | ], + | "fa": "tes", + | "v": "1.5" + | } + |} + |""".stripMargin + val df = spark.read.json((testJson :: Nil).toDS()) + .withColumn("ex_b", explode($"b.data.ex2")) + .withColumn("ex_b2", explode($"ex_b")) + val df1 = df + .withColumn("rt", struct( + $"b.fa".alias("rt_fa"), + $"b.v".alias("rt_v") + )) + .drop("b", "ex_b") + + val result = df1.collect() + assert(result.length == 4) + } } case class ColumnarOp(child: SparkPlan) extends UnaryExecNode {