diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 09c6a0984258c..faa78e0306364 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -29,6 +29,7 @@ import org.apache.spark.{SparkException, SparkUnsupportedOperationException} import org.apache.spark.internal.config.ConfigBindingPolicy import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst._ +import org.apache.spark.sql.catalyst.analysis.TableOutputResolver.DefaultValueFillMode._ import org.apache.spark.sql.catalyst.analysis.resolver.{ AnalyzerBridgeState, HybridAnalyzer, @@ -3788,9 +3789,16 @@ class Analyzer( validateStoreAssignmentPolicy() TableOutputResolver.suitableForByNameCheck(v2Write.isByName, expected = v2Write.table.output, queryOutput = v2Write.query.output) + // With schema evolution + coercion flag, missing top-level columns AND missing nested + // struct fields are filled with defaults/null (RECURSE mode). Otherwise, only missing + // top-level columns are filled via FILL mode; missing nested struct fields still cause + // schema enforcement errors. + val defaultValueFillMode = + if (conf.coerceInsertNestedTypes && v2Write.schemaEvolutionEnabled) RECURSE + else FILL val projection = TableOutputResolver.resolveOutputColumns( v2Write.table.name, v2Write.table.output, v2Write.query, v2Write.isByName, conf, - supportColDefaultValue = true) + defaultValueFillMode) if (projection != v2Write.query) { val cleanedTable = v2Write.table match { case r: DataSourceV2Relation => diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala index 7eacc5ab9b2ad..d691c449733f4 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala @@ -41,9 +41,10 @@ object TableOutputResolver extends SQLConfHelper with Logging { /** * Modes for filling in default or null values for missing columns. - * If FILL, fill missing top-level columns with their default values. - * If RECURSE, fill missing top-level columns and also recurse into nested struct - * fields to fill null. + * If FILL, fill missing top-level columns with their default values (by-name reorder path). + * If RECURSE, fill missing top-level columns (including trailing columns on the by-position + * path for INSERT with schema evolution when enabled) and recurse into nested structs, + * arrays, and maps to fill missing struct fields with null or defaults. * If NONE, do not fill any missing columns. */ object DefaultValueFillMode extends Enumeration { @@ -92,19 +93,22 @@ object TableOutputResolver extends SQLConfHelper with Logging { query: LogicalPlan, byName: Boolean, conf: SQLConf, - supportColDefaultValue: Boolean = false): LogicalPlan = { + defaultValueFillMode: DefaultValueFillMode.Value = NONE): LogicalPlan = { if (expected.size < query.output.size) { throw QueryCompilationErrors.cannotWriteTooManyColumnsToTableError( tableName, expected.map(_.name), query.output) } + // In RECURSE mode, allow fewer source columns than target by filling trailing columns + // with defaults. In other modes, a column count mismatch in by-position resolution is + // an error. + val fillDefaultValue = defaultValueFillMode == RECURSE val errors = new mutable.ArrayBuffer[String]() val resolved: Seq[NamedExpression] = if (byName) { - // If a top-level column does not have a corresponding value in the input query, fill with - // the column's default value. We need to pass `fillDefaultValue` as FILL here, if the - // `supportColDefaultValue` parameter is also true. - val defaultValueFillMode = if (supportColDefaultValue) FILL else NONE + // By-name resolution: the defaultValueFillMode is passed through to control whether + // missing top-level columns are filled (FILL/RECURSE) and whether missing nested + // struct fields are also filled (RECURSE only). reorderColumnsByName( tableName, query.output, @@ -112,13 +116,15 @@ object TableOutputResolver extends SQLConfHelper with Logging { conf, errors += _, Nil, - defaultValueFillMode) + defaultValueFillMode, + enforceFullOutput = true) } else { - if (expected.size > query.output.size) { + if (expected.size > query.output.size && !fillDefaultValue) { throw QueryCompilationErrors.cannotWriteNotEnoughColumnsToTableError( tableName, expected.map(_.name), query.output) } - resolveColumnsByPosition(tableName, query.output, expected, conf, errors += _) + resolveColumnsByPosition( + tableName, query.output, expected, conf, errors += _, fillDefaultValue = fillDefaultValue) } if (errors.nonEmpty) { @@ -157,17 +163,17 @@ object TableOutputResolver extends SQLConfHelper with Logging { case (valueType: StructType, colType: StructType) => val resolvedValue = resolveStructType( tableName, value, valueType, col, colType, - byName = true, conf, addError, colPath, fillChildDefaultValue) + byName = true, conf, addError, colPath, fillChildDefaultValue, enforceFullOutput = false) resolvedValue.getOrElse(value) case (valueType: ArrayType, colType: ArrayType) => val resolvedValue = resolveArrayType( tableName, value, valueType, col, colType, - byName = true, conf, addError, colPath, fillChildDefaultValue) + byName = true, conf, addError, colPath, fillChildDefaultValue, enforceFullOutput = false) resolvedValue.getOrElse(value) case (valueType: MapType, colType: MapType) => val resolvedValue = resolveMapType( tableName, value, valueType, col, colType, - byName = true, conf, addError, colPath, fillChildDefaultValue) + byName = true, conf, addError, colPath, fillChildDefaultValue, enforceFullOutput = false) resolvedValue.getOrElse(value) case _ => checkUpdate(tableName, value, col, conf, addError, colPath) @@ -304,7 +310,8 @@ object TableOutputResolver extends SQLConfHelper with Logging { conf: SQLConf, addError: String => Unit, colPath: Seq[String] = Nil, - defaultValueFillMode: DefaultValueFillMode.Value): Seq[NamedExpression] = { + defaultValueFillMode: DefaultValueFillMode.Value, + enforceFullOutput: Boolean = false): Seq[NamedExpression] = { val matchedCols = mutable.HashSet.empty[String] val reordered = expectedCols.flatMap { expectedCol => val matched = inputCols.filter(col => conf.resolver(col.name, expectedCol.name)) @@ -336,15 +343,15 @@ object TableOutputResolver extends SQLConfHelper with Logging { case (matchedType: StructType, expectedType: StructType) => resolveStructType( tableName, matchedCol, matchedType, actualExpectedCol, expectedType, - byName = true, conf, addError, newColPath, childFillDefaultValue) + byName = true, conf, addError, newColPath, childFillDefaultValue, enforceFullOutput) case (matchedType: ArrayType, expectedType: ArrayType) => resolveArrayType( tableName, matchedCol, matchedType, actualExpectedCol, expectedType, - byName = true, conf, addError, newColPath, childFillDefaultValue) + byName = true, conf, addError, newColPath, childFillDefaultValue, enforceFullOutput) case (matchedType: MapType, expectedType: MapType) => resolveMapType( tableName, matchedCol, matchedType, actualExpectedCol, expectedType, - byName = true, conf, addError, newColPath, childFillDefaultValue) + byName = true, conf, addError, newColPath, childFillDefaultValue, enforceFullOutput) case _ => checkField( tableName, actualExpectedCol, matchedCol, byName = true, conf, addError, newColPath) @@ -366,6 +373,11 @@ object TableOutputResolver extends SQLConfHelper with Logging { } else { reordered } + } else if (enforceFullOutput) { + val colName = + if (colPath.nonEmpty) colPath.quoted + else expectedCols.map(_.name).map(toSQLId).mkString(", ") + throw QueryCompilationErrors.incompatibleDataToTableCannotFindDataError(tableName, colName) } else { Nil } @@ -377,7 +389,8 @@ object TableOutputResolver extends SQLConfHelper with Logging { expectedCols: Seq[Attribute], conf: SQLConf, addError: String => Unit, - colPath: Seq[String] = Nil): Seq[NamedExpression] = { + colPath: Seq[String] = Nil, + fillDefaultValue: Boolean = false): Seq[NamedExpression] = { val actualExpectedCols = expectedCols.map { attr => attr.withDataType { CharVarcharUtils.getRawType(attr.metadata).getOrElse(attr.dataType) } } @@ -393,7 +406,7 @@ object TableOutputResolver extends SQLConfHelper with Logging { tableName, colPath.quoted, extraColsStr ) } - } else if (inputCols.size < actualExpectedCols.size) { + } else if (inputCols.size < actualExpectedCols.size && !fillDefaultValue) { val missingColsStr = actualExpectedCols.takeRight(actualExpectedCols.size - inputCols.size) .map(col => toSQLId(col.name)) .mkString(", ") @@ -407,25 +420,48 @@ object TableOutputResolver extends SQLConfHelper with Logging { } } - inputCols.zip(actualExpectedCols).flatMap { case (inputCol, expectedCol) => + val matched = inputCols.zip(actualExpectedCols).flatMap { case (inputCol, expectedCol) => val newColPath = colPath :+ expectedCol.name (inputCol.dataType, expectedCol.dataType) match { case (inputType: StructType, expectedType: StructType) => resolveStructType( tableName, inputCol, inputType, expectedCol, expectedType, - byName = false, conf, addError, newColPath, fillDefaultValue = false) + byName = false, conf, addError, newColPath, fillDefaultValue, enforceFullOutput = true) case (inputType: ArrayType, expectedType: ArrayType) => resolveArrayType( tableName, inputCol, inputType, expectedCol, expectedType, - byName = false, conf, addError, newColPath, fillDefaultValue = false) + byName = false, conf, addError, newColPath, fillDefaultValue, enforceFullOutput = true) case (inputType: MapType, expectedType: MapType) => resolveMapType( tableName, inputCol, inputType, expectedCol, expectedType, - byName = false, conf, addError, newColPath, fillDefaultValue = false) + byName = false, conf, addError, newColPath, fillDefaultValue, enforceFullOutput = true) case _ => checkField(tableName, expectedCol, inputCol, byName = false, conf, addError, newColPath) } } + + val defaults = if (fillDefaultValue) { + actualExpectedCols.drop(inputCols.size).map { expectedCol => + val defaultExpr = getDefaultValueExprOrNullLit( + expectedCol, conf.useNullsForMissingDefaultColumnValues) + if (defaultExpr.isEmpty) { + throw QueryCompilationErrors.incompatibleDataToTableCannotFindDataError( + tableName, (colPath :+ expectedCol.name).quoted) + } + applyColumnMetadata(defaultExpr.get, expectedCol) + } + } else { + Nil + } + + val result = matched ++ defaults + if (result.length != actualExpectedCols.size) { + val colName = + if (colPath.nonEmpty) colPath.quoted + else actualExpectedCols.map(_.name).map(toSQLId).mkString(", ") + throw QueryCompilationErrors.incompatibleDataToTableCannotFindDataError(tableName, colName) + } + result } private[sql] def checkNullability( @@ -447,6 +483,7 @@ object TableOutputResolver extends SQLConfHelper with Logging { input.nullable && !attr.nullable && conf.storeAssignmentPolicy != StoreAssignmentPolicy.LEGACY } + // scalastyle:off argcount private def resolveStructType( tableName: String, input: Expression, @@ -457,7 +494,8 @@ object TableOutputResolver extends SQLConfHelper with Logging { conf: SQLConf, addError: String => Unit, colPath: Seq[String], - fillDefaultValue: Boolean): Option[NamedExpression] = { + fillDefaultValue: Boolean, + enforceFullOutput: Boolean): Option[NamedExpression] = { val nullCheckedInput = checkNullability(input, expected, conf, colPath) val fields = inputType.zipWithIndex.map { case (f, i) => Alias(GetStructField(nullCheckedInput, i, Some(f.name)), f.name)() @@ -465,10 +503,10 @@ object TableOutputResolver extends SQLConfHelper with Logging { val defaultValueMode = if (fillDefaultValue) RECURSE else NONE val resolved = if (byName) { reorderColumnsByName(tableName, fields, toAttributes(expectedType), conf, addError, colPath, - defaultValueMode) + defaultValueMode, enforceFullOutput) } else { resolveColumnsByPosition( - tableName, fields, toAttributes(expectedType), conf, addError, colPath) + tableName, fields, toAttributes(expectedType), conf, addError, colPath, fillDefaultValue) } if (resolved.length == expectedType.length) { val struct = CreateStruct(resolved) @@ -478,6 +516,11 @@ object TableOutputResolver extends SQLConfHelper with Logging { struct } Some(applyColumnMetadata(res, expected)) + } else if (enforceFullOutput) { + val colName = + if (colPath.nonEmpty) colPath.quoted + else expectedType.fields.map(_.name).map(toSQLId).mkString(", ") + throw QueryCompilationErrors.incompatibleDataToTableCannotFindDataError(tableName, colName) } else { None } @@ -493,7 +536,8 @@ object TableOutputResolver extends SQLConfHelper with Logging { conf: SQLConf, addError: String => Unit, colPath: Seq[String], - fillDefaultValue: Boolean): Option[NamedExpression] = { + fillDefaultValue: Boolean, + enforceFullOutput: Boolean): Option[NamedExpression] = { val nullCheckedInput = checkNullability(input, expected, conf, colPath) val param = NamedLambdaVariable("element", inputType.elementType, inputType.containsNull) val fakeAttr = @@ -501,9 +545,10 @@ object TableOutputResolver extends SQLConfHelper with Logging { val res = if (byName) { val defaultValueMode = if (fillDefaultValue) RECURSE else NONE reorderColumnsByName(tableName, Seq(param), Seq(fakeAttr), conf, addError, colPath, - defaultValueMode) + defaultValueMode, enforceFullOutput) } else { - resolveColumnsByPosition(tableName, Seq(param), Seq(fakeAttr), conf, addError, colPath) + resolveColumnsByPosition( + tableName, Seq(param), Seq(fakeAttr), conf, addError, colPath, fillDefaultValue) } if (res.length == 1) { val castedArray = @@ -515,6 +560,9 @@ object TableOutputResolver extends SQLConfHelper with Logging { ArrayTransform(nullCheckedInput, func) } Some(applyColumnMetadata(castedArray, expected)) + } else if (enforceFullOutput) { + val colName = if (colPath.nonEmpty) colPath.quoted else toSQLId(expected.name) + throw QueryCompilationErrors.incompatibleDataToTableCannotFindDataError(tableName, colName) } else { None } @@ -530,7 +578,8 @@ object TableOutputResolver extends SQLConfHelper with Logging { conf: SQLConf, addError: String => Unit, colPath: Seq[String], - fillDefaultValue: Boolean): Option[NamedExpression] = { + fillDefaultValue: Boolean, + enforceFullOutput: Boolean): Option[NamedExpression] = { val nullCheckedInput = checkNullability(input, expected, conf, colPath) val keyParam = NamedLambdaVariable("key", inputType.keyType, nullable = false) @@ -538,9 +587,10 @@ object TableOutputResolver extends SQLConfHelper with Logging { val defaultValueFillMode = if (fillDefaultValue) RECURSE else NONE val resKey = if (byName) { reorderColumnsByName(tableName, Seq(keyParam), Seq(fakeKeyAttr), conf, addError, colPath, - defaultValueFillMode) + defaultValueFillMode, enforceFullOutput) } else { - resolveColumnsByPosition(tableName, Seq(keyParam), Seq(fakeKeyAttr), conf, addError, colPath) + resolveColumnsByPosition( + tableName, Seq(keyParam), Seq(fakeKeyAttr), conf, addError, colPath, fillDefaultValue) } val valueParam = @@ -549,10 +599,10 @@ object TableOutputResolver extends SQLConfHelper with Logging { AttributeReference("value", expectedType.valueType, expectedType.valueContainsNull)() val resValue = if (byName) { reorderColumnsByName(tableName, Seq(valueParam), Seq(fakeValueAttr), conf, addError, colPath, - defaultValueFillMode) + defaultValueFillMode, enforceFullOutput) } else { resolveColumnsByPosition( - tableName, Seq(valueParam), Seq(fakeValueAttr), conf, addError, colPath) + tableName, Seq(valueParam), Seq(fakeValueAttr), conf, addError, colPath, fillDefaultValue) } if (resKey.length == 1 && resValue.length == 1) { @@ -577,10 +627,14 @@ object TableOutputResolver extends SQLConfHelper with Logging { MapFromArrays(newKeys, newValues) } Some(applyColumnMetadata(casted, expected)) + } else if (enforceFullOutput) { + val colName = if (colPath.nonEmpty) colPath.quoted else toSQLId(expected.name) + throw QueryCompilationErrors.incompatibleDataToTableCannotFindDataError(tableName, colName) } else { None } } + // scalastyle:on argcount // For table insertions, capture the overflow errors and show proper message. // Without this method, the overflow errors of castings will show hints for turning off ANSI SQL diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index fcb736e404854..29a734f055985 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -7270,6 +7270,18 @@ object SQLConf { .booleanConf .createWithDefault(false) + val INSERT_INTO_NESTED_TYPE_COERCION_ENABLED = + buildConf("spark.sql.insertNestedTypeCoercion.enabled") + .internal() + .doc("If enabled, allow INSERT INTO WITH SCHEMA EVOLUTION to fill missing nested " + + "struct fields with null when the source has fewer nested fields than the target " + + "table. Also relaxes by-position column-count enforcement so trailing missing " + + "top-level columns are filled with their default value (or null). This is " + + "experimental and the semantics may change.") + .version("4.2.0") + .booleanConf + .createWithDefault(false) + val TIME_TYPE_ENABLED = buildConf("spark.sql.timeType.enabled") .internal() @@ -8597,6 +8609,9 @@ class SQLConf extends Serializable with Logging with SqlApiConf { def coerceMergeNestedTypes: Boolean = getConf(SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED) + def coerceInsertNestedTypes: Boolean = + getConf(SQLConf.INSERT_INTO_NESTED_TYPE_COERCION_ENABLED) + def isTimeTypeEnabled: Boolean = getConf(SQLConf.TIME_TYPE_ENABLED) def listaggAllowDistinctCastWithOrder: Boolean = getConf(LISTAGG_ALLOW_DISTINCT_CAST_WITH_ORDER) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala index ce41bbe4aeb3d..7122dd52ef1a5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala @@ -528,7 +528,7 @@ object PreprocessTableInsertion extends ResolveInsertionBase { query, byName, conf, - supportColDefaultValue = true) + TableOutputResolver.DefaultValueFillMode.FILL) } catch { case e: AnalysisException if staticPartCols.nonEmpty && (e.getCondition == "INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS" || diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/InsertIntoTests.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/InsertIntoTests.scala index 4f023136a6fe1..42017c2dd60eb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/InsertIntoTests.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/InsertIntoTests.scala @@ -574,6 +574,26 @@ trait InsertIntoSchemaEvolutionTests { this: InsertIntoTests => byName: Boolean = false, replaceWhere: Option[String] = None): Unit + /** Insert data into a table by name without schema evolution. */ + protected def doInsertByName( + tableName: String, + insert: DataFrame, + mode: SaveMode = SaveMode.Append): Unit = { + val tmpView = "tmp_view" + withTempView(tmpView) { + insert.createOrReplaceTempView(tmpView) + val overwrite = if (mode == SaveMode.Overwrite) "OVERWRITE" else "INTO" + sql(s"INSERT $overwrite TABLE $tableName BY NAME SELECT * FROM $tmpView") + } + } + + /** Run a block with INSERT nested type coercion enabled. */ + protected def withInsertNestedTypeCoercion(f: => Unit): Unit = { + withSQLConf(SQLConf.INSERT_INTO_NESTED_TYPE_COERCION_ENABLED.key -> "true") { + f + } + } + test("Insert schema evolution: extra column - no auto-schema-evolution capability") { val t1 = s"${catalogAndNamespace}tbl" withTable(t1) { @@ -1403,4 +1423,722 @@ trait InsertIntoSchemaEvolutionTests { this: InsertIntoTests => assert(spark.table(t1).schema("id").dataType === IntegerType) } } + + // --------------------------------------------------------------------------- + // Tests for source with fewer columns/fields than target + // --------------------------------------------------------------------------- + + test("Insert schema evolution: source missing top-level column by name") { + val t1 = s"${catalogAndNamespace}tbl" + withTable(t1) { + val schema = StructType(Seq( + StructField("id", IntegerType), + StructField("salary", IntegerType), + StructField("dep", StringType))) + val data = Seq(Row(0, 100, "sales")) + sql(s"CREATE TABLE $t1 (id int, salary int, dep string) USING $v2Format") + doInsert(t1, spark.createDataFrame(spark.sparkContext.parallelize(data), schema)) + doInsertWithSchemaEvolution(t1, + Seq((1, "engineering")).toDF("id", "dep"), + byName = true) + checkAnswer( + sql(s"SELECT * FROM $t1"), + Seq(Row(0, 100, "sales"), Row(1, null, "engineering"))) + } + } + + test("Insert schema evolution: source missing top-level column by position") { + val t1 = s"${catalogAndNamespace}tbl" + withTable(t1) { + sql(s"CREATE TABLE $t1 (id int, salary int, dep string) USING $v2Format") + doInsert(t1, Seq((0, 100, "sales")).toDF("id", "salary", "dep")) + // By position: source col 1 maps to target col 1, source col 2 maps to target col 2, + // trailing target col 3 is filled with null. + withInsertNestedTypeCoercion { + doInsertWithSchemaEvolution(t1, + Seq((1, 200)).toDF("id", "salary")) + } + checkAnswer( + sql(s"SELECT * FROM $t1"), + Seq(Row(0, 100, "sales"), Row(1, 200, null))) + } + } + + test("Insert schema evolution: source missing top-level column with DEFAULT by name") { + val t1 = s"${catalogAndNamespace}tbl" + withTable(t1) { + sql(s"CREATE TABLE $t1 (id int, salary int DEFAULT 200, dep string) USING $v2Format") + doInsert(t1, Seq((0, 100, "sales")).toDF("id", "salary", "dep")) + doInsertWithSchemaEvolution(t1, + Seq((1, "engineering")).toDF("id", "dep"), + byName = true) + checkAnswer( + sql(s"SELECT * FROM $t1"), + Seq(Row(0, 100, "sales"), Row(1, 200, "engineering"))) + } + } + + test("Insert schema evolution: source missing top-level column with DEFAULT by position") { + val t1 = s"${catalogAndNamespace}tbl" + withTable(t1) { + sql(s"CREATE TABLE $t1 (id int, salary int, dep string DEFAULT 'unknown') USING $v2Format") + doInsert(t1, Seq((0, 100, "sales")).toDF("id", "salary", "dep")) + withInsertNestedTypeCoercion { + doInsertWithSchemaEvolution(t1, + Seq((1, 200)).toDF("id", "salary")) + } + checkAnswer( + sql(s"SELECT * FROM $t1"), + Seq(Row(0, 100, "sales"), Row(1, 200, "unknown"))) + } + } + + test("Insert schema evolution: source missing nested struct field by name") { + val t1 = s"${catalogAndNamespace}tbl" + withTable(t1) { + val targetSchema = StructType(Seq( + StructField("id", IntegerType), + StructField("s", StructType(Seq( + StructField("c1", IntegerType), + StructField("c2", StringType), + StructField("c3", BooleanType)))))) + sql(s"CREATE TABLE $t1 (id int, s struct) USING $v2Format") + val targetData = spark.createDataFrame( + spark.sparkContext.parallelize(Seq(Row(0, Row(1, "a", true)))), targetSchema) + doInsert(t1, targetData) + + val sourceSchema = StructType(Seq( + StructField("id", IntegerType), + StructField("s", StructType(Seq( + StructField("c1", IntegerType), + StructField("c2", StringType)))))) + val sourceData = spark.createDataFrame( + spark.sparkContext.parallelize(Seq(Row(1, Row(10, "b")))), sourceSchema) + withInsertNestedTypeCoercion { + doInsertWithSchemaEvolution(t1, sourceData, byName = true) + } + checkAnswer( + sql(s"SELECT * FROM $t1"), + Seq(Row(0, Row(1, "a", true)), Row(1, Row(10, "b", null)))) + } + } + + test("Insert schema evolution: source missing nested struct field by position") { + val t1 = s"${catalogAndNamespace}tbl" + withTable(t1) { + val targetSchema = StructType(Seq( + StructField("id", IntegerType), + StructField("s", StructType(Seq( + StructField("c1", IntegerType), + StructField("c2", StringType), + StructField("c3", BooleanType)))))) + sql(s"CREATE TABLE $t1 (id int, s struct) USING $v2Format") + val targetData = spark.createDataFrame( + spark.sparkContext.parallelize(Seq(Row(0, Row(1, "a", true)))), targetSchema) + doInsert(t1, targetData) + + val sourceSchema = StructType(Seq( + StructField("id", IntegerType), + StructField("s", StructType(Seq( + StructField("c1", IntegerType), + StructField("c2", StringType)))))) + val sourceData = spark.createDataFrame( + spark.sparkContext.parallelize(Seq(Row(1, Row(10, "b")))), sourceSchema) + withInsertNestedTypeCoercion { + doInsertWithSchemaEvolution(t1, sourceData) + } + checkAnswer( + sql(s"SELECT * FROM $t1"), + Seq(Row(0, Row(1, "a", true)), Row(1, Row(10, "b", null)))) + } + } + + test("Insert schema evolution: source missing field in struct nested in array by name") { + val t1 = s"${catalogAndNamespace}tbl" + withTable(t1) { + val targetSchema = StructType(Seq( + StructField("id", IntegerType), + StructField("a", ArrayType(StructType(Seq( + StructField("c1", IntegerType), + StructField("c2", StringType), + StructField("c3", BooleanType))))))) + sql(s"CREATE TABLE $t1 (id int, " + + s"a array>) USING $v2Format") + val targetData = spark.createDataFrame( + spark.sparkContext.parallelize(Seq(Row(0, Seq(Row(1, "a", true))))), + targetSchema) + doInsert(t1, targetData) + + val sourceSchema = StructType(Seq( + StructField("id", IntegerType), + StructField("a", ArrayType(StructType(Seq( + StructField("c1", IntegerType), + StructField("c2", StringType))))))) + val sourceData = spark.createDataFrame( + spark.sparkContext.parallelize(Seq(Row(1, Seq(Row(10, "b"))))), + sourceSchema) + withInsertNestedTypeCoercion { + doInsertWithSchemaEvolution(t1, sourceData, byName = true) + } + checkAnswer( + sql(s"SELECT * FROM $t1"), + Seq(Row(0, Seq(Row(1, "a", true))), Row(1, Seq(Row(10, "b", null))))) + } + } + + test("Insert schema evolution: source missing field in struct nested in array by position") { + val t1 = s"${catalogAndNamespace}tbl" + withTable(t1) { + val targetSchema = StructType(Seq( + StructField("id", IntegerType), + StructField("a", ArrayType(StructType(Seq( + StructField("c1", IntegerType), + StructField("c2", StringType), + StructField("c3", BooleanType))))))) + sql(s"CREATE TABLE $t1 (id int, " + + s"a array>) USING $v2Format") + val targetData = spark.createDataFrame( + spark.sparkContext.parallelize(Seq(Row(0, Seq(Row(1, "a", true))))), + targetSchema) + doInsert(t1, targetData) + + val sourceSchema = StructType(Seq( + StructField("id", IntegerType), + StructField("a", ArrayType(StructType(Seq( + StructField("c1", IntegerType), + StructField("c2", StringType))))))) + val sourceData = spark.createDataFrame( + spark.sparkContext.parallelize(Seq(Row(1, Seq(Row(10, "b"))))), + sourceSchema) + withInsertNestedTypeCoercion { + doInsertWithSchemaEvolution(t1, sourceData) + } + checkAnswer( + sql(s"SELECT * FROM $t1"), + Seq(Row(0, Seq(Row(1, "a", true))), Row(1, Seq(Row(10, "b", null))))) + } + } + + test("Insert schema evolution: source missing deeply nested struct field by name") { + val t1 = s"${catalogAndNamespace}tbl" + withTable(t1) { + val targetSchema = StructType(Seq( + StructField("id", IntegerType), + StructField("s", StructType(Seq( + StructField("c1", IntegerType), + StructField("c2", StructType(Seq( + StructField("a", IntegerType), + StructField("b", BooleanType))))))))) + sql(s"CREATE TABLE $t1 (id int, " + + s"s struct>) USING $v2Format") + val targetData = spark.createDataFrame( + spark.sparkContext.parallelize(Seq(Row(0, Row(1, Row(10, true))))), + targetSchema) + doInsert(t1, targetData) + + val sourceSchema = StructType(Seq( + StructField("id", IntegerType), + StructField("s", StructType(Seq( + StructField("c1", IntegerType), + StructField("c2", StructType(Seq( + StructField("a", IntegerType))))))))) + val sourceData = spark.createDataFrame( + spark.sparkContext.parallelize(Seq(Row(1, Row(20, Row(30))))), + sourceSchema) + withInsertNestedTypeCoercion { + doInsertWithSchemaEvolution(t1, sourceData, byName = true) + } + checkAnswer( + sql(s"SELECT * FROM $t1"), + Seq(Row(0, Row(1, Row(10, true))), Row(1, Row(20, Row(30, null))))) + } + } + + test("Insert schema evolution: source with null struct and missing nested field by name") { + val t1 = s"${catalogAndNamespace}tbl" + withTable(t1) { + val targetSchema = StructType(Seq( + StructField("id", IntegerType), + StructField("s", StructType(Seq( + StructField("c1", IntegerType), + StructField("c2", StringType), + StructField("c3", IntegerType)))))) + sql(s"CREATE TABLE $t1 (id int, " + + s"s struct) USING $v2Format") + val targetData = spark.createDataFrame( + spark.sparkContext.parallelize(Seq(Row(0, Row(1, "a", 10)))), + targetSchema) + doInsert(t1, targetData) + + val sourceSchema = StructType(Seq( + StructField("id", IntegerType), + StructField("s", StructType(Seq( + StructField("c1", IntegerType), + StructField("c2", StringType)))))) + val sourceData = spark.createDataFrame( + spark.sparkContext.parallelize(Seq(Row(1, null))), + sourceSchema) + withInsertNestedTypeCoercion { + doInsertWithSchemaEvolution(t1, sourceData, byName = true) + } + checkAnswer( + sql(s"SELECT * FROM $t1"), + Seq(Row(0, Row(1, "a", 10)), Row(1, null))) + } + } + + test("Insert schema evolution: source with null struct and missing nested field by position") { + val t1 = s"${catalogAndNamespace}tbl" + withTable(t1) { + val targetSchema = StructType(Seq( + StructField("id", IntegerType), + StructField("s", StructType(Seq( + StructField("c1", IntegerType), + StructField("c2", StringType), + StructField("c3", IntegerType)))))) + sql(s"CREATE TABLE $t1 (id int, " + + s"s struct) USING $v2Format") + val targetData = spark.createDataFrame( + spark.sparkContext.parallelize(Seq(Row(0, Row(1, "a", 10)))), + targetSchema) + doInsert(t1, targetData) + + val sourceSchema = StructType(Seq( + StructField("id", IntegerType), + StructField("s", StructType(Seq( + StructField("c1", IntegerType), + StructField("c2", StringType)))))) + val sourceData = spark.createDataFrame( + spark.sparkContext.parallelize(Seq(Row(1, null))), + sourceSchema) + withInsertNestedTypeCoercion { + doInsertWithSchemaEvolution(t1, sourceData) + } + checkAnswer( + sql(s"SELECT * FROM $t1"), + Seq(Row(0, Row(1, "a", 10)), Row(1, null))) + } + } + + test("Insert schema evolution: mixed null and non-null structs with missing field by name") { + val t1 = s"${catalogAndNamespace}tbl" + withTable(t1) { + val targetSchema = StructType(Seq( + StructField("id", IntegerType), + StructField("s", StructType(Seq( + StructField("c1", IntegerType), + StructField("c2", StringType), + StructField("c3", BooleanType)))))) + sql(s"CREATE TABLE $t1 (id int, s struct) USING $v2Format") + val targetData = spark.createDataFrame( + spark.sparkContext.parallelize(Seq(Row(0, Row(1, "a", true)))), + targetSchema) + doInsert(t1, targetData) + + val sourceSchema = StructType(Seq( + StructField("id", IntegerType), + StructField("s", StructType(Seq( + StructField("c1", IntegerType), + StructField("c2", StringType)))))) + val sourceData = spark.createDataFrame( + spark.sparkContext.parallelize(Seq(Row(1, Row(10, "b")), Row(2, null))), + sourceSchema) + withInsertNestedTypeCoercion { + doInsertWithSchemaEvolution(t1, sourceData, byName = true) + } + checkAnswer( + sql(s"SELECT * FROM $t1"), + Seq(Row(0, Row(1, "a", true)), Row(1, Row(10, "b", null)), Row(2, null))) + } + } + + test("Insert schema evolution: null deeply nested struct with missing field by name") { + val t1 = s"${catalogAndNamespace}tbl" + withTable(t1) { + val targetSchema = StructType(Seq( + StructField("id", IntegerType), + StructField("s", StructType(Seq( + StructField("c1", IntegerType), + StructField("c2", StructType(Seq( + StructField("a", IntegerType), + StructField("b", BooleanType))))))))) + sql(s"CREATE TABLE $t1 (id int, " + + s"s struct>) USING $v2Format") + val targetData = spark.createDataFrame( + spark.sparkContext.parallelize(Seq(Row(0, Row(1, Row(10, true))))), + targetSchema) + doInsert(t1, targetData) + + val sourceSchema = StructType(Seq( + StructField("id", IntegerType), + StructField("s", StructType(Seq( + StructField("c1", IntegerType), + StructField("c2", StructType(Seq( + StructField("a", IntegerType))))))))) + val sourceData = spark.createDataFrame( + spark.sparkContext.parallelize(Seq(Row(1, Row(20, null)))), + sourceSchema) + withInsertNestedTypeCoercion { + doInsertWithSchemaEvolution(t1, sourceData, byName = true) + } + checkAnswer( + sql(s"SELECT * FROM $t1"), + Seq(Row(0, Row(1, Row(10, true))), Row(1, Row(20, null)))) + } + } + + test("Insert schema evolution: null struct in array with missing field by name") { + val t1 = s"${catalogAndNamespace}tbl" + withTable(t1) { + val targetSchema = StructType(Seq( + StructField("id", IntegerType), + StructField("a", ArrayType(StructType(Seq( + StructField("c1", IntegerType), + StructField("c2", BooleanType))))))) + sql(s"CREATE TABLE $t1 (id int, " + + s"a array>) USING $v2Format") + val targetData = spark.createDataFrame( + spark.sparkContext.parallelize(Seq(Row(0, Seq(Row(1, true))))), + targetSchema) + doInsert(t1, targetData) + + val sourceSchema = StructType(Seq( + StructField("id", IntegerType), + StructField("a", ArrayType(StructType(Seq( + StructField("c1", IntegerType))))))) + val sourceData = spark.createDataFrame( + spark.sparkContext.parallelize(Seq(Row(1, Seq(Row(10), null)))), + sourceSchema) + withInsertNestedTypeCoercion { + doInsertWithSchemaEvolution(t1, sourceData, byName = true) + } + checkAnswer( + sql(s"SELECT * FROM $t1"), + Seq(Row(0, Seq(Row(1, true))), Row(1, Seq(Row(10, null), null)))) + } + } + + test("Insert schema evolution: source missing field in struct nested in map value by name") { + val t1 = s"${catalogAndNamespace}tbl" + withTable(t1) { + val targetSchema = StructType(Seq( + StructField("id", IntegerType), + StructField("m", MapType(StringType, StructType(Seq( + StructField("c1", IntegerType), + StructField("c2", BooleanType))))))) + sql(s"CREATE TABLE $t1 (id int, " + + s"m map>) USING $v2Format") + val targetData = spark.createDataFrame( + spark.sparkContext.parallelize(Seq(Row(0, Map("x" -> Row(1, true))))), + targetSchema) + doInsert(t1, targetData) + + val sourceSchema = StructType(Seq( + StructField("id", IntegerType), + StructField("m", MapType(StringType, StructType(Seq( + StructField("c1", IntegerType))))))) + val sourceData = spark.createDataFrame( + spark.sparkContext.parallelize(Seq(Row(1, Map("y" -> Row(10))))), + sourceSchema) + withInsertNestedTypeCoercion { + doInsertWithSchemaEvolution(t1, sourceData, byName = true) + } + checkAnswer( + sql(s"SELECT * FROM $t1"), + Seq( + Row(0, Map("x" -> Row(1, true))), + Row(1, Map("y" -> Row(10, null))))) + } + } + + test("Insert schema evolution: source missing field in struct nested in map value by position") { + val t1 = s"${catalogAndNamespace}tbl" + withTable(t1) { + val targetSchema = StructType(Seq( + StructField("id", IntegerType), + StructField("m", MapType(StringType, StructType(Seq( + StructField("c1", IntegerType), + StructField("c2", BooleanType))))))) + sql(s"CREATE TABLE $t1 (id int, " + + s"m map>) USING $v2Format") + val targetData = spark.createDataFrame( + spark.sparkContext.parallelize(Seq(Row(0, Map("x" -> Row(1, true))))), + targetSchema) + doInsert(t1, targetData) + + val sourceSchema = StructType(Seq( + StructField("id", IntegerType), + StructField("m", MapType(StringType, StructType(Seq( + StructField("c1", IntegerType))))))) + val sourceData = spark.createDataFrame( + spark.sparkContext.parallelize(Seq(Row(1, Map("y" -> Row(10))))), + sourceSchema) + withInsertNestedTypeCoercion { + doInsertWithSchemaEvolution(t1, sourceData) + } + checkAnswer( + sql(s"SELECT * FROM $t1"), + Seq( + Row(0, Map("x" -> Row(1, true))), + Row(1, Map("y" -> Row(10, null))))) + } + } + + test("Insert schema evolution: extra and missing top-level column by name") { + val t1 = s"${catalogAndNamespace}tbl" + withTable(t1) { + sql(s"CREATE TABLE $t1 (id int, salary int, dep string) USING $v2Format") + doInsert(t1, Seq((0, 100, "sales")).toDF("id", "salary", "dep")) + // Source has "active" (extra) but is missing "salary". Column count is the same (3) + // but names differ; by-name resolution should add "active" via schema evolution + // and fill "salary" with null. + withInsertNestedTypeCoercion { + doInsertWithSchemaEvolution(t1, + Seq((1, "engineering", true)).toDF("id", "dep", "active"), + byName = true) + } + checkAnswer( + sql(s"SELECT id, salary, dep, active FROM $t1"), + Seq(Row(0, 100, "sales", null), Row(1, null, "engineering", true))) + } + } + + test("Insert schema evolution: extra and missing nested struct field by name") { + val t1 = s"${catalogAndNamespace}tbl" + withTable(t1) { + val targetSchema = StructType(Seq( + StructField("id", IntegerType), + StructField("s", StructType(Seq( + StructField("c1", IntegerType), + StructField("c2", StringType), + StructField("c3", BooleanType)))))) + sql(s"CREATE TABLE $t1 (id int, s struct) USING $v2Format") + val targetData = spark.createDataFrame( + spark.sparkContext.parallelize(Seq(Row(0, Row(1, "a", true)))), targetSchema) + doInsert(t1, targetData) + + // Source struct has "c1", "c2", "c4" (extra) but is missing "c3". Field count is the same + // (3) but names differ; by-name resolution should add "c4" via schema evolution and fill + // "c3" with null. + val sourceSchema = StructType(Seq( + StructField("id", IntegerType), + StructField("s", StructType(Seq( + StructField("c1", IntegerType), + StructField("c2", StringType), + StructField("c4", DoubleType)))))) + val sourceData = spark.createDataFrame( + spark.sparkContext.parallelize(Seq(Row(1, Row(10, "b", 3.14)))), sourceSchema) + withInsertNestedTypeCoercion { + doInsertWithSchemaEvolution(t1, sourceData, byName = true) + } + checkAnswer( + sql(s"SELECT id, s.c1, s.c2, s.c3, s.c4 FROM $t1"), + Seq(Row(0, 1, "a", true, null), Row(1, 10, "b", null, 3.14))) + } + } + + // --------------------------------------------------------------------------- + // Negative tests: missing columns/fields should fail WITHOUT schema evolution + // --------------------------------------------------------------------------- + + test("Insert without evolution: source missing top-level column by name fails") { + val t1 = s"${catalogAndNamespace}tbl" + withTable(t1) { + sql(s"CREATE TABLE $t1 (id int, salary int, dep string) USING $v2Format") + doInsert(t1, Seq((0, 100, "sales")).toDF("id", "salary", "dep")) + // Without explicit DEFAULT on `salary`, missing by-name data only errors when null-fill + // for missing defaults is disabled; otherwise FILL mode inserts null for `salary`. + withSQLConf(SQLConf.USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES.key -> "false") { + checkError( + exception = intercept[AnalysisException] { + doInsertByName(t1, Seq((1, "engineering")).toDF("id", "dep")) + }, + condition = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA", + parameters = Map( + "tableName" -> toSQLId(s"${catalogAndNamespace}tbl"), + "colName" -> "`salary`") + ) + } + } + } + + test("Insert schema evolution: source missing top-level column by position fails " + + "when null default disabled and column has no explicit DEFAULT") { + val t1 = s"${catalogAndNamespace}tbl" + withTable(t1) { + sql(s"CREATE TABLE $t1 (id int, salary int, dep string) USING $v2Format") + doInsert(t1, Seq((0, 100, "sales")).toDF("id", "salary", "dep")) + withSQLConf(SQLConf.USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES.key -> "false") { + withInsertNestedTypeCoercion { + checkError( + exception = intercept[AnalysisException] { + doInsertWithSchemaEvolution(t1, + Seq((1, 200)).toDF("id", "salary")) + }, + condition = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA", + parameters = Map( + "tableName" -> toSQLId(s"${catalogAndNamespace}tbl"), + "colName" -> "`dep`") + ) + } + } + } + } + + test("Insert schema evolution: source missing nested struct field by position fails " + + "when null default disabled") { + val t1 = s"${catalogAndNamespace}tbl" + withTable(t1) { + val targetSchema = StructType(Seq( + StructField("id", IntegerType), + StructField("s", StructType(Seq( + StructField("c1", IntegerType), + StructField("c2", StringType), + StructField("c3", BooleanType)))))) + sql(s"CREATE TABLE $t1 (id int, s struct) USING $v2Format") + val targetData = spark.createDataFrame( + spark.sparkContext.parallelize(Seq(Row(0, Row(1, "a", true)))), targetSchema) + doInsert(t1, targetData) + + val sourceSchema = StructType(Seq( + StructField("id", IntegerType), + StructField("s", StructType(Seq( + StructField("c1", IntegerType), + StructField("c2", StringType)))))) + val sourceData = spark.createDataFrame( + spark.sparkContext.parallelize(Seq(Row(1, Row(10, "b")))), sourceSchema) + withSQLConf(SQLConf.USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES.key -> "false") { + withInsertNestedTypeCoercion { + checkError( + exception = intercept[AnalysisException] { + doInsertWithSchemaEvolution(t1, sourceData) + }, + condition = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA", + parameters = Map( + "tableName" -> toSQLId(s"${catalogAndNamespace}tbl"), + "colName" -> "`s`.`c3`") + ) + } + } + } + } + + test("Insert without evolution: source missing top-level column by position fails") { + val t1 = s"${catalogAndNamespace}tbl" + withTable(t1) { + sql(s"CREATE TABLE $t1 (id int, salary int, dep string) USING $v2Format") + doInsert(t1, Seq((0, 100, "sales")).toDF("id", "salary", "dep")) + checkError( + exception = intercept[AnalysisException] { + doInsert(t1, Seq((1, 200)).toDF("id", "salary")) + }, + condition = "INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS", + parameters = Map( + "tableName" -> toSQLId(s"${catalogAndNamespace}tbl"), + "tableColumns" -> "`id`, `salary`, `dep`", + "dataColumns" -> "`id`, `salary`") + ) + } + } + + test("Insert without evolution: source missing nested struct field by name fails") { + val t1 = s"${catalogAndNamespace}tbl" + withTable(t1) { + sql(s"CREATE TABLE $t1 (id int, s struct) USING $v2Format") + val targetSchema = StructType(Seq( + StructField("id", IntegerType), + StructField("s", StructType(Seq( + StructField("c1", IntegerType), + StructField("c2", StringType), + StructField("c3", BooleanType)))))) + val targetData = spark.createDataFrame( + spark.sparkContext.parallelize(Seq(Row(0, Row(1, "a", true)))), targetSchema) + doInsert(t1, targetData) + + val sourceSchema = StructType(Seq( + StructField("id", IntegerType), + StructField("s", StructType(Seq( + StructField("c1", IntegerType), + StructField("c2", StringType)))))) + val sourceData = spark.createDataFrame( + spark.sparkContext.parallelize(Seq(Row(1, Row(10, "b")))), sourceSchema) + checkError( + exception = intercept[AnalysisException] { + doInsertByName(t1, sourceData) + }, + condition = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA", + parameters = Map( + "tableName" -> toSQLId(s"${catalogAndNamespace}tbl"), + "colName" -> "`s`.`c3`") + ) + } + } + + test("Insert with evolution but without coercion flag:" + + " source missing nested struct field by name fails") { + val t1 = s"${catalogAndNamespace}tbl" + withTable(t1) { + sql(s"CREATE TABLE $t1 (id int, s struct) USING $v2Format") + val targetSchema = StructType(Seq( + StructField("id", IntegerType), + StructField("s", StructType(Seq( + StructField("c1", IntegerType), + StructField("c2", StringType), + StructField("c3", BooleanType)))))) + val targetData = spark.createDataFrame( + spark.sparkContext.parallelize(Seq(Row(0, Row(1, "a", true)))), targetSchema) + doInsert(t1, targetData) + + val sourceSchema = StructType(Seq( + StructField("id", IntegerType), + StructField("s", StructType(Seq( + StructField("c1", IntegerType), + StructField("c2", StringType)))))) + val sourceData = spark.createDataFrame( + spark.sparkContext.parallelize(Seq(Row(1, Row(10, "b")))), sourceSchema) + checkError( + exception = intercept[AnalysisException] { + doInsertWithSchemaEvolution(t1, sourceData, byName = true) + }, + condition = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA", + parameters = Map( + "tableName" -> toSQLId(s"${catalogAndNamespace}tbl"), + "colName" -> "`s`.`c3`") + ) + } + } + + test("Insert without evolution: source missing nested struct field by position fails") { + val t1 = s"${catalogAndNamespace}tbl" + withTable(t1) { + val targetSchema = StructType(Seq( + StructField("id", IntegerType), + StructField("s", StructType(Seq( + StructField("c1", IntegerType), + StructField("c2", StringType), + StructField("c3", BooleanType)))))) + sql(s"CREATE TABLE $t1 (id int, s struct) USING $v2Format") + val targetData = spark.createDataFrame( + spark.sparkContext.parallelize(Seq(Row(0, Row(1, "a", true)))), targetSchema) + doInsert(t1, targetData) + + val sourceSchema = StructType(Seq( + StructField("id", IntegerType), + StructField("s", StructType(Seq( + StructField("c1", IntegerType), + StructField("c2", StringType)))))) + val sourceData = spark.createDataFrame( + spark.sparkContext.parallelize(Seq(Row(1, Row(10, "b")))), sourceSchema) + checkError( + exception = intercept[AnalysisException] { + doInsert(t1, sourceData) + }, + condition = "INCOMPATIBLE_DATA_FOR_TABLE.STRUCT_MISSING_FIELDS", + parameters = Map( + "tableName" -> toSQLId(s"${catalogAndNamespace}tbl"), + "colName" -> "`s`", + "missingFields" -> "`c3`") + ) + } + } } diff --git a/sql/hive/src/test/resources/conf/binding-policy-exceptions/configs-without-binding-policy-exceptions b/sql/hive/src/test/resources/conf/binding-policy-exceptions/configs-without-binding-policy-exceptions index 2aa6cb885ca31..36fda2b506886 100644 --- a/sql/hive/src/test/resources/conf/binding-policy-exceptions/configs-without-binding-policy-exceptions +++ b/sql/hive/src/test/resources/conf/binding-policy-exceptions/configs-without-binding-policy-exceptions @@ -659,6 +659,7 @@ spark.sql.inMemoryColumnarStorage.hugeVectorReserveRatio spark.sql.inMemoryColumnarStorage.hugeVectorThreshold spark.sql.inMemoryColumnarStorage.partitionPruning spark.sql.inMemoryTableScanStatistics.enable +spark.sql.insertNestedTypeCoercion.enabled spark.sql.join.preferSortMergeJoin spark.sql.json.enableExactStringParsing spark.sql.json.enablePartialResults