diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index a4e73bb23379e..9a2648a79a576 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -992,7 +992,7 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor if (metaCols.isEmpty) { node } else { - val newNode = addMetadataCol(node, metaCols.map(_.exprId).toSet) + val newNode = node.mapChildren(addMetadataCol(_, metaCols.map(_.exprId).toSet)) // We should not change the output schema of the plan. We should project away the extra // metadata columns if necessary. if (newNode.sameOutput(node)) { diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala index 5c2878be69f1b..52943cbb6cd83 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala @@ -36,7 +36,7 @@ import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Count, Sum} import org.apache.spark.sql.catalyst.parser.CatalystSqlParser.parsePlan -import org.apache.spark.sql.catalyst.plans.{Cross, Inner} +import org.apache.spark.sql.catalyst.plans.{Cross, FullOuter, Inner, UsingJoin} import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, RangePartitioning, RoundRobinPartitioning} import org.apache.spark.sql.catalyst.util._ @@ -1387,4 +1387,21 @@ class AnalysisSuite extends AnalysisTest with Matchers { assert(!cg.rightOrder.flatMap(_.references).exists(cg.left.output.contains)) } } + + test("SPARK-40149: add metadata column with no extra project") { + val t1 = LocalRelation($"key".int, $"value".string).as("t1") + val t2 = LocalRelation($"key".int, $"value".string).as("t2") + val query = + Project(Seq($"t1.key", $"t2.key"), + Join(t1, t2, UsingJoin(FullOuter, Seq("key")), None, JoinHint.NONE)) + checkAnalysis( + query, + Project(Seq($"t1.key", $"t2.key"), + Project(Seq(coalesce($"t1.key", $"t2.key").as("key"), + $"t1.value", $"t2.value", $"t1.key", $"t2.key"), + Join(t1, t2, FullOuter, Some($"t1.key" === $"t2.key"), JoinHint.NONE) + ) + ).analyze + ) + } }