From 84633711baff0a13afc1831add254e5ca6ba6338 Mon Sep 17 00:00:00 2001 From: Szehon Ho Date: Mon, 20 Apr 2026 16:50:28 +0200 Subject: [PATCH 01/10] [SPARK-56550][SQL] Support filling missing nested struct fields in INSERT INTO WITH SCHEMA EVOLUTION Add support for INSERT INTO WITH SCHEMA EVOLUTION to fill missing nested struct fields with null (or column defaults) when the source has fewer fields than the target, mirroring existing MERGE INTO behavior. Changes: - Add spark.sql.insertNestedTypeCoercion.enabled config flag (default false) - Refactor TableOutputResolver.resolveOutputColumns to accept DefaultValueFillMode enum directly instead of two overlapping boolean parameters - Enable RECURSE mode for V2 inserts when both schema evolution and the config flag are active - Add comprehensive tests for all scenarios --- .../sql/catalyst/analysis/Analyzer.scala | 6 +- .../analysis/TableOutputResolver.scala | 61 +- .../apache/spark/sql/internal/SQLConf.scala | 13 + .../sql/execution/datasources/rules.scala | 2 +- .../spark/sql/connector/InsertIntoTests.scala | 545 ++++++++++++++++++ 5 files changed, 606 insertions(+), 21 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 09c6a0984258c..0a5ed3439f85f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -3788,9 +3788,13 @@ class Analyzer( validateStoreAssignmentPolicy() TableOutputResolver.suitableForByNameCheck(v2Write.isByName, expected = v2Write.table.output, queryOutput = v2Write.query.output) + import TableOutputResolver.DefaultValueFillMode._ + val defaultValueFillMode = + if (conf.coerceInsertNestedTypes && v2Write.schemaEvolutionEnabled) RECURSE + else FILL val projection = TableOutputResolver.resolveOutputColumns( v2Write.table.name, v2Write.table.output, v2Write.query, v2Write.isByName, conf, - supportColDefaultValue = true) + defaultValueFillMode) if (projection != v2Write.query) { val cleanedTable = v2Write.table match { case r: DataSourceV2Relation => diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala index 7eacc5ab9b2ad..308339570f9d9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala @@ -40,11 +40,18 @@ import org.apache.spark.sql.types.{ArrayType, DataType, DecimalType, IntegralTyp object TableOutputResolver extends SQLConfHelper with Logging { /** - * Modes for filling in default or null values for missing columns. - * If FILL, fill missing top-level columns with their default values. - * If RECURSE, fill missing top-level columns and also recurse into nested struct - * fields to fill null. - * If NONE, do not fill any missing columns. + * Controls how missing columns or nested struct fields are handled during output resolution. + * + * NONE: Do not fill any missing columns. Mismatches in column count or nested struct + * fields cause errors. Used for V1 CTAS. + * FILL: Fill missing top-level columns with their default values (or null if no default + * is defined). Nested struct field mismatches still cause errors. Used for V1 inserts + * and V2 inserts without nested type coercion. + * RECURSE: Fill missing top-level columns with defaults and also recurse into nested + * structs, arrays, and maps to fill missing fields with null. Additionally relaxes + * the by-position arity check to allow fewer source columns than target columns. + * Used for V2 inserts with schema evolution and nested type coercion enabled. + * Gated by `spark.sql.insertNestedTypeCoercion.enabled` (default false). */ object DefaultValueFillMode extends Enumeration { val FILL, RECURSE, NONE = Value @@ -92,19 +99,22 @@ object TableOutputResolver extends SQLConfHelper with Logging { query: LogicalPlan, byName: Boolean, conf: SQLConf, - supportColDefaultValue: Boolean = false): LogicalPlan = { + defaultValueFillMode: DefaultValueFillMode.Value = NONE): LogicalPlan = { if (expected.size < query.output.size) { throw QueryCompilationErrors.cannotWriteTooManyColumnsToTableError( tableName, expected.map(_.name), query.output) } + // In RECURSE mode, allow fewer source columns than target by filling trailing columns + // with defaults. In other modes, a column count mismatch in by-position resolution is + // an error. + val fillDefaultValue = defaultValueFillMode == RECURSE val errors = new mutable.ArrayBuffer[String]() val resolved: Seq[NamedExpression] = if (byName) { - // If a top-level column does not have a corresponding value in the input query, fill with - // the column's default value. We need to pass `fillDefaultValue` as FILL here, if the - // `supportColDefaultValue` parameter is also true. - val defaultValueFillMode = if (supportColDefaultValue) FILL else NONE + // By-name resolution: the defaultValueFillMode is passed through to control whether + // missing top-level columns are filled (FILL/RECURSE) and whether missing nested + // struct fields are also filled (RECURSE only). reorderColumnsByName( tableName, query.output, @@ -114,11 +124,12 @@ object TableOutputResolver extends SQLConfHelper with Logging { Nil, defaultValueFillMode) } else { - if (expected.size > query.output.size) { + if (expected.size > query.output.size && !fillDefaultValue) { throw QueryCompilationErrors.cannotWriteNotEnoughColumnsToTableError( tableName, expected.map(_.name), query.output) } - resolveColumnsByPosition(tableName, query.output, expected, conf, errors += _) + resolveColumnsByPosition( + tableName, query.output, expected, conf, errors += _, fillDefaultValue = fillDefaultValue) } if (errors.nonEmpty) { @@ -377,7 +388,8 @@ object TableOutputResolver extends SQLConfHelper with Logging { expectedCols: Seq[Attribute], conf: SQLConf, addError: String => Unit, - colPath: Seq[String] = Nil): Seq[NamedExpression] = { + colPath: Seq[String] = Nil, + fillDefaultValue: Boolean = false): Seq[NamedExpression] = { val actualExpectedCols = expectedCols.map { attr => attr.withDataType { CharVarcharUtils.getRawType(attr.metadata).getOrElse(attr.dataType) } } @@ -393,7 +405,7 @@ object TableOutputResolver extends SQLConfHelper with Logging { tableName, colPath.quoted, extraColsStr ) } - } else if (inputCols.size < actualExpectedCols.size) { + } else if (inputCols.size < actualExpectedCols.size && !fillDefaultValue) { val missingColsStr = actualExpectedCols.takeRight(actualExpectedCols.size - inputCols.size) .map(col => toSQLId(col.name)) .mkString(", ") @@ -407,25 +419,36 @@ object TableOutputResolver extends SQLConfHelper with Logging { } } - inputCols.zip(actualExpectedCols).flatMap { case (inputCol, expectedCol) => + val matched = inputCols.zip(actualExpectedCols).flatMap { case (inputCol, expectedCol) => val newColPath = colPath :+ expectedCol.name (inputCol.dataType, expectedCol.dataType) match { case (inputType: StructType, expectedType: StructType) => resolveStructType( tableName, inputCol, inputType, expectedCol, expectedType, - byName = false, conf, addError, newColPath, fillDefaultValue = false) + byName = false, conf, addError, newColPath, fillDefaultValue) case (inputType: ArrayType, expectedType: ArrayType) => resolveArrayType( tableName, inputCol, inputType, expectedCol, expectedType, - byName = false, conf, addError, newColPath, fillDefaultValue = false) + byName = false, conf, addError, newColPath, fillDefaultValue) case (inputType: MapType, expectedType: MapType) => resolveMapType( tableName, inputCol, inputType, expectedCol, expectedType, - byName = false, conf, addError, newColPath, fillDefaultValue = false) + byName = false, conf, addError, newColPath, fillDefaultValue) case _ => checkField(tableName, expectedCol, inputCol, byName = false, conf, addError, newColPath) } } + + val defaults = if (fillDefaultValue) { + actualExpectedCols.drop(inputCols.size).flatMap { expectedCol => + getDefaultValueExprOrNullLit(expectedCol, conf.useNullsForMissingDefaultColumnValues) + .map(expr => Alias(expr, expectedCol.name)()) + } + } else { + Nil + } + + matched ++ defaults } private[sql] def checkNullability( @@ -468,7 +491,7 @@ object TableOutputResolver extends SQLConfHelper with Logging { defaultValueMode) } else { resolveColumnsByPosition( - tableName, fields, toAttributes(expectedType), conf, addError, colPath) + tableName, fields, toAttributes(expectedType), conf, addError, colPath, fillDefaultValue) } if (resolved.length == expectedType.length) { val struct = CreateStruct(resolved) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index fcb736e404854..4178dd7f47152 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -7270,6 +7270,16 @@ object SQLConf { .booleanConf .createWithDefault(false) + val INSERT_INTO_NESTED_TYPE_COERCION_ENABLED = + buildConf("spark.sql.insertNestedTypeCoercion.enabled") + .internal() + .doc("If enabled, allow INSERT INTO WITH SCHEMA EVOLUTION to fill missing nested " + + "struct fields with null when the source has fewer nested fields than the target " + + "table. This is experimental and the semantics may change.") + .version("4.1.0") + .booleanConf + .createWithDefault(false) + val TIME_TYPE_ENABLED = buildConf("spark.sql.timeType.enabled") .internal() @@ -8597,6 +8607,9 @@ class SQLConf extends Serializable with Logging with SqlApiConf { def coerceMergeNestedTypes: Boolean = getConf(SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED) + def coerceInsertNestedTypes: Boolean = + getConf(SQLConf.INSERT_INTO_NESTED_TYPE_COERCION_ENABLED) + def isTimeTypeEnabled: Boolean = getConf(SQLConf.TIME_TYPE_ENABLED) def listaggAllowDistinctCastWithOrder: Boolean = getConf(LISTAGG_ALLOW_DISTINCT_CAST_WITH_ORDER) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala index ce41bbe4aeb3d..7122dd52ef1a5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala @@ -528,7 +528,7 @@ object PreprocessTableInsertion extends ResolveInsertionBase { query, byName, conf, - supportColDefaultValue = true) + TableOutputResolver.DefaultValueFillMode.FILL) } catch { case e: AnalysisException if staticPartCols.nonEmpty && (e.getCondition == "INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS" || diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/InsertIntoTests.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/InsertIntoTests.scala index 4f023136a6fe1..1b6add613e5c7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/InsertIntoTests.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/InsertIntoTests.scala @@ -574,6 +574,26 @@ trait InsertIntoSchemaEvolutionTests { this: InsertIntoTests => byName: Boolean = false, replaceWhere: Option[String] = None): Unit + /** Insert data into a table by name without schema evolution. */ + protected def doInsertByName( + tableName: String, + insert: DataFrame, + mode: SaveMode = SaveMode.Append): Unit = { + val tmpView = "tmp_view" + withTempView(tmpView) { + insert.createOrReplaceTempView(tmpView) + val overwrite = if (mode == SaveMode.Overwrite) "OVERWRITE" else "INTO" + sql(s"INSERT $overwrite TABLE $tableName BY NAME SELECT * FROM $tmpView") + } + } + + /** Run a block with INSERT nested type coercion enabled. */ + protected def withInsertNestedTypeCoercion(f: => Unit): Unit = { + withSQLConf(SQLConf.INSERT_INTO_NESTED_TYPE_COERCION_ENABLED.key -> "true") { + f + } + } + test("Insert schema evolution: extra column - no auto-schema-evolution capability") { val t1 = s"${catalogAndNamespace}tbl" withTable(t1) { @@ -1403,4 +1423,529 @@ trait InsertIntoSchemaEvolutionTests { this: InsertIntoTests => assert(spark.table(t1).schema("id").dataType === IntegerType) } } + + // --------------------------------------------------------------------------- + // Tests for source with fewer columns/fields than target (mirroring MERGE) + // --------------------------------------------------------------------------- + + test("Insert schema evolution: source missing top-level column by name") { + val t1 = s"${catalogAndNamespace}tbl" + withTable(t1) { + val schema = StructType(Seq( + StructField("id", IntegerType), + StructField("salary", IntegerType), + StructField("dep", StringType))) + val data = Seq(Row(0, 100, "sales")) + sql(s"CREATE TABLE $t1 (id int, salary int, dep string) USING $v2Format") + doInsert(t1, spark.createDataFrame(spark.sparkContext.parallelize(data), schema)) + doInsertWithSchemaEvolution(t1, + Seq((1, "engineering")).toDF("id", "dep"), + byName = true) + checkAnswer( + sql(s"SELECT * FROM $t1"), + Seq(Row(0, 100, "sales"), Row(1, null, "engineering"))) + } + } + + test("Insert schema evolution: source missing top-level column by position") { + val t1 = s"${catalogAndNamespace}tbl" + withTable(t1) { + sql(s"CREATE TABLE $t1 (id int, salary int, dep string) USING $v2Format") + doInsert(t1, Seq((0, 100, "sales")).toDF("id", "salary", "dep")) + // By position: source col 1 maps to target col 1, source col 2 maps to target col 2, + // trailing target col 3 is filled with null. + withInsertNestedTypeCoercion { + doInsertWithSchemaEvolution(t1, + Seq((1, 200)).toDF("id", "salary")) + } + checkAnswer( + sql(s"SELECT * FROM $t1"), + Seq(Row(0, 100, "sales"), Row(1, 200, null))) + } + } + + test("Insert schema evolution: source missing top-level column with DEFAULT by name") { + val t1 = s"${catalogAndNamespace}tbl" + withTable(t1) { + sql(s"CREATE TABLE $t1 (id int, salary int DEFAULT 200, dep string) USING $v2Format") + doInsert(t1, Seq((0, 100, "sales")).toDF("id", "salary", "dep")) + doInsertWithSchemaEvolution(t1, + Seq((1, "engineering")).toDF("id", "dep"), + byName = true) + checkAnswer( + sql(s"SELECT * FROM $t1"), + Seq(Row(0, 100, "sales"), Row(1, 200, "engineering"))) + } + } + + test("Insert schema evolution: source missing top-level column with DEFAULT by position") { + val t1 = s"${catalogAndNamespace}tbl" + withTable(t1) { + sql(s"CREATE TABLE $t1 (id int, salary int, dep string DEFAULT 'unknown') USING $v2Format") + doInsert(t1, Seq((0, 100, "sales")).toDF("id", "salary", "dep")) + withInsertNestedTypeCoercion { + doInsertWithSchemaEvolution(t1, + Seq((1, 200)).toDF("id", "salary")) + } + checkAnswer( + sql(s"SELECT * FROM $t1"), + Seq(Row(0, 100, "sales"), Row(1, 200, "unknown"))) + } + } + + test("Insert schema evolution: source missing nested struct field by name") { + val t1 = s"${catalogAndNamespace}tbl" + withTable(t1) { + val targetSchema = StructType(Seq( + StructField("id", IntegerType), + StructField("s", StructType(Seq( + StructField("c1", IntegerType), + StructField("c2", StringType), + StructField("c3", BooleanType)))))) + sql(s"CREATE TABLE $t1 (id int, s struct) USING $v2Format") + val targetData = spark.createDataFrame( + spark.sparkContext.parallelize(Seq(Row(0, Row(1, "a", true)))), targetSchema) + doInsert(t1, targetData) + + val sourceSchema = StructType(Seq( + StructField("id", IntegerType), + StructField("s", StructType(Seq( + StructField("c1", IntegerType), + StructField("c2", StringType)))))) + val sourceData = spark.createDataFrame( + spark.sparkContext.parallelize(Seq(Row(1, Row(10, "b")))), sourceSchema) + withInsertNestedTypeCoercion { + doInsertWithSchemaEvolution(t1, sourceData, byName = true) + } + checkAnswer( + sql(s"SELECT * FROM $t1"), + Seq(Row(0, Row(1, "a", true)), Row(1, Row(10, "b", null)))) + } + } + + test("Insert schema evolution: source missing nested struct field by position") { + val t1 = s"${catalogAndNamespace}tbl" + withTable(t1) { + val targetSchema = StructType(Seq( + StructField("id", IntegerType), + StructField("s", StructType(Seq( + StructField("c1", IntegerType), + StructField("c2", StringType), + StructField("c3", BooleanType)))))) + sql(s"CREATE TABLE $t1 (id int, s struct) USING $v2Format") + val targetData = spark.createDataFrame( + spark.sparkContext.parallelize(Seq(Row(0, Row(1, "a", true)))), targetSchema) + doInsert(t1, targetData) + + val sourceSchema = StructType(Seq( + StructField("id", IntegerType), + StructField("s", StructType(Seq( + StructField("c1", IntegerType), + StructField("c2", StringType)))))) + val sourceData = spark.createDataFrame( + spark.sparkContext.parallelize(Seq(Row(1, Row(10, "b")))), sourceSchema) + withInsertNestedTypeCoercion { + doInsertWithSchemaEvolution(t1, sourceData) + } + checkAnswer( + sql(s"SELECT * FROM $t1"), + Seq(Row(0, Row(1, "a", true)), Row(1, Row(10, "b", null)))) + } + } + + test("Insert schema evolution: source missing field in struct nested in array by name") { + val t1 = s"${catalogAndNamespace}tbl" + withTable(t1) { + val targetSchema = StructType(Seq( + StructField("id", IntegerType), + StructField("a", ArrayType(StructType(Seq( + StructField("c1", IntegerType), + StructField("c2", StringType), + StructField("c3", BooleanType))))))) + sql(s"CREATE TABLE $t1 (id int, " + + s"a array>) USING $v2Format") + val targetData = spark.createDataFrame( + spark.sparkContext.parallelize(Seq(Row(0, Seq(Row(1, "a", true))))), + targetSchema) + doInsert(t1, targetData) + + val sourceSchema = StructType(Seq( + StructField("id", IntegerType), + StructField("a", ArrayType(StructType(Seq( + StructField("c1", IntegerType), + StructField("c2", StringType))))))) + val sourceData = spark.createDataFrame( + spark.sparkContext.parallelize(Seq(Row(1, Seq(Row(10, "b"))))), + sourceSchema) + withInsertNestedTypeCoercion { + doInsertWithSchemaEvolution(t1, sourceData, byName = true) + } + checkAnswer( + sql(s"SELECT * FROM $t1"), + Seq(Row(0, Seq(Row(1, "a", true))), Row(1, Seq(Row(10, "b", null))))) + } + } + + test("Insert schema evolution: source missing deeply nested struct field by name") { + val t1 = s"${catalogAndNamespace}tbl" + withTable(t1) { + val targetSchema = StructType(Seq( + StructField("id", IntegerType), + StructField("s", StructType(Seq( + StructField("c1", IntegerType), + StructField("c2", StructType(Seq( + StructField("a", IntegerType), + StructField("b", BooleanType))))))))) + sql(s"CREATE TABLE $t1 (id int, " + + s"s struct>) USING $v2Format") + val targetData = spark.createDataFrame( + spark.sparkContext.parallelize(Seq(Row(0, Row(1, Row(10, true))))), + targetSchema) + doInsert(t1, targetData) + + val sourceSchema = StructType(Seq( + StructField("id", IntegerType), + StructField("s", StructType(Seq( + StructField("c1", IntegerType), + StructField("c2", StructType(Seq( + StructField("a", IntegerType))))))))) + val sourceData = spark.createDataFrame( + spark.sparkContext.parallelize(Seq(Row(1, Row(20, Row(30))))), + sourceSchema) + withInsertNestedTypeCoercion { + doInsertWithSchemaEvolution(t1, sourceData, byName = true) + } + checkAnswer( + sql(s"SELECT * FROM $t1"), + Seq(Row(0, Row(1, Row(10, true))), Row(1, Row(20, Row(30, null))))) + } + } + + test("Insert schema evolution: source with null struct and missing nested field by name") { + val t1 = s"${catalogAndNamespace}tbl" + withTable(t1) { + val targetSchema = StructType(Seq( + StructField("id", IntegerType), + StructField("s", StructType(Seq( + StructField("c1", IntegerType), + StructField("c2", StringType), + StructField("c3", IntegerType)))))) + sql(s"CREATE TABLE $t1 (id int, " + + s"s struct) USING $v2Format") + val targetData = spark.createDataFrame( + spark.sparkContext.parallelize(Seq(Row(0, Row(1, "a", 10)))), + targetSchema) + doInsert(t1, targetData) + + val sourceSchema = StructType(Seq( + StructField("id", IntegerType), + StructField("s", StructType(Seq( + StructField("c1", IntegerType), + StructField("c2", StringType)))))) + val sourceData = spark.createDataFrame( + spark.sparkContext.parallelize(Seq(Row(1, null))), + sourceSchema) + withInsertNestedTypeCoercion { + doInsertWithSchemaEvolution(t1, sourceData, byName = true) + } + checkAnswer( + sql(s"SELECT * FROM $t1"), + Seq(Row(0, Row(1, "a", 10)), Row(1, null))) + } + } + + test("Insert schema evolution: source with null struct and missing nested field by position") { + val t1 = s"${catalogAndNamespace}tbl" + withTable(t1) { + val targetSchema = StructType(Seq( + StructField("id", IntegerType), + StructField("s", StructType(Seq( + StructField("c1", IntegerType), + StructField("c2", StringType), + StructField("c3", IntegerType)))))) + sql(s"CREATE TABLE $t1 (id int, " + + s"s struct) USING $v2Format") + val targetData = spark.createDataFrame( + spark.sparkContext.parallelize(Seq(Row(0, Row(1, "a", 10)))), + targetSchema) + doInsert(t1, targetData) + + val sourceSchema = StructType(Seq( + StructField("id", IntegerType), + StructField("s", StructType(Seq( + StructField("c1", IntegerType), + StructField("c2", StringType)))))) + val sourceData = spark.createDataFrame( + spark.sparkContext.parallelize(Seq(Row(1, null))), + sourceSchema) + withInsertNestedTypeCoercion { + doInsertWithSchemaEvolution(t1, sourceData) + } + checkAnswer( + sql(s"SELECT * FROM $t1"), + Seq(Row(0, Row(1, "a", 10)), Row(1, null))) + } + } + + test("Insert schema evolution: mixed null and non-null structs with missing field by name") { + val t1 = s"${catalogAndNamespace}tbl" + withTable(t1) { + val targetSchema = StructType(Seq( + StructField("id", IntegerType), + StructField("s", StructType(Seq( + StructField("c1", IntegerType), + StructField("c2", StringType), + StructField("c3", BooleanType)))))) + sql(s"CREATE TABLE $t1 (id int, s struct) USING $v2Format") + val targetData = spark.createDataFrame( + spark.sparkContext.parallelize(Seq(Row(0, Row(1, "a", true)))), + targetSchema) + doInsert(t1, targetData) + + val sourceSchema = StructType(Seq( + StructField("id", IntegerType), + StructField("s", StructType(Seq( + StructField("c1", IntegerType), + StructField("c2", StringType)))))) + val sourceData = spark.createDataFrame( + spark.sparkContext.parallelize(Seq(Row(1, Row(10, "b")), Row(2, null))), + sourceSchema) + withInsertNestedTypeCoercion { + doInsertWithSchemaEvolution(t1, sourceData, byName = true) + } + checkAnswer( + sql(s"SELECT * FROM $t1"), + Seq(Row(0, Row(1, "a", true)), Row(1, Row(10, "b", null)), Row(2, null))) + } + } + + test("Insert schema evolution: null deeply nested struct with missing field by name") { + val t1 = s"${catalogAndNamespace}tbl" + withTable(t1) { + val targetSchema = StructType(Seq( + StructField("id", IntegerType), + StructField("s", StructType(Seq( + StructField("c1", IntegerType), + StructField("c2", StructType(Seq( + StructField("a", IntegerType), + StructField("b", BooleanType))))))))) + sql(s"CREATE TABLE $t1 (id int, " + + s"s struct>) USING $v2Format") + val targetData = spark.createDataFrame( + spark.sparkContext.parallelize(Seq(Row(0, Row(1, Row(10, true))))), + targetSchema) + doInsert(t1, targetData) + + val sourceSchema = StructType(Seq( + StructField("id", IntegerType), + StructField("s", StructType(Seq( + StructField("c1", IntegerType), + StructField("c2", StructType(Seq( + StructField("a", IntegerType))))))))) + val sourceData = spark.createDataFrame( + spark.sparkContext.parallelize(Seq(Row(1, Row(20, null)))), + sourceSchema) + withInsertNestedTypeCoercion { + doInsertWithSchemaEvolution(t1, sourceData, byName = true) + } + checkAnswer( + sql(s"SELECT * FROM $t1"), + Seq(Row(0, Row(1, Row(10, true))), Row(1, Row(20, null)))) + } + } + + test("Insert schema evolution: null struct in array with missing field by name") { + val t1 = s"${catalogAndNamespace}tbl" + withTable(t1) { + val targetSchema = StructType(Seq( + StructField("id", IntegerType), + StructField("a", ArrayType(StructType(Seq( + StructField("c1", IntegerType), + StructField("c2", BooleanType))))))) + sql(s"CREATE TABLE $t1 (id int, " + + s"a array>) USING $v2Format") + val targetData = spark.createDataFrame( + spark.sparkContext.parallelize(Seq(Row(0, Seq(Row(1, true))))), + targetSchema) + doInsert(t1, targetData) + + val sourceSchema = StructType(Seq( + StructField("id", IntegerType), + StructField("a", ArrayType(StructType(Seq( + StructField("c1", IntegerType))))))) + val sourceData = spark.createDataFrame( + spark.sparkContext.parallelize(Seq(Row(1, Seq(Row(10), null)))), + sourceSchema) + withInsertNestedTypeCoercion { + doInsertWithSchemaEvolution(t1, sourceData, byName = true) + } + checkAnswer( + sql(s"SELECT * FROM $t1"), + Seq(Row(0, Seq(Row(1, true))), Row(1, Seq(Row(10, null), null)))) + } + } + + test("Insert schema evolution: source missing field in struct nested in map value by name") { + val t1 = s"${catalogAndNamespace}tbl" + withTable(t1) { + val targetSchema = StructType(Seq( + StructField("id", IntegerType), + StructField("m", MapType(StringType, StructType(Seq( + StructField("c1", IntegerType), + StructField("c2", BooleanType))))))) + sql(s"CREATE TABLE $t1 (id int, " + + s"m map>) USING $v2Format") + val targetData = spark.createDataFrame( + spark.sparkContext.parallelize(Seq(Row(0, Map("x" -> Row(1, true))))), + targetSchema) + doInsert(t1, targetData) + + val sourceSchema = StructType(Seq( + StructField("id", IntegerType), + StructField("m", MapType(StringType, StructType(Seq( + StructField("c1", IntegerType))))))) + val sourceData = spark.createDataFrame( + spark.sparkContext.parallelize(Seq(Row(1, Map("y" -> Row(10))))), + sourceSchema) + withInsertNestedTypeCoercion { + doInsertWithSchemaEvolution(t1, sourceData, byName = true) + } + checkAnswer( + sql(s"SELECT * FROM $t1"), + Seq( + Row(0, Map("x" -> Row(1, true))), + Row(1, Map("y" -> Row(10, null))))) + } + } + + // --------------------------------------------------------------------------- + // Negative tests: missing columns/fields should fail WITHOUT schema evolution + // --------------------------------------------------------------------------- + + test("Insert without evolution: source missing top-level column by name fails") { + val t1 = s"${catalogAndNamespace}tbl" + withTable(t1) { + sql(s"CREATE TABLE $t1 (id int, salary int, dep string) USING $v2Format") + doInsert(t1, Seq((0, 100, "sales")).toDF("id", "salary", "dep")) + checkError( + exception = intercept[AnalysisException] { + doInsert(t1, Seq((1, "engineering")).toDF("id", "dep")) + }, + condition = "INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS", + parameters = Map( + "tableName" -> toSQLId(s"${catalogAndNamespace}tbl"), + "tableColumns" -> "`id`, `salary`, `dep`", + "dataColumns" -> "`id`, `dep`") + ) + } + } + + test("Insert without evolution: source missing top-level column by position fails") { + val t1 = s"${catalogAndNamespace}tbl" + withTable(t1) { + sql(s"CREATE TABLE $t1 (id int, salary int, dep string) USING $v2Format") + doInsert(t1, Seq((0, 100, "sales")).toDF("id", "salary", "dep")) + checkError( + exception = intercept[AnalysisException] { + doInsert(t1, Seq((1, 200)).toDF("id", "salary")) + }, + condition = "INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS", + parameters = Map( + "tableName" -> toSQLId(s"${catalogAndNamespace}tbl"), + "tableColumns" -> "`id`, `salary`, `dep`", + "dataColumns" -> "`id`, `salary`") + ) + } + } + + test("Insert without evolution: source missing nested struct field by name fails") { + val t1 = s"${catalogAndNamespace}tbl" + withTable(t1) { + sql(s"CREATE TABLE $t1 (id int, s struct) USING $v2Format") + val targetSchema = StructType(Seq( + StructField("id", IntegerType), + StructField("s", StructType(Seq( + StructField("c1", IntegerType), + StructField("c2", StringType), + StructField("c3", BooleanType)))))) + val targetData = spark.createDataFrame( + spark.sparkContext.parallelize(Seq(Row(0, Row(1, "a", true)))), targetSchema) + doInsert(t1, targetData) + + val sourceSchema = StructType(Seq( + StructField("id", IntegerType), + StructField("s", StructType(Seq( + StructField("c1", IntegerType), + StructField("c2", StringType)))))) + val sourceData = spark.createDataFrame( + spark.sparkContext.parallelize(Seq(Row(1, Row(10, "b")))), sourceSchema) + val ex = intercept[AnalysisException] { + doInsertByName(t1, sourceData) + } + assert(ex.getMessage.contains("Cannot find data")) + } + } + + test("Insert with evolution but without coercion flag:" + + " source missing nested struct field by name fails") { + val t1 = s"${catalogAndNamespace}tbl" + withTable(t1) { + sql(s"CREATE TABLE $t1 (id int, s struct) USING $v2Format") + val targetSchema = StructType(Seq( + StructField("id", IntegerType), + StructField("s", StructType(Seq( + StructField("c1", IntegerType), + StructField("c2", StringType), + StructField("c3", BooleanType)))))) + val targetData = spark.createDataFrame( + spark.sparkContext.parallelize(Seq(Row(0, Row(1, "a", true)))), targetSchema) + doInsert(t1, targetData) + + val sourceSchema = StructType(Seq( + StructField("id", IntegerType), + StructField("s", StructType(Seq( + StructField("c1", IntegerType), + StructField("c2", StringType)))))) + val sourceData = spark.createDataFrame( + spark.sparkContext.parallelize(Seq(Row(1, Row(10, "b")))), sourceSchema) + val ex = intercept[AnalysisException] { + doInsertWithSchemaEvolution(t1, sourceData, byName = true) + } + assert(ex.getMessage.contains("Cannot find data")) + } + } + + test("Insert without evolution: source missing nested struct field by position fails") { + val t1 = s"${catalogAndNamespace}tbl" + withTable(t1) { + val targetSchema = StructType(Seq( + StructField("id", IntegerType), + StructField("s", StructType(Seq( + StructField("c1", IntegerType), + StructField("c2", StringType), + StructField("c3", BooleanType)))))) + sql(s"CREATE TABLE $t1 (id int, s struct) USING $v2Format") + val targetData = spark.createDataFrame( + spark.sparkContext.parallelize(Seq(Row(0, Row(1, "a", true)))), targetSchema) + doInsert(t1, targetData) + + val sourceSchema = StructType(Seq( + StructField("id", IntegerType), + StructField("s", StructType(Seq( + StructField("c1", IntegerType), + StructField("c2", StringType)))))) + val sourceData = spark.createDataFrame( + spark.sparkContext.parallelize(Seq(Row(1, Row(10, "b")))), sourceSchema) + checkError( + exception = intercept[AnalysisException] { + doInsert(t1, sourceData) + }, + condition = "INCOMPATIBLE_DATA_FOR_TABLE.STRUCT_MISSING_FIELDS", + parameters = Map( + "tableName" -> toSQLId(s"${catalogAndNamespace}tbl"), + "colName" -> "`s`", + "missingFields" -> "`c3`") + ) + } + } } From b5174c022caa8acb9288153db59cdc626a273e1e Mon Sep 17 00:00:00 2001 From: Szehon Ho Date: Mon, 20 Apr 2026 16:55:20 +0200 Subject: [PATCH 02/10] Move DefaultValueFillMode import to top of file --- .../scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 0a5ed3439f85f..d9dec891154be 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -29,6 +29,7 @@ import org.apache.spark.{SparkException, SparkUnsupportedOperationException} import org.apache.spark.internal.config.ConfigBindingPolicy import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst._ +import org.apache.spark.sql.catalyst.analysis.TableOutputResolver.DefaultValueFillMode._ import org.apache.spark.sql.catalyst.analysis.resolver.{ AnalyzerBridgeState, HybridAnalyzer, @@ -3788,7 +3789,6 @@ class Analyzer( validateStoreAssignmentPolicy() TableOutputResolver.suitableForByNameCheck(v2Write.isByName, expected = v2Write.table.output, queryOutput = v2Write.query.output) - import TableOutputResolver.DefaultValueFillMode._ val defaultValueFillMode = if (conf.coerceInsertNestedTypes && v2Write.schemaEvolutionEnabled) RECURSE else FILL From 8cc43e23ba035c18b4c270f746257bb2f86a0664 Mon Sep 17 00:00:00 2001 From: Szehon Ho Date: Mon, 20 Apr 2026 18:45:09 +0200 Subject: [PATCH 03/10] Cleanup --- .../catalyst/analysis/TableOutputResolver.scala | 17 +++++------------ .../org/apache/spark/sql/internal/SQLConf.scala | 2 +- .../spark/sql/connector/InsertIntoTests.scala | 2 +- 3 files changed, 7 insertions(+), 14 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala index 308339570f9d9..32140ffc89509 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala @@ -40,18 +40,11 @@ import org.apache.spark.sql.types.{ArrayType, DataType, DecimalType, IntegralTyp object TableOutputResolver extends SQLConfHelper with Logging { /** - * Controls how missing columns or nested struct fields are handled during output resolution. - * - * NONE: Do not fill any missing columns. Mismatches in column count or nested struct - * fields cause errors. Used for V1 CTAS. - * FILL: Fill missing top-level columns with their default values (or null if no default - * is defined). Nested struct field mismatches still cause errors. Used for V1 inserts - * and V2 inserts without nested type coercion. - * RECURSE: Fill missing top-level columns with defaults and also recurse into nested - * structs, arrays, and maps to fill missing fields with null. Additionally relaxes - * the by-position arity check to allow fewer source columns than target columns. - * Used for V2 inserts with schema evolution and nested type coercion enabled. - * Gated by `spark.sql.insertNestedTypeCoercion.enabled` (default false). + * Modes for filling in default or null values for missing columns. + * If FILL, fill missing top-level columns with their default values. + * If RECURSE, fill missing top-level columns and also recurse into nested struct + * fields to fill null. + * If NONE, do not fill any missing columns. */ object DefaultValueFillMode extends Enumeration { val FILL, RECURSE, NONE = Value diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 4178dd7f47152..58f711480e162 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -7276,7 +7276,7 @@ object SQLConf { .doc("If enabled, allow INSERT INTO WITH SCHEMA EVOLUTION to fill missing nested " + "struct fields with null when the source has fewer nested fields than the target " + "table. This is experimental and the semantics may change.") - .version("4.1.0") + .version("4.2.0") .booleanConf .createWithDefault(false) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/InsertIntoTests.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/InsertIntoTests.scala index 1b6add613e5c7..8129982cb8856 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/InsertIntoTests.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/InsertIntoTests.scala @@ -1425,7 +1425,7 @@ trait InsertIntoSchemaEvolutionTests { this: InsertIntoTests => } // --------------------------------------------------------------------------- - // Tests for source with fewer columns/fields than target (mirroring MERGE) + // Tests for source with fewer columns/fields than target // --------------------------------------------------------------------------- test("Insert schema evolution: source missing top-level column by name") { From 4aff22f6bc112cd309c9a0dd2889d50819901b8b Mon Sep 17 00:00:00 2001 From: Szehon Ho Date: Tue, 21 Apr 2026 14:06:38 +0200 Subject: [PATCH 04/10] Address review comments: add clarifying comment and extra+missing column tests --- .../sql/catalyst/analysis/Analyzer.scala | 4 ++ .../spark/sql/connector/InsertIntoTests.scala | 53 +++++++++++++++++++ 2 files changed, 57 insertions(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index d9dec891154be..2d383a1d37d46 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -3789,6 +3789,10 @@ class Analyzer( validateStoreAssignmentPolicy() TableOutputResolver.suitableForByNameCheck(v2Write.isByName, expected = v2Write.table.output, queryOutput = v2Write.query.output) + // With schema evolution, allow the source to have fewer columns/fields than the target + // and fill missing ones with default values or nulls (RECURSE mode). Without schema + // evolution, only top-level default column values are filled (FILL mode) and any + // missing columns will cause a schema enforcement error. val defaultValueFillMode = if (conf.coerceInsertNestedTypes && v2Write.schemaEvolutionEnabled) RECURSE else FILL diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/InsertIntoTests.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/InsertIntoTests.scala index 8129982cb8856..5e3e65be8df2a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/InsertIntoTests.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/InsertIntoTests.scala @@ -1818,6 +1818,59 @@ trait InsertIntoSchemaEvolutionTests { this: InsertIntoTests => } } + test("Insert schema evolution: extra and missing top-level column by name") { + val t1 = s"${catalogAndNamespace}tbl" + withTable(t1) { + sql(s"CREATE TABLE $t1 (id int, salary int, dep string) USING $v2Format") + doInsert(t1, Seq((0, 100, "sales")).toDF("id", "salary", "dep")) + // Source has "active" (extra) but is missing "salary". Column count is the same (3) + // but names differ; by-name resolution should add "active" via schema evolution + // and fill "salary" with null. + withInsertNestedTypeCoercion { + doInsertWithSchemaEvolution(t1, + Seq((1, "engineering", true)).toDF("id", "dep", "active"), + byName = true) + } + checkAnswer( + sql(s"SELECT id, salary, dep, active FROM $t1"), + Seq(Row(0, 100, "sales", null), Row(1, null, "engineering", true))) + } + } + + test("Insert schema evolution: extra and missing nested struct field by name") { + val t1 = s"${catalogAndNamespace}tbl" + withTable(t1) { + val targetSchema = StructType(Seq( + StructField("id", IntegerType), + StructField("s", StructType(Seq( + StructField("c1", IntegerType), + StructField("c2", StringType), + StructField("c3", BooleanType)))))) + sql(s"CREATE TABLE $t1 (id int, s struct) USING $v2Format") + val targetData = spark.createDataFrame( + spark.sparkContext.parallelize(Seq(Row(0, Row(1, "a", true)))), targetSchema) + doInsert(t1, targetData) + + // Source struct has "c1", "c2", "c4" (extra) but is missing "c3". Field count is the same + // (3) but names differ; by-name resolution should add "c4" via schema evolution and fill + // "c3" with null. + val sourceSchema = StructType(Seq( + StructField("id", IntegerType), + StructField("s", StructType(Seq( + StructField("c1", IntegerType), + StructField("c2", StringType), + StructField("c4", DoubleType)))))) + val sourceData = spark.createDataFrame( + spark.sparkContext.parallelize(Seq(Row(1, Row(10, "b", 3.14)))), sourceSchema) + withInsertNestedTypeCoercion { + doInsertWithSchemaEvolution(t1, sourceData, byName = true) + } + checkAnswer( + sql(s"SELECT id, s.c1, s.c2, s.c3, s.c4 FROM $t1"), + Seq(Row(0, 1, "a", true, null), Row(1, 10, "b", null, 3.14))) + } + } + // --------------------------------------------------------------------------- // Negative tests: missing columns/fields should fail WITHOUT schema evolution // --------------------------------------------------------------------------- From 77f74eb446e3ea9b5240c758a6eb6a79df24e61a Mon Sep 17 00:00:00 2001 From: Szehon Ho Date: Wed, 22 Apr 2026 15:58:16 +0200 Subject: [PATCH 05/10] Add insertNestedTypeCoercion.enabled to binding policy exceptions --- .../configs-without-binding-policy-exceptions | 1 + 1 file changed, 1 insertion(+) diff --git a/sql/hive/src/test/resources/conf/binding-policy-exceptions/configs-without-binding-policy-exceptions b/sql/hive/src/test/resources/conf/binding-policy-exceptions/configs-without-binding-policy-exceptions index 2aa6cb885ca31..36fda2b506886 100644 --- a/sql/hive/src/test/resources/conf/binding-policy-exceptions/configs-without-binding-policy-exceptions +++ b/sql/hive/src/test/resources/conf/binding-policy-exceptions/configs-without-binding-policy-exceptions @@ -659,6 +659,7 @@ spark.sql.inMemoryColumnarStorage.hugeVectorReserveRatio spark.sql.inMemoryColumnarStorage.hugeVectorThreshold spark.sql.inMemoryColumnarStorage.partitionPruning spark.sql.inMemoryTableScanStatistics.enable +spark.sql.insertNestedTypeCoercion.enabled spark.sql.join.preferSortMergeJoin spark.sql.json.enableExactStringParsing spark.sql.json.enablePartialResults From 15291696748823ad796807cc942381caf970ecf4 Mon Sep 17 00:00:00 2001 From: Szehon Ho Date: Sat, 25 Apr 2026 08:47:18 +0200 Subject: [PATCH 06/10] [SPARK-56550][SQL] Address cloud-fan review for INSERT schema evolution coercion Propagate fillDefaultValue through resolveArrayType and resolveMapType by-position paths; use applyColumnMetadata for trailing default fills; clarify Analyzer and SQLConf docs; extend DefaultValueFillMode scaladoc; fix by-name negative test (with USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES disabled) and add by-position array/map nested struct tests. --- .../sql/catalyst/analysis/Analyzer.scala | 8 +- .../analysis/TableOutputResolver.scala | 17 ++-- .../apache/spark/sql/internal/SQLConf.scala | 4 +- .../spark/sql/connector/InsertIntoTests.scala | 89 ++++++++++++++++--- 4 files changed, 96 insertions(+), 22 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 2d383a1d37d46..faa78e0306364 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -3789,10 +3789,10 @@ class Analyzer( validateStoreAssignmentPolicy() TableOutputResolver.suitableForByNameCheck(v2Write.isByName, expected = v2Write.table.output, queryOutput = v2Write.query.output) - // With schema evolution, allow the source to have fewer columns/fields than the target - // and fill missing ones with default values or nulls (RECURSE mode). Without schema - // evolution, only top-level default column values are filled (FILL mode) and any - // missing columns will cause a schema enforcement error. + // With schema evolution + coercion flag, missing top-level columns AND missing nested + // struct fields are filled with defaults/null (RECURSE mode). Otherwise, only missing + // top-level columns are filled via FILL mode; missing nested struct fields still cause + // schema enforcement errors. val defaultValueFillMode = if (conf.coerceInsertNestedTypes && v2Write.schemaEvolutionEnabled) RECURSE else FILL diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala index 32140ffc89509..3887d0ec7b24c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala @@ -41,9 +41,10 @@ object TableOutputResolver extends SQLConfHelper with Logging { /** * Modes for filling in default or null values for missing columns. - * If FILL, fill missing top-level columns with their default values. - * If RECURSE, fill missing top-level columns and also recurse into nested struct - * fields to fill null. + * If FILL, fill missing top-level columns with their default values (by-name reorder path). + * If RECURSE, fill missing top-level columns (including trailing columns on the by-position + * path for INSERT with schema evolution when enabled) and recurse into nested structs, + * arrays, and maps to fill missing struct fields with null or defaults. * If NONE, do not fill any missing columns. */ object DefaultValueFillMode extends Enumeration { @@ -435,7 +436,7 @@ object TableOutputResolver extends SQLConfHelper with Logging { val defaults = if (fillDefaultValue) { actualExpectedCols.drop(inputCols.size).flatMap { expectedCol => getDefaultValueExprOrNullLit(expectedCol, conf.useNullsForMissingDefaultColumnValues) - .map(expr => Alias(expr, expectedCol.name)()) + .map(expr => applyColumnMetadata(expr, expectedCol)) } } else { Nil @@ -519,7 +520,8 @@ object TableOutputResolver extends SQLConfHelper with Logging { reorderColumnsByName(tableName, Seq(param), Seq(fakeAttr), conf, addError, colPath, defaultValueMode) } else { - resolveColumnsByPosition(tableName, Seq(param), Seq(fakeAttr), conf, addError, colPath) + resolveColumnsByPosition( + tableName, Seq(param), Seq(fakeAttr), conf, addError, colPath, fillDefaultValue) } if (res.length == 1) { val castedArray = @@ -556,7 +558,8 @@ object TableOutputResolver extends SQLConfHelper with Logging { reorderColumnsByName(tableName, Seq(keyParam), Seq(fakeKeyAttr), conf, addError, colPath, defaultValueFillMode) } else { - resolveColumnsByPosition(tableName, Seq(keyParam), Seq(fakeKeyAttr), conf, addError, colPath) + resolveColumnsByPosition( + tableName, Seq(keyParam), Seq(fakeKeyAttr), conf, addError, colPath, fillDefaultValue) } val valueParam = @@ -568,7 +571,7 @@ object TableOutputResolver extends SQLConfHelper with Logging { defaultValueFillMode) } else { resolveColumnsByPosition( - tableName, Seq(valueParam), Seq(fakeValueAttr), conf, addError, colPath) + tableName, Seq(valueParam), Seq(fakeValueAttr), conf, addError, colPath, fillDefaultValue) } if (resKey.length == 1 && resValue.length == 1) { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 58f711480e162..29a734f055985 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -7275,7 +7275,9 @@ object SQLConf { .internal() .doc("If enabled, allow INSERT INTO WITH SCHEMA EVOLUTION to fill missing nested " + "struct fields with null when the source has fewer nested fields than the target " + - "table. This is experimental and the semantics may change.") + "table. Also relaxes by-position column-count enforcement so trailing missing " + + "top-level columns are filled with their default value (or null). This is " + + "experimental and the semantics may change.") .version("4.2.0") .booleanConf .createWithDefault(false) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/InsertIntoTests.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/InsertIntoTests.scala index 5e3e65be8df2a..dc711739718fa 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/InsertIntoTests.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/InsertIntoTests.scala @@ -1586,6 +1586,39 @@ trait InsertIntoSchemaEvolutionTests { this: InsertIntoTests => } } + test("Insert schema evolution: source missing field in struct nested in array by position") { + val t1 = s"${catalogAndNamespace}tbl" + withTable(t1) { + val targetSchema = StructType(Seq( + StructField("id", IntegerType), + StructField("a", ArrayType(StructType(Seq( + StructField("c1", IntegerType), + StructField("c2", StringType), + StructField("c3", BooleanType))))))) + sql(s"CREATE TABLE $t1 (id int, " + + s"a array>) USING $v2Format") + val targetData = spark.createDataFrame( + spark.sparkContext.parallelize(Seq(Row(0, Seq(Row(1, "a", true))))), + targetSchema) + doInsert(t1, targetData) + + val sourceSchema = StructType(Seq( + StructField("id", IntegerType), + StructField("a", ArrayType(StructType(Seq( + StructField("c1", IntegerType), + StructField("c2", StringType))))))) + val sourceData = spark.createDataFrame( + spark.sparkContext.parallelize(Seq(Row(1, Seq(Row(10, "b"))))), + sourceSchema) + withInsertNestedTypeCoercion { + doInsertWithSchemaEvolution(t1, sourceData) + } + checkAnswer( + sql(s"SELECT * FROM $t1"), + Seq(Row(0, Seq(Row(1, "a", true))), Row(1, Seq(Row(10, "b", null))))) + } + } + test("Insert schema evolution: source missing deeply nested struct field by name") { val t1 = s"${catalogAndNamespace}tbl" withTable(t1) { @@ -1818,6 +1851,39 @@ trait InsertIntoSchemaEvolutionTests { this: InsertIntoTests => } } + test("Insert schema evolution: source missing field in struct nested in map value by position") { + val t1 = s"${catalogAndNamespace}tbl" + withTable(t1) { + val targetSchema = StructType(Seq( + StructField("id", IntegerType), + StructField("m", MapType(StringType, StructType(Seq( + StructField("c1", IntegerType), + StructField("c2", BooleanType))))))) + sql(s"CREATE TABLE $t1 (id int, " + + s"m map>) USING $v2Format") + val targetData = spark.createDataFrame( + spark.sparkContext.parallelize(Seq(Row(0, Map("x" -> Row(1, true))))), + targetSchema) + doInsert(t1, targetData) + + val sourceSchema = StructType(Seq( + StructField("id", IntegerType), + StructField("m", MapType(StringType, StructType(Seq( + StructField("c1", IntegerType))))))) + val sourceData = spark.createDataFrame( + spark.sparkContext.parallelize(Seq(Row(1, Map("y" -> Row(10))))), + sourceSchema) + withInsertNestedTypeCoercion { + doInsertWithSchemaEvolution(t1, sourceData) + } + checkAnswer( + sql(s"SELECT * FROM $t1"), + Seq( + Row(0, Map("x" -> Row(1, true))), + Row(1, Map("y" -> Row(10, null))))) + } + } + test("Insert schema evolution: extra and missing top-level column by name") { val t1 = s"${catalogAndNamespace}tbl" withTable(t1) { @@ -1880,16 +1946,19 @@ trait InsertIntoSchemaEvolutionTests { this: InsertIntoTests => withTable(t1) { sql(s"CREATE TABLE $t1 (id int, salary int, dep string) USING $v2Format") doInsert(t1, Seq((0, 100, "sales")).toDF("id", "salary", "dep")) - checkError( - exception = intercept[AnalysisException] { - doInsert(t1, Seq((1, "engineering")).toDF("id", "dep")) - }, - condition = "INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS", - parameters = Map( - "tableName" -> toSQLId(s"${catalogAndNamespace}tbl"), - "tableColumns" -> "`id`, `salary`, `dep`", - "dataColumns" -> "`id`, `dep`") - ) + // Without explicit DEFAULT on `salary`, missing by-name data only errors when null-fill + // for missing defaults is disabled; otherwise FILL mode inserts null for `salary`. + withSQLConf(SQLConf.USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES.key -> "false") { + checkError( + exception = intercept[AnalysisException] { + doInsertByName(t1, Seq((1, "engineering")).toDF("id", "dep")) + }, + condition = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA", + parameters = Map( + "tableName" -> toSQLId(s"${catalogAndNamespace}tbl"), + "colName" -> "`salary`") + ) + } } } From 1e183f56417310a7aac961046c5aeedf89644952 Mon Sep 17 00:00:00 2001 From: Szehon Ho Date: Mon, 27 Apr 2026 07:21:09 +0200 Subject: [PATCH 07/10] [SPARK-56550][SQL] Fail INSERT schema evolution when trailing fill has no default When insert nested coercion + schema evolution fills by-position trailing columns/fields, getDefaultValueExprOrNullLit can return None (e.g. nullable column with useNullsForMissingDefaultColumnValues=false and no explicit DEFAULT). The previous flatMap silently dropped those targets; mirror the by-name path by throwing INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA. Add regression tests for top-level and nested struct by-position cases. --- .../analysis/TableOutputResolver.scala | 11 +++- .../spark/sql/connector/InsertIntoTests.scala | 61 +++++++++++++++++++ 2 files changed, 69 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala index 3887d0ec7b24c..1810bb3cab7b2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala @@ -434,9 +434,14 @@ object TableOutputResolver extends SQLConfHelper with Logging { } val defaults = if (fillDefaultValue) { - actualExpectedCols.drop(inputCols.size).flatMap { expectedCol => - getDefaultValueExprOrNullLit(expectedCol, conf.useNullsForMissingDefaultColumnValues) - .map(expr => applyColumnMetadata(expr, expectedCol)) + actualExpectedCols.drop(inputCols.size).map { expectedCol => + val defaultExpr = getDefaultValueExprOrNullLit( + expectedCol, conf.useNullsForMissingDefaultColumnValues) + if (defaultExpr.isEmpty) { + throw QueryCompilationErrors.incompatibleDataToTableCannotFindDataError( + tableName, (colPath :+ expectedCol.name).quoted) + } + applyColumnMetadata(defaultExpr.get, expectedCol) } } else { Nil diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/InsertIntoTests.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/InsertIntoTests.scala index dc711739718fa..0319b41d422e1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/InsertIntoTests.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/InsertIntoTests.scala @@ -1962,6 +1962,67 @@ trait InsertIntoSchemaEvolutionTests { this: InsertIntoTests => } } + test("Insert schema evolution: source missing top-level column by position fails " + + "when null default disabled and column has no explicit DEFAULT") { + val t1 = s"${catalogAndNamespace}tbl" + withTable(t1) { + sql(s"CREATE TABLE $t1 (id int, salary int, dep string) USING $v2Format") + doInsert(t1, Seq((0, 100, "sales")).toDF("id", "salary", "dep")) + withSQLConf(SQLConf.USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES.key -> "false") { + withInsertNestedTypeCoercion { + checkError( + exception = intercept[AnalysisException] { + doInsertWithSchemaEvolution(t1, + Seq((1, 200)).toDF("id", "salary")) + }, + condition = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA", + parameters = Map( + "tableName" -> toSQLId(s"${catalogAndNamespace}tbl"), + "colName" -> "`dep`") + ) + } + } + } + } + + test("Insert schema evolution: source missing nested struct field by position fails " + + "when null default disabled") { + val t1 = s"${catalogAndNamespace}tbl" + withTable(t1) { + val targetSchema = StructType(Seq( + StructField("id", IntegerType), + StructField("s", StructType(Seq( + StructField("c1", IntegerType), + StructField("c2", StringType), + StructField("c3", BooleanType)))))) + sql(s"CREATE TABLE $t1 (id int, s struct) USING $v2Format") + val targetData = spark.createDataFrame( + spark.sparkContext.parallelize(Seq(Row(0, Row(1, "a", true)))), targetSchema) + doInsert(t1, targetData) + + val sourceSchema = StructType(Seq( + StructField("id", IntegerType), + StructField("s", StructType(Seq( + StructField("c1", IntegerType), + StructField("c2", StringType)))))) + val sourceData = spark.createDataFrame( + spark.sparkContext.parallelize(Seq(Row(1, Row(10, "b")))), sourceSchema) + withSQLConf(SQLConf.USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES.key -> "false") { + withInsertNestedTypeCoercion { + checkError( + exception = intercept[AnalysisException] { + doInsertWithSchemaEvolution(t1, sourceData) + }, + condition = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA", + parameters = Map( + "tableName" -> toSQLId(s"${catalogAndNamespace}tbl"), + "colName" -> "`s`.`c3`") + ) + } + } + } + } + test("Insert without evolution: source missing top-level column by position fails") { val t1 = s"${catalogAndNamespace}tbl" withTable(t1) { From 6a2db489ec0bcf3edb230266059a27d51e132fdd Mon Sep 17 00:00:00 2001 From: Szehon Ho Date: Mon, 27 Apr 2026 07:53:50 +0200 Subject: [PATCH 08/10] [SPARK-56550][SQL] Harden INSERT output resolution against silent column drops After fixing trailing by-position default fill (throw when no default), add: - Post-check on resolveColumnsByPosition: result length must match expected arity - enforceFullOutput on reorderColumnsByName and nested struct/array/map resolvers: INSERT (resolveOutputColumns) throws on incomplete resolution; MERGE resolveUpdate keeps enforceFullOutput=false so getOrElse fallback semantics are unchanged. Scalastyle: argcount off for the three nested resolver methods. --- .../analysis/TableOutputResolver.scala | 68 +++++++++++++------ 1 file changed, 49 insertions(+), 19 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala index 1810bb3cab7b2..d691c449733f4 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala @@ -116,7 +116,8 @@ object TableOutputResolver extends SQLConfHelper with Logging { conf, errors += _, Nil, - defaultValueFillMode) + defaultValueFillMode, + enforceFullOutput = true) } else { if (expected.size > query.output.size && !fillDefaultValue) { throw QueryCompilationErrors.cannotWriteNotEnoughColumnsToTableError( @@ -162,17 +163,17 @@ object TableOutputResolver extends SQLConfHelper with Logging { case (valueType: StructType, colType: StructType) => val resolvedValue = resolveStructType( tableName, value, valueType, col, colType, - byName = true, conf, addError, colPath, fillChildDefaultValue) + byName = true, conf, addError, colPath, fillChildDefaultValue, enforceFullOutput = false) resolvedValue.getOrElse(value) case (valueType: ArrayType, colType: ArrayType) => val resolvedValue = resolveArrayType( tableName, value, valueType, col, colType, - byName = true, conf, addError, colPath, fillChildDefaultValue) + byName = true, conf, addError, colPath, fillChildDefaultValue, enforceFullOutput = false) resolvedValue.getOrElse(value) case (valueType: MapType, colType: MapType) => val resolvedValue = resolveMapType( tableName, value, valueType, col, colType, - byName = true, conf, addError, colPath, fillChildDefaultValue) + byName = true, conf, addError, colPath, fillChildDefaultValue, enforceFullOutput = false) resolvedValue.getOrElse(value) case _ => checkUpdate(tableName, value, col, conf, addError, colPath) @@ -309,7 +310,8 @@ object TableOutputResolver extends SQLConfHelper with Logging { conf: SQLConf, addError: String => Unit, colPath: Seq[String] = Nil, - defaultValueFillMode: DefaultValueFillMode.Value): Seq[NamedExpression] = { + defaultValueFillMode: DefaultValueFillMode.Value, + enforceFullOutput: Boolean = false): Seq[NamedExpression] = { val matchedCols = mutable.HashSet.empty[String] val reordered = expectedCols.flatMap { expectedCol => val matched = inputCols.filter(col => conf.resolver(col.name, expectedCol.name)) @@ -341,15 +343,15 @@ object TableOutputResolver extends SQLConfHelper with Logging { case (matchedType: StructType, expectedType: StructType) => resolveStructType( tableName, matchedCol, matchedType, actualExpectedCol, expectedType, - byName = true, conf, addError, newColPath, childFillDefaultValue) + byName = true, conf, addError, newColPath, childFillDefaultValue, enforceFullOutput) case (matchedType: ArrayType, expectedType: ArrayType) => resolveArrayType( tableName, matchedCol, matchedType, actualExpectedCol, expectedType, - byName = true, conf, addError, newColPath, childFillDefaultValue) + byName = true, conf, addError, newColPath, childFillDefaultValue, enforceFullOutput) case (matchedType: MapType, expectedType: MapType) => resolveMapType( tableName, matchedCol, matchedType, actualExpectedCol, expectedType, - byName = true, conf, addError, newColPath, childFillDefaultValue) + byName = true, conf, addError, newColPath, childFillDefaultValue, enforceFullOutput) case _ => checkField( tableName, actualExpectedCol, matchedCol, byName = true, conf, addError, newColPath) @@ -371,6 +373,11 @@ object TableOutputResolver extends SQLConfHelper with Logging { } else { reordered } + } else if (enforceFullOutput) { + val colName = + if (colPath.nonEmpty) colPath.quoted + else expectedCols.map(_.name).map(toSQLId).mkString(", ") + throw QueryCompilationErrors.incompatibleDataToTableCannotFindDataError(tableName, colName) } else { Nil } @@ -419,15 +426,15 @@ object TableOutputResolver extends SQLConfHelper with Logging { case (inputType: StructType, expectedType: StructType) => resolveStructType( tableName, inputCol, inputType, expectedCol, expectedType, - byName = false, conf, addError, newColPath, fillDefaultValue) + byName = false, conf, addError, newColPath, fillDefaultValue, enforceFullOutput = true) case (inputType: ArrayType, expectedType: ArrayType) => resolveArrayType( tableName, inputCol, inputType, expectedCol, expectedType, - byName = false, conf, addError, newColPath, fillDefaultValue) + byName = false, conf, addError, newColPath, fillDefaultValue, enforceFullOutput = true) case (inputType: MapType, expectedType: MapType) => resolveMapType( tableName, inputCol, inputType, expectedCol, expectedType, - byName = false, conf, addError, newColPath, fillDefaultValue) + byName = false, conf, addError, newColPath, fillDefaultValue, enforceFullOutput = true) case _ => checkField(tableName, expectedCol, inputCol, byName = false, conf, addError, newColPath) } @@ -447,7 +454,14 @@ object TableOutputResolver extends SQLConfHelper with Logging { Nil } - matched ++ defaults + val result = matched ++ defaults + if (result.length != actualExpectedCols.size) { + val colName = + if (colPath.nonEmpty) colPath.quoted + else actualExpectedCols.map(_.name).map(toSQLId).mkString(", ") + throw QueryCompilationErrors.incompatibleDataToTableCannotFindDataError(tableName, colName) + } + result } private[sql] def checkNullability( @@ -469,6 +483,7 @@ object TableOutputResolver extends SQLConfHelper with Logging { input.nullable && !attr.nullable && conf.storeAssignmentPolicy != StoreAssignmentPolicy.LEGACY } + // scalastyle:off argcount private def resolveStructType( tableName: String, input: Expression, @@ -479,7 +494,8 @@ object TableOutputResolver extends SQLConfHelper with Logging { conf: SQLConf, addError: String => Unit, colPath: Seq[String], - fillDefaultValue: Boolean): Option[NamedExpression] = { + fillDefaultValue: Boolean, + enforceFullOutput: Boolean): Option[NamedExpression] = { val nullCheckedInput = checkNullability(input, expected, conf, colPath) val fields = inputType.zipWithIndex.map { case (f, i) => Alias(GetStructField(nullCheckedInput, i, Some(f.name)), f.name)() @@ -487,7 +503,7 @@ object TableOutputResolver extends SQLConfHelper with Logging { val defaultValueMode = if (fillDefaultValue) RECURSE else NONE val resolved = if (byName) { reorderColumnsByName(tableName, fields, toAttributes(expectedType), conf, addError, colPath, - defaultValueMode) + defaultValueMode, enforceFullOutput) } else { resolveColumnsByPosition( tableName, fields, toAttributes(expectedType), conf, addError, colPath, fillDefaultValue) @@ -500,6 +516,11 @@ object TableOutputResolver extends SQLConfHelper with Logging { struct } Some(applyColumnMetadata(res, expected)) + } else if (enforceFullOutput) { + val colName = + if (colPath.nonEmpty) colPath.quoted + else expectedType.fields.map(_.name).map(toSQLId).mkString(", ") + throw QueryCompilationErrors.incompatibleDataToTableCannotFindDataError(tableName, colName) } else { None } @@ -515,7 +536,8 @@ object TableOutputResolver extends SQLConfHelper with Logging { conf: SQLConf, addError: String => Unit, colPath: Seq[String], - fillDefaultValue: Boolean): Option[NamedExpression] = { + fillDefaultValue: Boolean, + enforceFullOutput: Boolean): Option[NamedExpression] = { val nullCheckedInput = checkNullability(input, expected, conf, colPath) val param = NamedLambdaVariable("element", inputType.elementType, inputType.containsNull) val fakeAttr = @@ -523,7 +545,7 @@ object TableOutputResolver extends SQLConfHelper with Logging { val res = if (byName) { val defaultValueMode = if (fillDefaultValue) RECURSE else NONE reorderColumnsByName(tableName, Seq(param), Seq(fakeAttr), conf, addError, colPath, - defaultValueMode) + defaultValueMode, enforceFullOutput) } else { resolveColumnsByPosition( tableName, Seq(param), Seq(fakeAttr), conf, addError, colPath, fillDefaultValue) @@ -538,6 +560,9 @@ object TableOutputResolver extends SQLConfHelper with Logging { ArrayTransform(nullCheckedInput, func) } Some(applyColumnMetadata(castedArray, expected)) + } else if (enforceFullOutput) { + val colName = if (colPath.nonEmpty) colPath.quoted else toSQLId(expected.name) + throw QueryCompilationErrors.incompatibleDataToTableCannotFindDataError(tableName, colName) } else { None } @@ -553,7 +578,8 @@ object TableOutputResolver extends SQLConfHelper with Logging { conf: SQLConf, addError: String => Unit, colPath: Seq[String], - fillDefaultValue: Boolean): Option[NamedExpression] = { + fillDefaultValue: Boolean, + enforceFullOutput: Boolean): Option[NamedExpression] = { val nullCheckedInput = checkNullability(input, expected, conf, colPath) val keyParam = NamedLambdaVariable("key", inputType.keyType, nullable = false) @@ -561,7 +587,7 @@ object TableOutputResolver extends SQLConfHelper with Logging { val defaultValueFillMode = if (fillDefaultValue) RECURSE else NONE val resKey = if (byName) { reorderColumnsByName(tableName, Seq(keyParam), Seq(fakeKeyAttr), conf, addError, colPath, - defaultValueFillMode) + defaultValueFillMode, enforceFullOutput) } else { resolveColumnsByPosition( tableName, Seq(keyParam), Seq(fakeKeyAttr), conf, addError, colPath, fillDefaultValue) @@ -573,7 +599,7 @@ object TableOutputResolver extends SQLConfHelper with Logging { AttributeReference("value", expectedType.valueType, expectedType.valueContainsNull)() val resValue = if (byName) { reorderColumnsByName(tableName, Seq(valueParam), Seq(fakeValueAttr), conf, addError, colPath, - defaultValueFillMode) + defaultValueFillMode, enforceFullOutput) } else { resolveColumnsByPosition( tableName, Seq(valueParam), Seq(fakeValueAttr), conf, addError, colPath, fillDefaultValue) @@ -601,10 +627,14 @@ object TableOutputResolver extends SQLConfHelper with Logging { MapFromArrays(newKeys, newValues) } Some(applyColumnMetadata(casted, expected)) + } else if (enforceFullOutput) { + val colName = if (colPath.nonEmpty) colPath.quoted else toSQLId(expected.name) + throw QueryCompilationErrors.incompatibleDataToTableCannotFindDataError(tableName, colName) } else { None } } + // scalastyle:on argcount // For table insertions, capture the overflow errors and show proper message. // Without this method, the overflow errors of castings will show hints for turning off ANSI SQL From 3862d98cecef3cf8ffa2792b7982df37843ab34b Mon Sep 17 00:00:00 2001 From: Szehon Ho Date: Tue, 28 Apr 2026 06:25:34 +0200 Subject: [PATCH 09/10] [SPARK-56550][SQL] Use enforceFullOutput for MERGE resolveUpdate as well Align MERGE assignment resolution with INSERT: resolveStructType/Array/Map now pass enforceFullOutput=true so incomplete nested resolution throws instead of falling back via Option. MergeInto schema evolution suites (824 tests across Group/Delta SQL+Scala) and MergeIntoDataFrameSuite nested struct tests pass. --- .../analysis/TableOutputResolver.scala | 106 +++++++++--------- 1 file changed, 50 insertions(+), 56 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala index d691c449733f4..c974e112aaeba 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala @@ -116,8 +116,7 @@ object TableOutputResolver extends SQLConfHelper with Logging { conf, errors += _, Nil, - defaultValueFillMode, - enforceFullOutput = true) + defaultValueFillMode) } else { if (expected.size > query.output.size && !fillDefaultValue) { throw QueryCompilationErrors.cannotWriteNotEnoughColumnsToTableError( @@ -128,8 +127,7 @@ object TableOutputResolver extends SQLConfHelper with Logging { } if (errors.nonEmpty) { - throw QueryCompilationErrors.incompatibleDataToTableCannotFindDataError( - tableName, expected.map(_.name).map(toSQLId).mkString(", ")) + cannotFindOutputData(tableName, Nil, expected.map(_.name)) } if (resolved == query.output) { @@ -163,17 +161,17 @@ object TableOutputResolver extends SQLConfHelper with Logging { case (valueType: StructType, colType: StructType) => val resolvedValue = resolveStructType( tableName, value, valueType, col, colType, - byName = true, conf, addError, colPath, fillChildDefaultValue, enforceFullOutput = false) + byName = true, conf, addError, colPath, fillChildDefaultValue) resolvedValue.getOrElse(value) case (valueType: ArrayType, colType: ArrayType) => val resolvedValue = resolveArrayType( tableName, value, valueType, col, colType, - byName = true, conf, addError, colPath, fillChildDefaultValue, enforceFullOutput = false) + byName = true, conf, addError, colPath, fillChildDefaultValue) resolvedValue.getOrElse(value) case (valueType: MapType, colType: MapType) => val resolvedValue = resolveMapType( tableName, value, valueType, col, colType, - byName = true, conf, addError, colPath, fillChildDefaultValue, enforceFullOutput = false) + byName = true, conf, addError, colPath, fillChildDefaultValue) resolvedValue.getOrElse(value) case _ => checkUpdate(tableName, value, col, conf, addError, colPath) @@ -310,8 +308,7 @@ object TableOutputResolver extends SQLConfHelper with Logging { conf: SQLConf, addError: String => Unit, colPath: Seq[String] = Nil, - defaultValueFillMode: DefaultValueFillMode.Value, - enforceFullOutput: Boolean = false): Seq[NamedExpression] = { + defaultValueFillMode: DefaultValueFillMode.Value): Seq[NamedExpression] = { val matchedCols = mutable.HashSet.empty[String] val reordered = expectedCols.flatMap { expectedCol => val matched = inputCols.filter(col => conf.resolver(col.name, expectedCol.name)) @@ -323,9 +320,7 @@ object TableOutputResolver extends SQLConfHelper with Logging { None } if (defaultExpr.isEmpty) { - throw QueryCompilationErrors.incompatibleDataToTableCannotFindDataError( - tableName, newColPath.quoted - ) + cannotFindOutputData(tableName, newColPath.quoted) } Some(applyColumnMetadata(defaultExpr.get, expectedCol)) } else if (matched.length > 1) { @@ -343,15 +338,15 @@ object TableOutputResolver extends SQLConfHelper with Logging { case (matchedType: StructType, expectedType: StructType) => resolveStructType( tableName, matchedCol, matchedType, actualExpectedCol, expectedType, - byName = true, conf, addError, newColPath, childFillDefaultValue, enforceFullOutput) + byName = true, conf, addError, newColPath, childFillDefaultValue) case (matchedType: ArrayType, expectedType: ArrayType) => resolveArrayType( tableName, matchedCol, matchedType, actualExpectedCol, expectedType, - byName = true, conf, addError, newColPath, childFillDefaultValue, enforceFullOutput) + byName = true, conf, addError, newColPath, childFillDefaultValue) case (matchedType: MapType, expectedType: MapType) => resolveMapType( tableName, matchedCol, matchedType, actualExpectedCol, expectedType, - byName = true, conf, addError, newColPath, childFillDefaultValue, enforceFullOutput) + byName = true, conf, addError, newColPath, childFillDefaultValue) case _ => checkField( tableName, actualExpectedCol, matchedCol, byName = true, conf, addError, newColPath) @@ -373,13 +368,8 @@ object TableOutputResolver extends SQLConfHelper with Logging { } else { reordered } - } else if (enforceFullOutput) { - val colName = - if (colPath.nonEmpty) colPath.quoted - else expectedCols.map(_.name).map(toSQLId).mkString(", ") - throw QueryCompilationErrors.incompatibleDataToTableCannotFindDataError(tableName, colName) } else { - Nil + cannotFindOutputData(tableName, colPath, expectedCols.map(_.name)) } } @@ -426,15 +416,15 @@ object TableOutputResolver extends SQLConfHelper with Logging { case (inputType: StructType, expectedType: StructType) => resolveStructType( tableName, inputCol, inputType, expectedCol, expectedType, - byName = false, conf, addError, newColPath, fillDefaultValue, enforceFullOutput = true) + byName = false, conf, addError, newColPath, fillDefaultValue) case (inputType: ArrayType, expectedType: ArrayType) => resolveArrayType( tableName, inputCol, inputType, expectedCol, expectedType, - byName = false, conf, addError, newColPath, fillDefaultValue, enforceFullOutput = true) + byName = false, conf, addError, newColPath, fillDefaultValue) case (inputType: MapType, expectedType: MapType) => resolveMapType( tableName, inputCol, inputType, expectedCol, expectedType, - byName = false, conf, addError, newColPath, fillDefaultValue, enforceFullOutput = true) + byName = false, conf, addError, newColPath, fillDefaultValue) case _ => checkField(tableName, expectedCol, inputCol, byName = false, conf, addError, newColPath) } @@ -445,8 +435,7 @@ object TableOutputResolver extends SQLConfHelper with Logging { val defaultExpr = getDefaultValueExprOrNullLit( expectedCol, conf.useNullsForMissingDefaultColumnValues) if (defaultExpr.isEmpty) { - throw QueryCompilationErrors.incompatibleDataToTableCannotFindDataError( - tableName, (colPath :+ expectedCol.name).quoted) + cannotFindOutputData(tableName, (colPath :+ expectedCol.name).quoted) } applyColumnMetadata(defaultExpr.get, expectedCol) } @@ -456,10 +445,7 @@ object TableOutputResolver extends SQLConfHelper with Logging { val result = matched ++ defaults if (result.length != actualExpectedCols.size) { - val colName = - if (colPath.nonEmpty) colPath.quoted - else actualExpectedCols.map(_.name).map(toSQLId).mkString(", ") - throw QueryCompilationErrors.incompatibleDataToTableCannotFindDataError(tableName, colName) + cannotFindOutputData(tableName, colPath, actualExpectedCols.map(_.name)) } result } @@ -483,7 +469,6 @@ object TableOutputResolver extends SQLConfHelper with Logging { input.nullable && !attr.nullable && conf.storeAssignmentPolicy != StoreAssignmentPolicy.LEGACY } - // scalastyle:off argcount private def resolveStructType( tableName: String, input: Expression, @@ -494,8 +479,7 @@ object TableOutputResolver extends SQLConfHelper with Logging { conf: SQLConf, addError: String => Unit, colPath: Seq[String], - fillDefaultValue: Boolean, - enforceFullOutput: Boolean): Option[NamedExpression] = { + fillDefaultValue: Boolean): Option[NamedExpression] = { val nullCheckedInput = checkNullability(input, expected, conf, colPath) val fields = inputType.zipWithIndex.map { case (f, i) => Alias(GetStructField(nullCheckedInput, i, Some(f.name)), f.name)() @@ -503,7 +487,7 @@ object TableOutputResolver extends SQLConfHelper with Logging { val defaultValueMode = if (fillDefaultValue) RECURSE else NONE val resolved = if (byName) { reorderColumnsByName(tableName, fields, toAttributes(expectedType), conf, addError, colPath, - defaultValueMode, enforceFullOutput) + defaultValueMode) } else { resolveColumnsByPosition( tableName, fields, toAttributes(expectedType), conf, addError, colPath, fillDefaultValue) @@ -516,13 +500,11 @@ object TableOutputResolver extends SQLConfHelper with Logging { struct } Some(applyColumnMetadata(res, expected)) - } else if (enforceFullOutput) { - val colName = - if (colPath.nonEmpty) colPath.quoted - else expectedType.fields.map(_.name).map(toSQLId).mkString(", ") - throw QueryCompilationErrors.incompatibleDataToTableCannotFindDataError(tableName, colName) } else { - None + cannotFindOutputData( + tableName, + colPath, + expectedType.fields.iterator.map(_.name).toSeq) } } @@ -536,8 +518,7 @@ object TableOutputResolver extends SQLConfHelper with Logging { conf: SQLConf, addError: String => Unit, colPath: Seq[String], - fillDefaultValue: Boolean, - enforceFullOutput: Boolean): Option[NamedExpression] = { + fillDefaultValue: Boolean): Option[NamedExpression] = { val nullCheckedInput = checkNullability(input, expected, conf, colPath) val param = NamedLambdaVariable("element", inputType.elementType, inputType.containsNull) val fakeAttr = @@ -545,7 +526,7 @@ object TableOutputResolver extends SQLConfHelper with Logging { val res = if (byName) { val defaultValueMode = if (fillDefaultValue) RECURSE else NONE reorderColumnsByName(tableName, Seq(param), Seq(fakeAttr), conf, addError, colPath, - defaultValueMode, enforceFullOutput) + defaultValueMode) } else { resolveColumnsByPosition( tableName, Seq(param), Seq(fakeAttr), conf, addError, colPath, fillDefaultValue) @@ -560,11 +541,8 @@ object TableOutputResolver extends SQLConfHelper with Logging { ArrayTransform(nullCheckedInput, func) } Some(applyColumnMetadata(castedArray, expected)) - } else if (enforceFullOutput) { - val colName = if (colPath.nonEmpty) colPath.quoted else toSQLId(expected.name) - throw QueryCompilationErrors.incompatibleDataToTableCannotFindDataError(tableName, colName) } else { - None + cannotFindOutputData(tableName, colPath, toSQLId(expected.name)) } } @@ -578,8 +556,7 @@ object TableOutputResolver extends SQLConfHelper with Logging { conf: SQLConf, addError: String => Unit, colPath: Seq[String], - fillDefaultValue: Boolean, - enforceFullOutput: Boolean): Option[NamedExpression] = { + fillDefaultValue: Boolean): Option[NamedExpression] = { val nullCheckedInput = checkNullability(input, expected, conf, colPath) val keyParam = NamedLambdaVariable("key", inputType.keyType, nullable = false) @@ -587,7 +564,7 @@ object TableOutputResolver extends SQLConfHelper with Logging { val defaultValueFillMode = if (fillDefaultValue) RECURSE else NONE val resKey = if (byName) { reorderColumnsByName(tableName, Seq(keyParam), Seq(fakeKeyAttr), conf, addError, colPath, - defaultValueFillMode, enforceFullOutput) + defaultValueFillMode) } else { resolveColumnsByPosition( tableName, Seq(keyParam), Seq(fakeKeyAttr), conf, addError, colPath, fillDefaultValue) @@ -599,7 +576,7 @@ object TableOutputResolver extends SQLConfHelper with Logging { AttributeReference("value", expectedType.valueType, expectedType.valueContainsNull)() val resValue = if (byName) { reorderColumnsByName(tableName, Seq(valueParam), Seq(fakeValueAttr), conf, addError, colPath, - defaultValueFillMode, enforceFullOutput) + defaultValueFillMode) } else { resolveColumnsByPosition( tableName, Seq(valueParam), Seq(fakeValueAttr), conf, addError, colPath, fillDefaultValue) @@ -627,14 +604,31 @@ object TableOutputResolver extends SQLConfHelper with Logging { MapFromArrays(newKeys, newValues) } Some(applyColumnMetadata(casted, expected)) - } else if (enforceFullOutput) { - val colName = if (colPath.nonEmpty) colPath.quoted else toSQLId(expected.name) - throw QueryCompilationErrors.incompatibleDataToTableCannotFindDataError(tableName, colName) } else { - None + cannotFindOutputData(tableName, colPath, toSQLId(expected.name)) } } - // scalastyle:on argcount + + private def cannotFindOutputData(tableName: String, quotedColName: String): Nothing = + throw QueryCompilationErrors.incompatibleDataToTableCannotFindDataError( + tableName, quotedColName) + + private def cannotFindOutputData( + tableName: String, + colPath: Seq[String], + quotedFallback: String): Nothing = + cannotFindOutputData( + tableName, + if (colPath.nonEmpty) colPath.quoted else quotedFallback) + + private def cannotFindOutputData( + tableName: String, + colPath: Seq[String], + missingNames: Iterable[String]): Nothing = + cannotFindOutputData( + tableName, + colPath, + missingNames.iterator.map(toSQLId).mkString(", ")) // For table insertions, capture the overflow errors and show proper message. // Without this method, the overflow errors of castings will show hints for turning off ANSI SQL From e9525bf81eeedfdbf324d09232b87b5615f431a0 Mon Sep 17 00:00:00 2001 From: Szehon Ho Date: Mon, 18 May 2026 10:25:44 -0700 Subject: [PATCH 10/10] [SPARK-56550][SQL] Restore enforceFullOutput for MERGE output resolution Revert the 2c56adef change that made incomplete nested resolution throw for MERGE/UPDATE, restoring addError-based cast failure messages in AlignMergeAssignmentsSuite and AlignUpdateAssignmentsSuite. Align two INSERT negative tests with checkError for CANNOT_FIND_DATA. --- .../analysis/TableOutputResolver.scala | 106 +++++++++--------- .../spark/sql/connector/InsertIntoTests.scala | 26 +++-- 2 files changed, 74 insertions(+), 58 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala index c974e112aaeba..d691c449733f4 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala @@ -116,7 +116,8 @@ object TableOutputResolver extends SQLConfHelper with Logging { conf, errors += _, Nil, - defaultValueFillMode) + defaultValueFillMode, + enforceFullOutput = true) } else { if (expected.size > query.output.size && !fillDefaultValue) { throw QueryCompilationErrors.cannotWriteNotEnoughColumnsToTableError( @@ -127,7 +128,8 @@ object TableOutputResolver extends SQLConfHelper with Logging { } if (errors.nonEmpty) { - cannotFindOutputData(tableName, Nil, expected.map(_.name)) + throw QueryCompilationErrors.incompatibleDataToTableCannotFindDataError( + tableName, expected.map(_.name).map(toSQLId).mkString(", ")) } if (resolved == query.output) { @@ -161,17 +163,17 @@ object TableOutputResolver extends SQLConfHelper with Logging { case (valueType: StructType, colType: StructType) => val resolvedValue = resolveStructType( tableName, value, valueType, col, colType, - byName = true, conf, addError, colPath, fillChildDefaultValue) + byName = true, conf, addError, colPath, fillChildDefaultValue, enforceFullOutput = false) resolvedValue.getOrElse(value) case (valueType: ArrayType, colType: ArrayType) => val resolvedValue = resolveArrayType( tableName, value, valueType, col, colType, - byName = true, conf, addError, colPath, fillChildDefaultValue) + byName = true, conf, addError, colPath, fillChildDefaultValue, enforceFullOutput = false) resolvedValue.getOrElse(value) case (valueType: MapType, colType: MapType) => val resolvedValue = resolveMapType( tableName, value, valueType, col, colType, - byName = true, conf, addError, colPath, fillChildDefaultValue) + byName = true, conf, addError, colPath, fillChildDefaultValue, enforceFullOutput = false) resolvedValue.getOrElse(value) case _ => checkUpdate(tableName, value, col, conf, addError, colPath) @@ -308,7 +310,8 @@ object TableOutputResolver extends SQLConfHelper with Logging { conf: SQLConf, addError: String => Unit, colPath: Seq[String] = Nil, - defaultValueFillMode: DefaultValueFillMode.Value): Seq[NamedExpression] = { + defaultValueFillMode: DefaultValueFillMode.Value, + enforceFullOutput: Boolean = false): Seq[NamedExpression] = { val matchedCols = mutable.HashSet.empty[String] val reordered = expectedCols.flatMap { expectedCol => val matched = inputCols.filter(col => conf.resolver(col.name, expectedCol.name)) @@ -320,7 +323,9 @@ object TableOutputResolver extends SQLConfHelper with Logging { None } if (defaultExpr.isEmpty) { - cannotFindOutputData(tableName, newColPath.quoted) + throw QueryCompilationErrors.incompatibleDataToTableCannotFindDataError( + tableName, newColPath.quoted + ) } Some(applyColumnMetadata(defaultExpr.get, expectedCol)) } else if (matched.length > 1) { @@ -338,15 +343,15 @@ object TableOutputResolver extends SQLConfHelper with Logging { case (matchedType: StructType, expectedType: StructType) => resolveStructType( tableName, matchedCol, matchedType, actualExpectedCol, expectedType, - byName = true, conf, addError, newColPath, childFillDefaultValue) + byName = true, conf, addError, newColPath, childFillDefaultValue, enforceFullOutput) case (matchedType: ArrayType, expectedType: ArrayType) => resolveArrayType( tableName, matchedCol, matchedType, actualExpectedCol, expectedType, - byName = true, conf, addError, newColPath, childFillDefaultValue) + byName = true, conf, addError, newColPath, childFillDefaultValue, enforceFullOutput) case (matchedType: MapType, expectedType: MapType) => resolveMapType( tableName, matchedCol, matchedType, actualExpectedCol, expectedType, - byName = true, conf, addError, newColPath, childFillDefaultValue) + byName = true, conf, addError, newColPath, childFillDefaultValue, enforceFullOutput) case _ => checkField( tableName, actualExpectedCol, matchedCol, byName = true, conf, addError, newColPath) @@ -368,8 +373,13 @@ object TableOutputResolver extends SQLConfHelper with Logging { } else { reordered } + } else if (enforceFullOutput) { + val colName = + if (colPath.nonEmpty) colPath.quoted + else expectedCols.map(_.name).map(toSQLId).mkString(", ") + throw QueryCompilationErrors.incompatibleDataToTableCannotFindDataError(tableName, colName) } else { - cannotFindOutputData(tableName, colPath, expectedCols.map(_.name)) + Nil } } @@ -416,15 +426,15 @@ object TableOutputResolver extends SQLConfHelper with Logging { case (inputType: StructType, expectedType: StructType) => resolveStructType( tableName, inputCol, inputType, expectedCol, expectedType, - byName = false, conf, addError, newColPath, fillDefaultValue) + byName = false, conf, addError, newColPath, fillDefaultValue, enforceFullOutput = true) case (inputType: ArrayType, expectedType: ArrayType) => resolveArrayType( tableName, inputCol, inputType, expectedCol, expectedType, - byName = false, conf, addError, newColPath, fillDefaultValue) + byName = false, conf, addError, newColPath, fillDefaultValue, enforceFullOutput = true) case (inputType: MapType, expectedType: MapType) => resolveMapType( tableName, inputCol, inputType, expectedCol, expectedType, - byName = false, conf, addError, newColPath, fillDefaultValue) + byName = false, conf, addError, newColPath, fillDefaultValue, enforceFullOutput = true) case _ => checkField(tableName, expectedCol, inputCol, byName = false, conf, addError, newColPath) } @@ -435,7 +445,8 @@ object TableOutputResolver extends SQLConfHelper with Logging { val defaultExpr = getDefaultValueExprOrNullLit( expectedCol, conf.useNullsForMissingDefaultColumnValues) if (defaultExpr.isEmpty) { - cannotFindOutputData(tableName, (colPath :+ expectedCol.name).quoted) + throw QueryCompilationErrors.incompatibleDataToTableCannotFindDataError( + tableName, (colPath :+ expectedCol.name).quoted) } applyColumnMetadata(defaultExpr.get, expectedCol) } @@ -445,7 +456,10 @@ object TableOutputResolver extends SQLConfHelper with Logging { val result = matched ++ defaults if (result.length != actualExpectedCols.size) { - cannotFindOutputData(tableName, colPath, actualExpectedCols.map(_.name)) + val colName = + if (colPath.nonEmpty) colPath.quoted + else actualExpectedCols.map(_.name).map(toSQLId).mkString(", ") + throw QueryCompilationErrors.incompatibleDataToTableCannotFindDataError(tableName, colName) } result } @@ -469,6 +483,7 @@ object TableOutputResolver extends SQLConfHelper with Logging { input.nullable && !attr.nullable && conf.storeAssignmentPolicy != StoreAssignmentPolicy.LEGACY } + // scalastyle:off argcount private def resolveStructType( tableName: String, input: Expression, @@ -479,7 +494,8 @@ object TableOutputResolver extends SQLConfHelper with Logging { conf: SQLConf, addError: String => Unit, colPath: Seq[String], - fillDefaultValue: Boolean): Option[NamedExpression] = { + fillDefaultValue: Boolean, + enforceFullOutput: Boolean): Option[NamedExpression] = { val nullCheckedInput = checkNullability(input, expected, conf, colPath) val fields = inputType.zipWithIndex.map { case (f, i) => Alias(GetStructField(nullCheckedInput, i, Some(f.name)), f.name)() @@ -487,7 +503,7 @@ object TableOutputResolver extends SQLConfHelper with Logging { val defaultValueMode = if (fillDefaultValue) RECURSE else NONE val resolved = if (byName) { reorderColumnsByName(tableName, fields, toAttributes(expectedType), conf, addError, colPath, - defaultValueMode) + defaultValueMode, enforceFullOutput) } else { resolveColumnsByPosition( tableName, fields, toAttributes(expectedType), conf, addError, colPath, fillDefaultValue) @@ -500,11 +516,13 @@ object TableOutputResolver extends SQLConfHelper with Logging { struct } Some(applyColumnMetadata(res, expected)) + } else if (enforceFullOutput) { + val colName = + if (colPath.nonEmpty) colPath.quoted + else expectedType.fields.map(_.name).map(toSQLId).mkString(", ") + throw QueryCompilationErrors.incompatibleDataToTableCannotFindDataError(tableName, colName) } else { - cannotFindOutputData( - tableName, - colPath, - expectedType.fields.iterator.map(_.name).toSeq) + None } } @@ -518,7 +536,8 @@ object TableOutputResolver extends SQLConfHelper with Logging { conf: SQLConf, addError: String => Unit, colPath: Seq[String], - fillDefaultValue: Boolean): Option[NamedExpression] = { + fillDefaultValue: Boolean, + enforceFullOutput: Boolean): Option[NamedExpression] = { val nullCheckedInput = checkNullability(input, expected, conf, colPath) val param = NamedLambdaVariable("element", inputType.elementType, inputType.containsNull) val fakeAttr = @@ -526,7 +545,7 @@ object TableOutputResolver extends SQLConfHelper with Logging { val res = if (byName) { val defaultValueMode = if (fillDefaultValue) RECURSE else NONE reorderColumnsByName(tableName, Seq(param), Seq(fakeAttr), conf, addError, colPath, - defaultValueMode) + defaultValueMode, enforceFullOutput) } else { resolveColumnsByPosition( tableName, Seq(param), Seq(fakeAttr), conf, addError, colPath, fillDefaultValue) @@ -541,8 +560,11 @@ object TableOutputResolver extends SQLConfHelper with Logging { ArrayTransform(nullCheckedInput, func) } Some(applyColumnMetadata(castedArray, expected)) + } else if (enforceFullOutput) { + val colName = if (colPath.nonEmpty) colPath.quoted else toSQLId(expected.name) + throw QueryCompilationErrors.incompatibleDataToTableCannotFindDataError(tableName, colName) } else { - cannotFindOutputData(tableName, colPath, toSQLId(expected.name)) + None } } @@ -556,7 +578,8 @@ object TableOutputResolver extends SQLConfHelper with Logging { conf: SQLConf, addError: String => Unit, colPath: Seq[String], - fillDefaultValue: Boolean): Option[NamedExpression] = { + fillDefaultValue: Boolean, + enforceFullOutput: Boolean): Option[NamedExpression] = { val nullCheckedInput = checkNullability(input, expected, conf, colPath) val keyParam = NamedLambdaVariable("key", inputType.keyType, nullable = false) @@ -564,7 +587,7 @@ object TableOutputResolver extends SQLConfHelper with Logging { val defaultValueFillMode = if (fillDefaultValue) RECURSE else NONE val resKey = if (byName) { reorderColumnsByName(tableName, Seq(keyParam), Seq(fakeKeyAttr), conf, addError, colPath, - defaultValueFillMode) + defaultValueFillMode, enforceFullOutput) } else { resolveColumnsByPosition( tableName, Seq(keyParam), Seq(fakeKeyAttr), conf, addError, colPath, fillDefaultValue) @@ -576,7 +599,7 @@ object TableOutputResolver extends SQLConfHelper with Logging { AttributeReference("value", expectedType.valueType, expectedType.valueContainsNull)() val resValue = if (byName) { reorderColumnsByName(tableName, Seq(valueParam), Seq(fakeValueAttr), conf, addError, colPath, - defaultValueFillMode) + defaultValueFillMode, enforceFullOutput) } else { resolveColumnsByPosition( tableName, Seq(valueParam), Seq(fakeValueAttr), conf, addError, colPath, fillDefaultValue) @@ -604,31 +627,14 @@ object TableOutputResolver extends SQLConfHelper with Logging { MapFromArrays(newKeys, newValues) } Some(applyColumnMetadata(casted, expected)) + } else if (enforceFullOutput) { + val colName = if (colPath.nonEmpty) colPath.quoted else toSQLId(expected.name) + throw QueryCompilationErrors.incompatibleDataToTableCannotFindDataError(tableName, colName) } else { - cannotFindOutputData(tableName, colPath, toSQLId(expected.name)) + None } } - - private def cannotFindOutputData(tableName: String, quotedColName: String): Nothing = - throw QueryCompilationErrors.incompatibleDataToTableCannotFindDataError( - tableName, quotedColName) - - private def cannotFindOutputData( - tableName: String, - colPath: Seq[String], - quotedFallback: String): Nothing = - cannotFindOutputData( - tableName, - if (colPath.nonEmpty) colPath.quoted else quotedFallback) - - private def cannotFindOutputData( - tableName: String, - colPath: Seq[String], - missingNames: Iterable[String]): Nothing = - cannotFindOutputData( - tableName, - colPath, - missingNames.iterator.map(toSQLId).mkString(", ")) + // scalastyle:on argcount // For table insertions, capture the overflow errors and show proper message. // Without this method, the overflow errors of castings will show hints for turning off ANSI SQL diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/InsertIntoTests.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/InsertIntoTests.scala index 0319b41d422e1..42017c2dd60eb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/InsertIntoTests.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/InsertIntoTests.scala @@ -2062,10 +2062,15 @@ trait InsertIntoSchemaEvolutionTests { this: InsertIntoTests => StructField("c2", StringType)))))) val sourceData = spark.createDataFrame( spark.sparkContext.parallelize(Seq(Row(1, Row(10, "b")))), sourceSchema) - val ex = intercept[AnalysisException] { - doInsertByName(t1, sourceData) - } - assert(ex.getMessage.contains("Cannot find data")) + checkError( + exception = intercept[AnalysisException] { + doInsertByName(t1, sourceData) + }, + condition = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA", + parameters = Map( + "tableName" -> toSQLId(s"${catalogAndNamespace}tbl"), + "colName" -> "`s`.`c3`") + ) } } @@ -2091,10 +2096,15 @@ trait InsertIntoSchemaEvolutionTests { this: InsertIntoTests => StructField("c2", StringType)))))) val sourceData = spark.createDataFrame( spark.sparkContext.parallelize(Seq(Row(1, Row(10, "b")))), sourceSchema) - val ex = intercept[AnalysisException] { - doInsertWithSchemaEvolution(t1, sourceData, byName = true) - } - assert(ex.getMessage.contains("Cannot find data")) + checkError( + exception = intercept[AnalysisException] { + doInsertWithSchemaEvolution(t1, sourceData, byName = true) + }, + condition = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA", + parameters = Map( + "tableName" -> toSQLId(s"${catalogAndNamespace}tbl"), + "colName" -> "`s`.`c3`") + ) } }