From 5221be35710ed149238cd801754790e7442f954b Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Sun, 7 Mar 2021 12:44:13 -0800 Subject: [PATCH 1/9] Nested column prune on generator output for one field. --- .../optimizer/NestedColumnAliasing.scala | 84 ++++++++++++++++++- .../optimizer/NestedColumnAliasingSuite.scala | 28 ++++++- 2 files changed, 106 insertions(+), 6 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala index 0be2792bfd7db..f0879b529f310 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala @@ -231,6 +231,27 @@ object NestedColumnAliasing { * of it. */ object GeneratorNestedColumnAliasing { + // Partitions `attrToAliases` based on whether the attribute is in Generator's output. + private def aliasesOnGeneratorOutput( + attrToAliases: Map[ExprId, Seq[Alias]], + generatorOutput: Seq[Attribute]) = { + val generatorOutputExprId = generatorOutput.map(_.exprId) + attrToAliases.partition { k => + generatorOutputExprId.contains(k._1) + } + } + + // Partitions `nestedFieldToAlias` based on whether the attribute of nested field extractor + // is in Generator's output. + private def nestedFieldOnGeneratorOutput( + nestedFieldToAlias: Map[ExtractValue, Alias], + generatorOutput: Seq[Attribute]) = { + val generatorOutputSet = AttributeSet(generatorOutput) + nestedFieldToAlias.partition { pair => + pair._1.references.subsetOf(generatorOutputSet) + } + } + def unapply(plan: LogicalPlan): Option[LogicalPlan] = plan match { // Either `nestedPruningOnExpressions` or `nestedSchemaPruningEnabled` is enabled, we // need to prune nested columns through Project and under Generate. 
The difference is @@ -241,12 +262,67 @@ object GeneratorNestedColumnAliasing { // On top on `Generate`, a `Project` that might have nested column accessors. // We try to get alias maps for both project list and generator's children expressions. val exprsToPrune = projectList ++ g.generator.children - NestedColumnAliasing.getAliasSubMap(exprsToPrune, g.qualifiedGeneratorOutput).map { + NestedColumnAliasing.getAliasSubMap(exprsToPrune).map { case (nestedFieldToAlias, attrToAliases) => // Defer updating `Generate.unrequiredChildIndex` to next round of `ColumnPruning`. - val newChild = - NestedColumnAliasing.replaceWithAliases(g, nestedFieldToAlias, attrToAliases) - Project(NestedColumnAliasing.getNewProjectList(projectList, nestedFieldToAlias), newChild) + + val (nestedFieldsOnGenerator, nestedFieldsNotOnGenerator) = + nestedFieldOnGeneratorOutput(nestedFieldToAlias, g.qualifiedGeneratorOutput) + val (attrToAliasesOnGenerator, attrToAliasesNotOnGenerator) = + aliasesOnGeneratorOutput(attrToAliases, g.qualifiedGeneratorOutput) + + // Push nested column accessors through `Generator`. We cannot prune on `Generator`'s + // output. + val newChild = NestedColumnAliasing.replaceWithAliases(g, + nestedFieldsNotOnGenerator, attrToAliasesNotOnGenerator) + val pushedThrough = Project(NestedColumnAliasing + .getNewProjectList(projectList, nestedFieldsNotOnGenerator), newChild) + + // Pruning on `Generator`'s output. We only process single field case. + // For multiple field case, we cannot directly move field extractor into + // the generator expression. A workaround is to re-construct array of struct + // from multiple fields. But it will be more complicated and may not worth. + if (nestedFieldsOnGenerator.size == 1) { + // Only one nested column accessor. 
+ // E.g., df.select(explode($"items").as("item")).select($"item.a") + pushedThrough match { + case p @ Project(_, newG: Generate) => + // Replace the child expression of `ExplodeBase` generator with + // nested column accessor. + // E.g., df.select(explode($"items").as("item")) => + // df.select(explode($"items.a").as("item")) + val rewrittenG = newG.transformExpressions { + case e: ExplodeBase => + val extractor = nestedFieldsOnGenerator.head._1.transform { + case g: GetStructField => + ExtractValue(e.child, Literal(g.extractFieldName), SQLConf.get.resolver) + } + e.withNewChildren(Seq(extractor)) + } + + // As we change the child of the generator, its output data type must be updated. + val updatedGeneratorOutput = rewrittenG.generatorOutput + .zip(rewrittenG.generator.elementSchema.toAttributes) + .map { case (oldAttr, newAttr) => + newAttr.withExprId(oldAttr.exprId).withName(oldAttr.name) + } + assert(updatedGeneratorOutput.length == rewrittenG.generatorOutput.length, + "Updated generator output must have same length as original generator output.") + val updatedGenerate = rewrittenG.copy(generatorOutput = updatedGeneratorOutput) + + // Replace nested column accessor with generator output. 
+ p.withNewChildren(Seq(updatedGenerate)).transformExpressions { + case f: ExtractValue if nestedFieldsOnGenerator.contains(f) => + updatedGenerate.output + .find(a => attrToAliasesOnGenerator.contains(a.exprId)) + .getOrElse(f) + } + + case _ => pushedThrough + } + } else { + pushedThrough + } } case g: Generate if SQLConf.get.nestedSchemaPruningEnabled && diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasingSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasingSuite.scala index c83ab375ee15a..23e775ffb3ed8 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasingSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasingSuite.scala @@ -336,7 +336,7 @@ class NestedColumnAliasingSuite extends SchemaPruningTest { val query = companies .generate(Explode('employers.getField("company")), outputNames = Seq("company")) - .select('company.getField("name")) + .select('company.getField("name"), 'company.getField("address")) .analyze val optimized = Optimize.execute(query) @@ -347,7 +347,8 @@ class NestedColumnAliasingSuite extends SchemaPruningTest { .generate(Explode($"${aliases(0)}"), unrequiredChildIndex = Seq(0), outputNames = Seq("company")) - .select('company.getField("name").as("company.name")) + .select('company.getField("name").as("company.name"), + 'company.getField("address").as("company.address")) .analyze comparePlans(optimized, expected) } @@ -684,6 +685,29 @@ class NestedColumnAliasingSuite extends SchemaPruningTest { ).analyze comparePlans(optimized2, expected2) } + + test("SPARK-34638: nested column prune on generator output for one field") { + val companies = LocalRelation( + 'id.int, + 'employers.array(employer)) + + val query = companies + .generate(Explode('employers.getField("company")), outputNames = Seq("company")) + .select('company.getField("name")) + .analyze + 
val optimized = Optimize.execute(query) + + val aliases = collectGeneratedAliases(optimized) + + val expected = companies + .select('employers.getField("company").getField("name").as(aliases(0))) + .generate(Explode($"${aliases(0)}"), + unrequiredChildIndex = Seq(0), + outputNames = Seq("company")) + .select('company.as("company.name")) + .analyze + comparePlans(optimized, expected) + } } object NestedColumnAliasingSuite { From 4758f96e788bf7c38f81bdf0603eb4493422764d Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Thu, 25 Mar 2021 23:57:47 -0700 Subject: [PATCH 2/9] Add e2e test cases. --- .../datasources/SchemaPruningSuite.scala | 21 +++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala index c90732183cb7a..fa7194dbdeeb1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala @@ -351,6 +351,27 @@ abstract class SchemaPruningSuite } } + testSchemaPruning("SPARK-34638: nested column prune on generator output") { + val query1 = spark.table("contacts") + .select(explode(col("friends")).as("friend")) + .select("friend.first") + checkScan(query1, "struct>>") + checkAnswer(query1, Row("Susan") :: Nil) + + // Currently we don't prune multiple field case. 
+ val query2 = spark.table("contacts") + .select(explode(col("friends")).as("friend")) + .select("friend.first", "friend.middle") + checkScan(query2, "struct>>") + checkAnswer(query2, Row("Susan", "Z.") :: Nil) + + val query3 = spark.table("contacts") + .select(explode(col("friends")).as("friend")) + .select("friend.first", "friend.middle", "friend") + checkScan(query3, "struct>>") + checkAnswer(query3, Row("Susan", "Z.", Row("Susan", "Z.", "Smith")) :: Nil) + } + testSchemaPruning("select one deep nested complex field after repartition") { val query = sql("select * from contacts") .repartition(100) From 7908788447625229e4f6d71ab728f1a549857089 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Fri, 26 Mar 2021 00:40:43 -0700 Subject: [PATCH 3/9] Fix wrong transform logic. --- .../spark/sql/catalyst/optimizer/NestedColumnAliasing.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala index f0879b529f310..5e82d9cb0c972 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala @@ -293,9 +293,11 @@ object GeneratorNestedColumnAliasing { // df.select(explode($"items.a").as("item")) val rewrittenG = newG.transformExpressions { case e: ExplodeBase => - val extractor = nestedFieldsOnGenerator.head._1.transform { + val extractor = nestedFieldsOnGenerator.head._1.transformUp { + case _: Attribute => + e.child case g: GetStructField => - ExtractValue(e.child, Literal(g.extractFieldName), SQLConf.get.resolver) + ExtractValue(g.child, Literal(g.extractFieldName), SQLConf.get.resolver) } e.withNewChildren(Seq(extractor)) } From fe286df4a856bd1fcf162794c543e84d965dce2e Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh 
Date: Mon, 5 Apr 2021 00:30:02 -0700 Subject: [PATCH 4/9] Address comments. --- .../optimizer/NestedColumnAliasing.scala | 17 ++++++++--------- .../optimizer/NestedColumnAliasingSuite.scala | 2 +- 2 files changed, 9 insertions(+), 10 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala index 5e82d9cb0c972..72704a3b5efc2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala @@ -264,15 +264,13 @@ object GeneratorNestedColumnAliasing { val exprsToPrune = projectList ++ g.generator.children NestedColumnAliasing.getAliasSubMap(exprsToPrune).map { case (nestedFieldToAlias, attrToAliases) => - // Defer updating `Generate.unrequiredChildIndex` to next round of `ColumnPruning`. - val (nestedFieldsOnGenerator, nestedFieldsNotOnGenerator) = nestedFieldOnGeneratorOutput(nestedFieldToAlias, g.qualifiedGeneratorOutput) val (attrToAliasesOnGenerator, attrToAliasesNotOnGenerator) = aliasesOnGeneratorOutput(attrToAliases, g.qualifiedGeneratorOutput) - // Push nested column accessors through `Generator`. We cannot prune on `Generator`'s - // output. + // Push nested column accessors through `Generator`. + // Defer updating `Generate.unrequiredChildIndex` to next round of `ColumnPruning`. val newChild = NestedColumnAliasing.replaceWithAliases(g, nestedFieldsNotOnGenerator, attrToAliasesNotOnGenerator) val pushedThrough = Project(NestedColumnAliasing @@ -282,15 +280,18 @@ object GeneratorNestedColumnAliasing { // For multiple field case, we cannot directly move field extractor into // the generator expression. A workaround is to re-construct array of struct // from multiple fields. But it will be more complicated and may not worth. 
- if (nestedFieldsOnGenerator.size == 1) { + // TODO(SPARK-34956): support multiple fields. + if (nestedFieldsOnGenerator.size > 1 || nestedFieldsOnGenerator.size == 0) { + pushedThrough + } else { // Only one nested column accessor. // E.g., df.select(explode($"items").as("item")).select($"item.a") pushedThrough match { case p @ Project(_, newG: Generate) => // Replace the child expression of `ExplodeBase` generator with // nested column accessor. - // E.g., df.select(explode($"items").as("item")) => - // df.select(explode($"items.a").as("item")) + // E.g., df.select(explode($"items").as("item")).select($"item.a") => + // df.select(explode($"items.a").as("item.a")) val rewrittenG = newG.transformExpressions { case e: ExplodeBase => val extractor = nestedFieldsOnGenerator.head._1.transformUp { @@ -322,8 +323,6 @@ object GeneratorNestedColumnAliasing { case _ => pushedThrough } - } else { - pushedThrough } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasingSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasingSuite.scala index 23e775ffb3ed8..6d046b32b2987 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasingSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasingSuite.scala @@ -329,7 +329,7 @@ class NestedColumnAliasingSuite extends SchemaPruningTest { comparePlans(optimized, expected) } - test("Nested field pruning for Project and Generate: not prune on generator output") { + test("Nested field pruning for Project and Generate: multiple-field case is not supported") { val companies = LocalRelation( 'id.int, 'employers.array(employer)) From df5c44d69d04fd1a6b45ae4673f0d8a51e9b78ff Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Sat, 10 Apr 2021 13:52:47 -0700 Subject: [PATCH 5/9] Add case-insensitive test. 
--- .../execution/datasources/SchemaPruningSuite.scala | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala index 7bb40e571f164..e127f3b48c2a7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala @@ -370,6 +370,20 @@ abstract class SchemaPruningSuite .select("friend.first", "friend.middle", "friend") checkScan(query3, "struct>>") checkAnswer(query3, Row("Susan", "Z.", Row("Susan", "Z.", "Smith")) :: Nil) + + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") { + val query4 = spark.table("contacts") + .select(explode(col("friends")).as("friend")) + .select("friend.First") + checkScan(query4, "struct>>") + checkAnswer(query4, Row("Susan") :: Nil) + + val query5 = spark.table("contacts") + .select(explode(col("friends")).as("friend")) + .select("friend.MIDDLE") + checkScan(query5, "struct>>") + checkAnswer(query5, Row("Z.") :: Nil) + } } testSchemaPruning("select one deep nested complex field after repartition") { From 6c9d839a53095cdd24abe2007a1576a7c80a8f93 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Sat, 10 Apr 2021 13:59:01 -0700 Subject: [PATCH 6/9] Add comment. 
--- .../spark/sql/catalyst/expressions/ProjectionOverSchema.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ProjectionOverSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ProjectionOverSchema.scala index 03b5517f6df05..a6be98c8a3aae 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ProjectionOverSchema.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ProjectionOverSchema.scala @@ -42,7 +42,8 @@ case class ProjectionOverSchema(schema: StructType) { getProjection(a.child).map(p => (p, p.dataType)).map { case (projection, ArrayType(projSchema @ StructType(_), _)) => // For case-sensitivity aware field resolution, we should take `ordinal` which - // points to correct struct field. + // points to correct struct field, because `ExtractValue` actually does column + // name resolving correctly. val selectedField = a.child.dataType.asInstanceOf[ArrayType] .elementType.asInstanceOf[StructType](a.ordinal) val prunedField = projSchema(selectedField.name) From ad3d19145e984a001205920ffd8dfde1f83ea4d1 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Thu, 15 Apr 2021 00:48:42 -0700 Subject: [PATCH 7/9] Address review comments. 
--- .../catalyst/optimizer/NestedColumnAliasing.scala | 6 ++++-- .../execution/datasources/SchemaPruningSuite.scala | 14 ++++++++------ 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala index 72704a3b5efc2..c63f53b4a4902 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala @@ -281,7 +281,7 @@ object GeneratorNestedColumnAliasing { // the generator expression. A workaround is to re-construct array of struct // from multiple fields. But it will be more complicated and may not worth. // TODO(SPARK-34956): support multiple fields. - if (nestedFieldsOnGenerator.size > 1 || nestedFieldsOnGenerator.size == 0) { + if (nestedFieldsOnGenerator.size > 1 || nestedFieldsOnGenerator.isEmpty) { pushedThrough } else { // Only one nested column accessor. @@ -321,7 +321,9 @@ object GeneratorNestedColumnAliasing { .getOrElse(f) } - case _ => pushedThrough + case other => + // We should not reach here. 
+ throw new IllegalStateException(s"Unreasonable plan after optimization: $other") } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala index e127f3b48c2a7..a06204fe725d4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala @@ -370,19 +370,21 @@ abstract class SchemaPruningSuite .select("friend.first", "friend.middle", "friend") checkScan(query3, "struct>>") checkAnswer(query3, Row("Susan", "Z.", Row("Susan", "Z.", "Smith")) :: Nil) + } + testSchemaPruning("SPARK-34638: nested column prune on generator output - case-sensitivity") { withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") { - val query4 = spark.table("contacts") + val query1 = spark.table("contacts") .select(explode(col("friends")).as("friend")) .select("friend.First") - checkScan(query4, "struct>>") - checkAnswer(query4, Row("Susan") :: Nil) + checkScan(query1, "struct>>") + checkAnswer(query1, Row("Susan") :: Nil) - val query5 = spark.table("contacts") + val query2 = spark.table("contacts") .select(explode(col("friends")).as("friend")) .select("friend.MIDDLE") - checkScan(query5, "struct>>") - checkAnswer(query5, Row("Z.") :: Nil) + checkScan(query2, "struct>>") + checkAnswer(query2, Row("Z.") :: Nil) } } From 8d4309af2a561d616c4f8d80095eb55f2e0abfe7 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Thu, 22 Apr 2021 21:55:19 -0700 Subject: [PATCH 8/9] Deal with special cases. 
--- .../optimizer/NestedColumnAliasing.scala | 10 ++++++++++ .../datasources/SchemaPruningSuite.scala | 17 +++++++++++++++++ 2 files changed, 27 insertions(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala index c63f53b4a4902..beca98d13d6c2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala @@ -276,6 +276,16 @@ object GeneratorNestedColumnAliasing { val pushedThrough = Project(NestedColumnAliasing .getNewProjectList(projectList, nestedFieldsNotOnGenerator), newChild) + // If the generator output is `ArrayType`, we cannot push through the extractor. + // It is because we don't allow field extractor on two-level array, + // i.e., attr.field when attr is an ArrayType(ArrayType(...)). + // Similarly, we also cannot push through if the child of generator is `MapType`. + g.generator.children.head.dataType match { + case _: MapType => return Some(pushedThrough) + case ArrayType(_: ArrayType, _) => return Some(pushedThrough) + case _ => + } + // Pruning on `Generator`'s output. We only process single field case. // For multiple field case, we cannot directly move field extractor into // the generator expression.
A workaround is to re-construct array of struct diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala index a06204fe725d4..ac5c28953a5d9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala @@ -853,4 +853,21 @@ abstract class SchemaPruningSuite Row("John", "Y.") :: Nil) } } + + test("SPARK-34638: queries should not fail on unsupported cases") { + withTable("nested_array") { + sql("select * from values array(array(named_struct('a', 1, 'b', 3), " + + "named_struct('a', 2, 'b', 4))) T(items)").write.saveAsTable("nested_array") + val query = sql("select d.a from (select explode(c) d from " + + "(select explode(items) c from nested_array))") + checkAnswer(query, Row(1) :: Row(2) :: Nil) + } + + withTable("map") { + sql("select * from values map(1, named_struct('a', 1, 'b', 3), " + + "2, named_struct('a', 2, 'b', 4)) T(items)").write.saveAsTable("map") + val query = sql("select d.a from (select explode(items) (c, d) from map)") + checkAnswer(query, Row(1) :: Row(2) :: Nil) + } + } } From a7194092ebc3a9ca97daba29fda0fcc0ec099cf4 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Sat, 24 Apr 2021 17:48:44 -0700 Subject: [PATCH 9/9] Address some comments. 
--- .../sql/catalyst/optimizer/NestedColumnAliasing.scala | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala index beca98d13d6c2..5b12667f4a884 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala @@ -315,12 +315,13 @@ object GeneratorNestedColumnAliasing { // As we change the child of the generator, its output data type must be updated. val updatedGeneratorOutput = rewrittenG.generatorOutput - .zip(rewrittenG.generator.elementSchema.toAttributes) - .map { case (oldAttr, newAttr) => + .zip(rewrittenG.generator.elementSchema.toAttributes) + .map { case (oldAttr, newAttr) => newAttr.withExprId(oldAttr.exprId).withName(oldAttr.name) } assert(updatedGeneratorOutput.length == rewrittenG.generatorOutput.length, - "Updated generator output must have same length as original generator output.") + "Updated generator output must have the same length " + + "with original generator output.") val updatedGenerate = rewrittenG.copy(generatorOutput = updatedGeneratorOutput) // Replace nested column accessor with generator output.