From 1a82d94f5ef75deb157ba3f848749a62bbf04725 Mon Sep 17 00:00:00 2001 From: Anton Okolnychyi Date: Thu, 18 Jun 2026 15:09:41 -0700 Subject: [PATCH 1/3] [SPARK-57544][SQL] Rework column ID validation for nested fields in DSv2 --- .../resources/error/error-conditions.json | 6 - .../catalyst/util/FieldMetadataUtils.scala | 23 ++ .../apache/spark/sql/types/StructField.scala | 38 +++ .../spark/sql/connector/catalog/Column.java | 206 +++++++++------ .../catalyst/analysis/V2TableReference.scala | 12 +- .../plans/logical/ColumnDefinition.scala | 5 +- .../sql/connector/catalog/CatalogV2Util.scala | 101 +++++--- .../sql/connector/catalog/V2TableUtil.scala | 67 +---- .../sql/errors/QueryCompilationErrors.scala | 10 - .../sql/internal/connector/ColumnImpl.scala | 36 +-- .../apache/spark/sql/util/SchemaUtils.scala | 47 +++- .../sql/connector/catalog/CatalogSuite.scala | 94 +++---- .../sql/connector/catalog/ColumnSuite.scala | 236 ++++++++++++++++++ .../connector/catalog/InMemoryBaseTable.scala | 81 +++--- ...nMemoryRowLevelOperationTableCatalog.scala | 3 +- .../catalog/InMemoryTableCatalog.scala | 3 +- .../NullColumnIdInMemoryTableCatalog.scala | 4 +- .../NullTableIdInMemoryTableCatalog.scala | 2 +- .../TypeChangeResetsColIdTableCatalog.scala | 5 +- .../connector/catalog/V2TableUtilSuite.scala | 147 +++++++++-- .../spark/sql/connector/catalog/txns.scala | 14 +- .../datasources/v2/CreateTableLikeExec.scala | 17 +- .../datasources/v2/V2TableRefreshUtil.scala | 10 - .../v2/WriteToDataSourceV2Exec.scala | 2 +- .../DataSourceV2DataFrameSuite.scala | 181 ++++---------- .../DataSourceV2ExtSessionColumnIdSuite.scala | 6 +- .../sql/connector/DataSourceV2SQLSuite.scala | 78 +++++- .../connector/MergeIntoDataFrameSuite.scala | 2 +- 28 files changed, 931 insertions(+), 505 deletions(-) create mode 100644 sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/FieldMetadataUtils.scala create mode 100644 sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/ColumnSuite.scala 
diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json index 8e7125a07c1e8..7aacaf73185e0 100644 --- a/common/utils/src/main/resources/error/error-conditions.json +++ b/common/utils/src/main/resources/error/error-conditions.json @@ -3009,12 +3009,6 @@ "" ] }, - "COLUMN_ID_MISMATCH" : { - "message" : [ - "Column IDs have changed:", - "" - ] - }, "METADATA_COLUMNS_MISMATCH" : { "message" : [ "Metadata columns have changed:", diff --git a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/FieldMetadataUtils.scala b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/FieldMetadataUtils.scala new file mode 100644 index 0000000000000..c3ac1db9dc587 --- /dev/null +++ b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/FieldMetadataUtils.scala @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.catalyst.util + +object FieldMetadataUtils { + // Metadata key for the field ID used to track column identity across schema evolution + val FIELD_ID_METADATA_KEY = "FIELD_ID" +} diff --git a/sql/api/src/main/scala/org/apache/spark/sql/types/StructField.scala b/sql/api/src/main/scala/org/apache/spark/sql/types/StructField.scala index eb3d30051880a..5bb12cf80e45c 100644 --- a/sql/api/src/main/scala/org/apache/spark/sql/types/StructField.scala +++ b/sql/api/src/main/scala/org/apache/spark/sql/types/StructField.scala @@ -26,6 +26,7 @@ import org.json4s.JsonDSL._ import org.apache.spark.SparkException import org.apache.spark.annotation.Stable import org.apache.spark.sql.catalyst.util.{CollationFactory, QuotingUtils, StringConcat} +import org.apache.spark.sql.catalyst.util.FieldMetadataUtils.FIELD_ID_METADATA_KEY import org.apache.spark.sql.catalyst.util.ResolveDefaultColumnsUtils.{CURRENT_DEFAULT_COLUMN_METADATA_KEY, EXISTS_DEFAULT_COLUMN_METADATA_KEY} import org.apache.spark.util.SparkSchemaUtils @@ -243,6 +244,43 @@ case class StructField( metadata.contains(EXISTS_DEFAULT_COLUMN_METADATA_KEY) } + /** + * Updates the field with an ID for column identity tracking. + */ + def withId(id: String): StructField = { + val newMetadata = new MetadataBuilder() + .withMetadata(metadata) + .putString(FIELD_ID_METADATA_KEY, id) + .build() + copy(metadata = newMetadata) + } + + /** + * Returns the ID of this field, if set. + */ + def id: Option[String] = { + if (metadata.contains(FIELD_ID_METADATA_KEY)) { + Some(metadata.getString(FIELD_ID_METADATA_KEY)) + } else { + None + } + } + + /** + * Returns a copy of this field with the field ID removed, or this field if no ID is set. 
+ */ + def clearId(): StructField = { + if (metadata.contains(FIELD_ID_METADATA_KEY)) { + val newMetadata = new MetadataBuilder() + .withMetadata(metadata) + .remove(FIELD_ID_METADATA_KEY) + .build() + copy(metadata = newMetadata) + } else { + this + } + } + private def getDDLDefault = getDefault() .orElse(getCurrentDefaultValue()) .map(" DEFAULT " + _) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/Column.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/Column.java index 03c0409f39b46..3557779c06b08 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/Column.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/Column.java @@ -18,12 +18,16 @@ package org.apache.spark.sql.connector.catalog; import java.util.Map; +import java.util.Objects; +import java.util.stream.Stream; import javax.annotation.Nullable; +import org.apache.spark.SparkIllegalArgumentException; import org.apache.spark.annotation.Evolving; import org.apache.spark.sql.connector.expressions.Transform; import org.apache.spark.sql.internal.connector.ColumnImpl; import org.apache.spark.sql.types.DataType; +import org.apache.spark.sql.types.StructField; /** * An interface representing a column of a {@link Table}. 
It defines basic properties of a column, @@ -40,11 +44,11 @@ public interface Column { static Column create(String name, DataType dataType) { - return create(name, dataType, true); + return newBuilder(name, dataType).build(); } static Column create(String name, DataType dataType, boolean nullable) { - return create(name, dataType, nullable, null, null); + return newBuilder(name, dataType).nullable(nullable).build(); } static Column create( @@ -53,16 +57,11 @@ static Column create( boolean nullable, String comment, String metadataInJSON) { - return new ColumnImpl( - name, - dataType, - nullable, - comment, - /* defaultValue = */ null, - /* generationExpression = */ null, - /* identityColumnSpec = */ null, - metadataInJSON, - /* id = */ null); + return newBuilder(name, dataType) + .nullable(nullable) + .comment(comment) + .metadataInJSON(metadataInJSON) + .build(); } static Column create( @@ -72,26 +71,14 @@ static Column create( String comment, ColumnDefaultValue defaultValue, String metadataInJSON) { - return new ColumnImpl( - name, - dataType, - nullable, - comment, - defaultValue, - /* generationExpression = */ null, - /* identityColumnSpec = */ null, - metadataInJSON, - /* id = */ null); + return newBuilder(name, dataType) + .nullable(nullable) + .comment(comment) + .defaultValue(defaultValue) + .metadataInJSON(metadataInJSON) + .build(); } - /** - * Creates a column with a generation expression in SQL string form. - * - * @since 4.3.0 - * @deprecated Use - * {@link #create(String, DataType, boolean, String, GenerationExpression, String)} instead. - */ - @Deprecated static Column create( String name, DataType dataType, @@ -99,61 +86,39 @@ static Column create( String comment, String generationExpression, String metadataInJSON) { - GenerationExpression genExpr = generationExpression != null - ? 
new GenerationExpression(generationExpression) : null; - return new ColumnImpl( - name, - dataType, - nullable, - comment, - /* defaultValue = */ null, - genExpr, - /* identityColumnSpec = */ null, - metadataInJSON, - /* id = */ null); + return newBuilder(name, dataType) + .nullable(nullable) + .comment(comment) + .generationExpression(generationExpression) + .metadataInJSON(metadataInJSON) + .build(); } - /** - * Creates a column with a generation expression object. - * - * @since 4.3.0 - */ static Column create( String name, DataType dataType, boolean nullable, String comment, - GenerationExpression generationExpression, + IdentityColumnSpec identityColumnSpec, String metadataInJSON) { - return new ColumnImpl( - name, - dataType, - nullable, - comment, - /* defaultValue = */ null, - generationExpression, - /* identityColumnSpec = */ null, - metadataInJSON, - /* id = */ null); + return newBuilder(name, dataType) + .nullable(nullable) + .comment(comment) + .identityColumnSpec(identityColumnSpec) + .metadataInJSON(metadataInJSON) + .build(); } - static Column create( - String name, - DataType dataType, - boolean nullable, - String comment, - IdentityColumnSpec identityColumnSpec, - String metadataInJSON) { - return new ColumnImpl( - name, - dataType, - nullable, - comment, - /* defaultValue = */ null, - /* generationExpression = */ null, - identityColumnSpec, - metadataInJSON, - /* id = */ null); + /** + * Creates a builder for a column. + * + * @param name the name of the column + * @param dataType the data type of the column + * @return a new builder + * @since 4.2.0 + */ + static Builder newBuilder(String name, DataType dataType) { + return new Builder(name, dataType); } /** @@ -243,12 +208,97 @@ default GenerationExpression columnGenerationExpression() { * others. *

* This API covers top-level columns only. Nested struct fields, array elements, and map - * keys/values do not have separate IDs. Connectors that track nested field IDs can encode - * them into the returned top-level Column ID string to detect nested changes, since Spark - * only compares string equality. + * keys/values carry their own IDs in struct field metadata. Spark validates both top-level and + * nested field IDs as part of schema compatibility checks. See {@link StructField#id()}. */ @Nullable default String id() { return null; } + + /** + * A builder for {@link Column}. + * + * @since 4.2.0 + */ + class Builder { + private final String name; + private final DataType dataType; + private boolean nullable = true; + private String comment = null; + private ColumnDefaultValue defaultValue = null; + private GenerationExpression genExpr = null; + private IdentityColumnSpec identityColumnSpec = null; + private String metadataInJSON = null; + private String id = null; + + private Builder(String name, DataType dataType) { + this.name = Objects.requireNonNull(name, "name must not be null"); + this.dataType = Objects.requireNonNull(dataType, "dataType must not be null"); + } + + public Builder nullable(boolean nullable) { + this.nullable = nullable; + return this; + } + + public Builder comment(String comment) { + this.comment = comment; + return this; + } + + public Builder defaultValue(ColumnDefaultValue defaultValue) { + this.defaultValue = defaultValue; + return this; + } + + public Builder generationExpression(String sql) { + this.genExpr = sql != null ? 
new GenerationExpression(sql) : null; + return this; + } + + public Builder generationExpression(GenerationExpression generationExpr) { + this.genExpr = generationExpr; + return this; + } + + public Builder identityColumnSpec(IdentityColumnSpec identityColumnSpec) { + this.identityColumnSpec = identityColumnSpec; + return this; + } + + public Builder metadataInJSON(String metadataInJSON) { + this.metadataInJSON = metadataInJSON; + return this; + } + + public Builder id(String id) { + this.id = id; + return this; + } + + public Column build() { + validateState(); + return new ColumnImpl( + name, dataType, nullable, comment, defaultValue, + genExpr, identityColumnSpec, metadataInJSON, id); + } + + private void validateState() { + if (hasConflictingDefinitions()) { + throw new SparkIllegalArgumentException( + "INTERNAL_ERROR", + Map.of("message", + "Column '" + name + "' cannot have more than one definition of: " + + "default value, generation expression, identity column spec")); + } + } + + private boolean hasConflictingDefinitions() { + long definitionCount = Stream.of(defaultValue, genExpr, identityColumnSpec) + .filter(Objects::nonNull) + .count(); + return definitionCount > 1; + } + } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/V2TableReference.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/V2TableReference.scala index 223e7012af6b6..bfe1569962fec 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/V2TableReference.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/V2TableReference.scala @@ -151,15 +151,6 @@ private[sql] object V2TableReferenceUtils extends SQLConfHelper { // Make sure the table was not dropped and recreated. ref.info.tableId.foreach(V2TableUtil.validateTableId(ref.name, _, table)) - // Detect columns that were dropped and re-added with the same name but a different - // column ID. 
This catches replacements that preserve the schema but change identity. - val colIdErrors = V2TableUtil.validateColumnIds( - table = table, - originalCapturedCols = ref.info.columns) - if (colIdErrors.nonEmpty) { - throw QueryCompilationErrors.columnIdMismatchAfterAnalysis(ref.name, colIdErrors) - } - // Do not allow schema evolution to pre-analysed dataframes that are later used in // transactional writes. This is because the entire plans was built based on the original schema // and any schema change would make the plan structurally invalid. This is inline with the @@ -187,7 +178,8 @@ private[sql] object V2TableReferenceUtils extends SQLConfHelper { val dataErrors = V2TableUtil.validateCapturedColumns( table, ref.info.columns, - mode = ALLOW_NEW_TOP_LEVEL_FIELDS) + mode = ALLOW_NEW_TOP_LEVEL_FIELDS, + checkFieldIds = false) if (dataErrors.nonEmpty) { throw QueryCompilationErrors.columnsChangedAfterViewWithPlanCreation( ctx.viewName, diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ColumnDefinition.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ColumnDefinition.scala index 8f3cd274e93de..10a0fe21cb294 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ColumnDefinition.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ColumnDefinition.scala @@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.expressions.{AnalysisAwareExpression, Expre import org.apache.spark.sql.catalyst.parser.ParserInterface import org.apache.spark.sql.catalyst.trees.TreePattern.{ANALYSIS_AWARE_EXPRESSION, TreePattern} import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, GeneratedColumn, IdentityColumn, V2ExpressionBuilder} +import org.apache.spark.sql.catalyst.util.FieldMetadataUtils.FIELD_ID_METADATA_KEY import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns.validateDefaultValueExpr import 
org.apache.spark.sql.catalyst.util.ResolveDefaultColumnsUtils.{CURRENT_DEFAULT_COLUMN_METADATA_KEY, EXISTS_DEFAULT_COLUMN_METADATA_KEY} import org.apache.spark.sql.connector.catalog.{Column => V2Column, ColumnDefaultValue, DefaultValue, IdentityColumnSpec} @@ -31,6 +32,7 @@ import org.apache.spark.sql.connector.expressions.LiteralValue import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.internal.connector.ColumnImpl import org.apache.spark.sql.types.{DataType, Metadata, MetadataBuilder, StructField} +import org.apache.spark.sql.util.SchemaUtils /** * User-specified column definition for CREATE/REPLACE TABLE commands. This is an expression so that @@ -141,6 +143,7 @@ object ColumnDefinition { def fromV1Column(col: StructField, parser: ParserInterface): ColumnDefinition = { val metadataBuilder = new MetadataBuilder().withMetadata(col.metadata) metadataBuilder.remove("comment") + metadataBuilder.remove(FIELD_ID_METADATA_KEY) metadataBuilder.remove(CURRENT_DEFAULT_COLUMN_METADATA_KEY) metadataBuilder.remove(EXISTS_DEFAULT_COLUMN_METADATA_KEY) metadataBuilder.remove(GeneratedColumn.GENERATION_EXPRESSION_METADATA_KEY) @@ -173,7 +176,7 @@ object ColumnDefinition { } ColumnDefinition( col.name, - col.dataType, + SchemaUtils.removeFieldIds(col.dataType), col.nullable, col.getComment(), defaultValue, diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala index e34a21eeec762..2533dab1796d5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala @@ -30,6 +30,7 @@ import org.apache.spark.sql.catalyst.catalog.ClusterBySpec import org.apache.spark.sql.catalyst.expressions.{Expression, Literal, V2ExpressionUtils} import org.apache.spark.sql.catalyst.plans.logical.{SerdeInfo, 
TableSpec} import org.apache.spark.sql.catalyst.util.{GeneratedColumn, IdentityColumn} +import org.apache.spark.sql.catalyst.util.FieldMetadataUtils.FIELD_ID_METADATA_KEY import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._ import org.apache.spark.sql.connector.catalog.TableChange._ import org.apache.spark.sql.connector.catalog.constraints.Constraint @@ -38,7 +39,7 @@ import org.apache.spark.sql.connector.expressions.{ClusterByTransform, LiteralVa import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{ArrayType, MapType, Metadata, MetadataBuilder, StructField, StructType} -import org.apache.spark.sql.util.CaseInsensitiveStringMap +import org.apache.spark.sql.util.{CaseInsensitiveStringMap, SchemaUtils} import org.apache.spark.util.ArrayImplicits._ private[sql] object CatalogV2Util { @@ -78,6 +79,24 @@ private[sql] object CatalogV2Util { SupportsNamespaces.PROP_LOCATION, SupportsNamespaces.PROP_OWNER) + val REMOVED_DEFAULT_VALUE_METADATA_KEYS: Seq[String] = Seq( + "comment", + FIELD_ID_METADATA_KEY, + CURRENT_DEFAULT_COLUMN_METADATA_KEY, + EXISTS_DEFAULT_COLUMN_METADATA_KEY) + + val REMOVED_GENERATED_COL_METADATA_KEYS: Seq[String] = Seq( + "comment", + FIELD_ID_METADATA_KEY, + GeneratedColumn.GENERATION_EXPRESSION_METADATA_KEY) + + val REMOVED_IDENTITY_COL_METADATA_KEYS: Seq[String] = Seq( + "comment", + FIELD_ID_METADATA_KEY, + IdentityColumn.IDENTITY_INFO_START, + IdentityColumn.IDENTITY_INFO_STEP, + IdentityColumn.IDENTITY_INFO_ALLOW_EXPLICIT_INSERT) + /** * Apply properties changes to a map and return the result. */ @@ -636,9 +655,9 @@ private[sql] object CatalogV2Util { } /** - * Converts DS v2 columns to StructType, which encodes column comment and default value to - * StructField metadata. This is mainly used to define the schema of v2 scan, w.r.t. the columns - * of the v2 table. 
+ * Converts DS v2 columns to StructType, which encodes column comment, default value, and + * column ID to StructField metadata. This is mainly used to define the schema of v2 scan, + * w.r.t. the columns of the v2 table. */ def v2ColumnsToStructType(columns: Seq[Column]): StructType = { StructType(columns.map(v2ColumnToStructField)) @@ -653,6 +672,9 @@ private[sql] object CatalogV2Util { Option(col.defaultValue()).foreach { default => f = encodeDefaultValue(default, f) } + Option(col.id()).foreach { id => + f = f.withId(id) + } f } @@ -699,15 +721,16 @@ private[sql] object CatalogV2Util { /** * Converts a StructType to DS v2 columns, which decodes the StructField metadata to v2 column - * comment and default value or generation expression. This is mainly used to generate DS v2 - * columns from table schema in DDL commands, so that Spark can pass DS v2 columns to DS v2 - * createTable and related APIs. + * comment, default value or generation expression, and column ID. This is mainly used to + * generate DS v2 columns from table schema in DDL commands, so that Spark can pass DS v2 + * columns to DS v2 createTable and related APIs. 
*/ - def structTypeToV2Columns(schema: StructType): Array[Column] = { - schema.fields.map(structFieldToV2Column) - } + def structTypeToV2Columns( + schema: StructType, + keepFieldIds: Boolean = true): Array[Column] = + schema.fields.map(structFieldToV2Column(_, keepFieldIds)) - private def structFieldToV2Column(f: StructField): Column = { + private def structFieldToV2Column(f: StructField, keepFieldIds: Boolean): Column = { def metadataAsJson(metadata: Metadata): String = { if (metadata == Metadata.empty) { null @@ -721,6 +744,10 @@ private[sql] object CatalogV2Util { }.build() } + val id = if (keepFieldIds) f.id.orNull else null + val dataType = if (keepFieldIds) f.dataType else SchemaUtils.removeFieldIds(f.dataType) + val comment = f.getComment().orNull + val isDefaultColumn = f.getCurrentDefaultValue().isDefined val isGeneratedColumn = GeneratedColumn.isGeneratedColumn(f) val isIdentityColumn = IdentityColumn.isIdentityColumn(f) @@ -736,33 +763,45 @@ private[sql] object CatalogV2Util { assert(e.resolved && e.foldable, "The existence default value must be a simple SQL string that is resolved and " + "foldable, but got: " + f.getExistenceDefaultValue().get) - LiteralValue(e.eval(), f.dataType) + LiteralValue(e.eval(), dataType) } else { null } val defaultValue = new ColumnDefaultValue(f.getCurrentDefaultValue().get, existsDefault) - val cleanedMetadata = metadataWithKeysRemoved( - Seq("comment", CURRENT_DEFAULT_COLUMN_METADATA_KEY, EXISTS_DEFAULT_COLUMN_METADATA_KEY)) - Column.create(f.name, f.dataType, f.nullable, f.getComment().orNull, defaultValue, - metadataAsJson(cleanedMetadata)) + val cleanedMetadata = metadataWithKeysRemoved(REMOVED_DEFAULT_VALUE_METADATA_KEYS) + Column.newBuilder(f.name, dataType) + .nullable(f.nullable) + .comment(comment) + .defaultValue(defaultValue) + .metadataInJSON(metadataAsJson(cleanedMetadata)) + .id(id) + .build() } else if (isGeneratedColumn) { - val cleanedMetadata = metadataWithKeysRemoved( - Seq("comment", 
GeneratedColumn.GENERATION_EXPRESSION_METADATA_KEY)) - Column.create(f.name, f.dataType, f.nullable, f.getComment().orNull, - new GenerationExpression(GeneratedColumn.getGenerationExpression(f).get), - metadataAsJson(cleanedMetadata)) + val cleanedMetadata = metadataWithKeysRemoved(REMOVED_GENERATED_COL_METADATA_KEYS) + Column.newBuilder(f.name, dataType) + .nullable(f.nullable) + .comment(comment) + .generationExpression(GeneratedColumn.getGenerationExpression(f).get) + .metadataInJSON(metadataAsJson(cleanedMetadata)) + .id(id) + .build() } else if (isIdentityColumn) { - val cleanedMetadata = metadataWithKeysRemoved( - Seq("comment", - IdentityColumn.IDENTITY_INFO_START, - IdentityColumn.IDENTITY_INFO_STEP, - IdentityColumn.IDENTITY_INFO_ALLOW_EXPLICIT_INSERT)) - Column.create(f.name, f.dataType, f.nullable, f.getComment().orNull, - IdentityColumn.getIdentityInfo(f).get, metadataAsJson(cleanedMetadata)) + val cleanedMetadata = metadataWithKeysRemoved(REMOVED_IDENTITY_COL_METADATA_KEYS) + Column.newBuilder(f.name, dataType) + .nullable(f.nullable) + .comment(comment) + .identityColumnSpec(IdentityColumn.getIdentityInfo(f).get) + .metadataInJSON(metadataAsJson(cleanedMetadata)) + .id(id) + .build() } else { - val cleanedMetadata = metadataWithKeysRemoved(Seq("comment")) - Column.create(f.name, f.dataType, f.nullable, f.getComment().orNull, - metadataAsJson(cleanedMetadata)) + val cleanedMetadata = metadataWithKeysRemoved(Seq("comment", FIELD_ID_METADATA_KEY)) + Column.newBuilder(f.name, dataType) + .nullable(f.nullable) + .comment(comment) + .metadataInJSON(metadataAsJson(cleanedMetadata)) + .id(id) + .build() } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/V2TableUtil.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/V2TableUtil.scala index 348d7e96e7d46..6ec3d35183e4f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/V2TableUtil.scala +++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/V2TableUtil.scala @@ -19,8 +19,6 @@ package org.apache.spark.sql.connector.catalog import java.util.Locale -import scala.collection.mutable - import org.apache.spark.sql.catalyst.SQLConfHelper import org.apache.spark.sql.catalyst.analysis.Resolver import org.apache.spark.sql.catalyst.util.{quoteIfNeeded, MetadataColumnHelper} @@ -69,10 +67,11 @@ private[sql] object V2TableUtil extends SQLConfHelper { def validateCapturedColumns( table: Table, originCols: Seq[Column], - mode: SchemaValidationMode = PROHIBIT_CHANGES): Seq[String] = { + mode: SchemaValidationMode = PROHIBIT_CHANGES, + checkFieldIds: Boolean = true): Seq[String] = { val originSchema = CatalogV2Util.v2ColumnsToStructType(originCols) val schema = CatalogV2Util.v2ColumnsToStructType(table.columns) - SchemaUtils.validateSchemaCompatibility(originSchema, schema, resolver, mode) + SchemaUtils.validateSchemaCompatibility(originSchema, schema, resolver, mode, checkFieldIds) } /** @@ -124,66 +123,6 @@ private[sql] object V2TableUtil extends SQLConfHelper { SchemaUtils.validateSchemaCompatibility(originMetaSchema, metaSchema, resolver, mode) } - /** - * Validates that column IDs have not changed for columns that still exist in the table. - * - * Only validates columns where the original and current column both have non-null IDs. - * If the connector does not support column IDs (returns null), the validation is skipped. - * - * @param table the current table metadata - * @param relation the relation with captured columns - * @return validation errors, or empty sequence if valid - */ - def validateColumnIds( - table: Table, - relation: DataSourceV2Relation): Seq[String] = { - validateColumnIds( - table = table, - originalCapturedCols = relation.table.columns.toImmutableArraySeq) - } - - /** - * Validates that column IDs have not changed for columns that still exist in the table. 
- * - * Only validates columns where the original and current column both have non-null IDs. - * If the connector does not support column IDs (returns null), the validation is skipped. - * - * ID transition handling: - * - null to null: skipped (no ID to validate) - * - null to ID: skipped (connector enabled ID tracking after analysis) - * - ID to null: skipped (connector disabled ID tracking) - * - ID to ID (same): no error - * - ID to ID (different): error, same column name was replaced - * - * @param table the current table metadata - * @param originalCapturedCols the originally captured columns - * @return validation errors, or empty sequence if valid - */ - def validateColumnIds( - table: Table, - originalCapturedCols: Seq[Column]): Seq[String] = { - val currentColsByNormalizedName = table.columns.toImmutableArraySeq - .map(currentCol => normalize(currentCol.name()) -> currentCol).toMap - val errors = new mutable.ArrayBuffer[String]() - for (originalCol <- originalCapturedCols) { - if (originalCol.id() != null) { - currentColsByNormalizedName.get(normalize(originalCol.name())) match { - case Some(currentCol) - if currentCol.id() != null && currentCol.id() != originalCol.id() => - errors += s"`${originalCol.name()}` column ID has changed from " + - s"${originalCol.id()} to ${currentCol.id()}" - case _ => - // 1. Column exists in the original schema but not in the current table. - // 2. Column IDs have not changed. - // 3. The current column's ID is null (connector disabled ID tracking). - // Note that dropped columns are handled separately by - // [[columnsMissingOrAddedAfterAnalysis]]. 
- } - } - } - errors.toSeq - } - private def filter(colNames: Seq[String], cols: Seq[MetadataColumn]): Seq[MetadataColumn] = { val normalizedColNames = colNames.map(normalize).toSet cols.filter(col => normalizedColNames.contains(normalize(col.name))) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala index 7f76f48cc7eff..1cb2fd97edab1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala @@ -2315,16 +2315,6 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat "currentTableId" -> currentTableId)) } - def columnIdMismatchAfterAnalysis( - tableName: String, - errors: Seq[String]): Throwable = { - new AnalysisException( - errorClass = "INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.COLUMN_ID_MISMATCH", - messageParameters = Map( - "tableName" -> toSQLId(tableName), - "errors" -> errors.mkString("- ", "\n- ", ""))) - } - def columnsMissingOrAddedAfterAnalysis( tableName: String, errors: Seq[String]): Throwable = { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/ColumnImpl.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/ColumnImpl.scala index 2e5cd4b8b0a16..a8c98dda29f62 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/ColumnImpl.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/ColumnImpl.scala @@ -17,8 +17,6 @@ package org.apache.spark.sql.internal.connector -import java.util.Objects - import org.apache.spark.sql.connector.catalog.{Column, ColumnDefaultValue, GenerationExpression, IdentityColumnSpec} import org.apache.spark.sql.types.DataType @@ -32,36 +30,4 @@ case class ColumnImpl( override val columnGenerationExpression: GenerationExpression, 
identityColumnSpec: IdentityColumnSpec, metadataInJSON: String, - override val id: String = null) extends Column { - - // [[id]] is excluded from [[equals]] and [[hashCode]] because IDs only live on [[Column]], - // not on [[StructField]] metadata. Any code path that round-trips through [[StructType]] - // (e.g. [[CatalogV2Util.v2ColumnsToStructType]] followed by [[structTypeToV2Columns]]) - // drops the ID, producing a [[Column]] with id=null for the same logical column. Including - // [[id]] in equality would cause spurious mismatches across these round-trips. - // Column ID validation is performed separately by [[V2TableUtil.validateColumnIds]]. - override def equals(other: Any): Boolean = other match { - case that: ColumnImpl => - name == that.name && - dataType == that.dataType && - nullable == that.nullable && - comment == that.comment && - defaultValue == that.defaultValue && - columnGenerationExpression == that.columnGenerationExpression && - identityColumnSpec == that.identityColumnSpec && - metadataInJSON == that.metadataInJSON - case _ => false - } - - override def hashCode(): Int = { - Objects.hash( - name, - dataType, - Boolean.box(nullable), - comment, - defaultValue, - columnGenerationExpression, - identityColumnSpec, - metadataInJSON) - } -} + override val id: String = null) extends Column diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/util/SchemaUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/util/SchemaUtils.scala index 58ababa04739f..8ae2d2d3043ff 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/util/SchemaUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/util/SchemaUtils.scala @@ -406,7 +406,8 @@ private[spark] object SchemaUtils { schema: StructType, otherSchema: StructType, resolver: Resolver, - mode: SchemaValidationMode): Seq[String] = { + mode: SchemaValidationMode, + checkFieldIds: Boolean = true): Seq[String] = { checkSchemaColumnNameDuplication(schema, resolver) 
checkSchemaColumnNameDuplication(otherSchema, resolver) val errors = mutable.ArrayBuffer[String]() @@ -418,6 +419,7 @@ private[spark] object SchemaUtils { colPath = Seq.empty, resolver, mode, + checkFieldIds, errors) errors.toSeq } @@ -430,6 +432,7 @@ private[spark] object SchemaUtils { colPath: Seq[String], resolver: Resolver, mode: SchemaValidationMode, + checkFieldIds: Boolean, errors: mutable.ArrayBuffer[String]): Unit = { if (nullable && !otherNullable) { errors += s"${colPath.fullyQuoted} is no longer nullable" @@ -445,6 +448,11 @@ private[spark] object SchemaUtils { fieldsByName.foreach { case (normalizedName, field) => otherFieldsByName.get(normalizedName) match { case Some(otherField) => + if (checkFieldIds) { + for (id <- field.id; otherId <- otherField.id if id != otherId) { + errors += s"${colPath.fullyQuoted} field ID has changed from $id to $otherId" + } + } validateTypeCompatibility( field.dataType, otherField.dataType, @@ -453,6 +461,7 @@ private[spark] object SchemaUtils { colPath :+ field.name, resolver, mode, + checkFieldIds, errors) case None => errors += s"${formatField(colPath, field)} has been removed" @@ -476,6 +485,7 @@ private[spark] object SchemaUtils { colPath :+ "element", resolver, mode, + checkFieldIds, errors) case (MapType(keyType, valueType, valueContainsNull), @@ -488,6 +498,7 @@ private[spark] object SchemaUtils { colPath :+ "key", resolver, mode, + checkFieldIds, errors) validateTypeCompatibility( valueType, @@ -497,6 +508,7 @@ private[spark] object SchemaUtils { colPath :+ "value", resolver, mode, + checkFieldIds, errors) case _ if dataType != otherDataType => @@ -522,6 +534,39 @@ private[spark] object SchemaUtils { fields.map(field => field.name.toLowerCase(Locale.ROOT) -> field).toMap } } + + /** + * Recursively removes field IDs from a data type. Only allocates new objects when a field ID + * is actually present, returning the original instance unchanged when there is nothing to strip. 
+ */ + def removeFieldIds(dataType: DataType): DataType = dataType match { + case s: StructType => + StructType(s.fields.map { field => + val fieldWithoutId = field.clearId() + val newDataType = removeFieldIds(field.dataType) + if (newDataType ne field.dataType) { + fieldWithoutId.copy(dataType = newDataType) + } else { + fieldWithoutId + } + }) + + case a: ArrayType => + val newElementType = removeFieldIds(a.elementType) + if (newElementType ne a.elementType) a.copy(elementType = newElementType) else a + + case m: MapType => + val newKeyType = removeFieldIds(m.keyType) + val newValueType = removeFieldIds(m.valueType) + if ((newKeyType ne m.keyType) || (newValueType ne m.valueType)) { + m.copy(keyType = newKeyType, valueType = newValueType) + } else { + m + } + + case other => + other + } } private[spark] sealed trait SchemaValidationMode diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogSuite.scala index ad9c2655023fc..0d5bc6364e1bc 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogSuite.scala @@ -32,8 +32,9 @@ import org.apache.spark.sql.connector.catalog.constraints.Constraint import org.apache.spark.sql.connector.catalog.functions.{BoundFunction, ScalarFunction, UnboundFunction} import org.apache.spark.sql.connector.expressions.{Expressions, FieldReference, LogicalExpressions, Transform} import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.connector.ColumnImpl import org.apache.spark.sql.types.{DataType, DoubleType, IntegerType, LongType, StringType, StructType, TimestampType} -import org.apache.spark.sql.util.CaseInsensitiveStringMap +import org.apache.spark.sql.util.{CaseInsensitiveStringMap, SchemaUtils} class CatalogSuite extends SparkFunSuite { import CatalogV2Implicits._ @@ 
-50,6 +51,13 @@ class CatalogSuite extends SparkFunSuite { newCatalog } + private def stripIds(cols: Array[Column]): Array[Column] = { + cols.map { col => + val impl = col.asInstanceOf[ColumnImpl] + impl.copy(id = null, dataType = SchemaUtils.removeFieldIds(impl.dataType)) + } + } + private val testNs = Array("`", ".") private val testIdent = Identifier.of(testNs, "test_table") private val testIdentQuoted = testIdent.asMultipartIdentifier @@ -151,7 +159,7 @@ class CatalogSuite extends SparkFunSuite { val parsed = CatalystSqlParser.parseMultipartIdentifier(table.name) assert(parsed == Seq("test", "`", ".", "test_table")) - assert(table.columns === columns) + assert(stripIds(table.columns) === columns) assert(table.properties.asScala == Map()) assert(catalog.tableExists(testIdent)) @@ -175,7 +183,7 @@ class CatalogSuite extends SparkFunSuite { val parsed = CatalystSqlParser.parseMultipartIdentifier(table.name) assert(parsed == Seq("test", "`", ".", "test_table")) - assert(table.columns === columns) + assert(stripIds(table.columns) === columns) assert(table.properties.asScala == Map()) assert(partCatalog.tableExists(testIdent)) @@ -198,7 +206,7 @@ class CatalogSuite extends SparkFunSuite { val parsed = CatalystSqlParser.parseMultipartIdentifier(table.name) assert(parsed == Seq("test", "`", ".", "test_table")) - assert(table.columns === columns) + assert(stripIds(table.columns) === columns) assert(table.properties == properties) assert(catalog.tableExists(testIdent)) @@ -220,7 +228,7 @@ class CatalogSuite extends SparkFunSuite { val parsed = CatalystSqlParser.parseMultipartIdentifier(table.name) assert(parsed == Seq("test", "`", ".", "test_table")) - assert(table.columns === columns) + assert(stripIds(table.columns) === columns) assert(table.constraints === constraints) assert(table.properties.asScala == Map()) @@ -419,11 +427,11 @@ class CatalogSuite extends SparkFunSuite { .build() val table = catalog.createTable(testIdent, tableInfo) - assert(table.columns === 
columns) + assert(stripIds(table.columns) === columns) val updated = catalog.alterTable(testIdent, TableChange.addColumn(Array("ts"), TimestampType)) - assert(updated.columns === columns :+ Column.create("ts", TimestampType)) + assert(stripIds(updated.columns) === columns :+ Column.create("ts", TimestampType)) } test("alterTable: add required column") { @@ -436,12 +444,12 @@ class CatalogSuite extends SparkFunSuite { .build() val table = catalog.createTable(testIdent, tableInfo) - assert(table.columns === columns) + assert(stripIds(table.columns) === columns) val updated = catalog.alterTable(testIdent, TableChange.addColumn(Array("ts"), TimestampType, false)) - assert(updated.columns === columns :+ Column.create("ts", TimestampType, false)) + assert(stripIds(updated.columns) === columns :+ Column.create("ts", TimestampType, false)) } test("alterTable: add column with comment") { @@ -454,13 +462,13 @@ class CatalogSuite extends SparkFunSuite { .build() val table = catalog.createTable(testIdent, tableInfo) - assert(table.columns === columns) + assert(stripIds(table.columns) === columns) val updated = catalog.alterTable(testIdent, TableChange.addColumn(Array("ts"), TimestampType, false, "comment text")) val tsColumn = Column.create("ts", TimestampType, false, "comment text", null) - assert(updated.columns === (columns :+ tsColumn)) + assert(stripIds(updated.columns) === (columns :+ tsColumn)) } test("alterTable: add nested column") { @@ -476,14 +484,14 @@ class CatalogSuite extends SparkFunSuite { .build() val table = catalog.createTable(testIdent, tableInfo) - assert(table.columns === tableColumns) + assert(stripIds(table.columns) === tableColumns) val updated = catalog.alterTable(testIdent, TableChange.addColumn(Array("point", "z"), DoubleType)) val expectedColumns = columns :+ Column.create("point", pointStruct.add("z", DoubleType)) - assert(updated.columns === expectedColumns) + assert(stripIds(updated.columns) === expectedColumns) } test("alterTable: add column 
to primitive field fails") { @@ -496,7 +504,7 @@ class CatalogSuite extends SparkFunSuite { .build() val table = catalog.createTable(testIdent, tableInfo) - assert(table.columns === columns) + assert(stripIds(table.columns) === columns) checkError( exception = intercept[SparkIllegalArgumentException] { @@ -506,7 +514,7 @@ class CatalogSuite extends SparkFunSuite { parameters = Map("name" -> "data")) // the table has not changed - assert(catalog.loadTable(testIdent).columns === columns) + assert(stripIds(catalog.loadTable(testIdent).columns) === columns) } test("alterTable: add field to missing column fails") { @@ -519,7 +527,7 @@ class CatalogSuite extends SparkFunSuite { .build() val table = catalog.createTable(testIdent, tableInfo) - assert(table.columns === columns) + assert(stripIds(table.columns) === columns) checkError( exception = intercept[SparkIllegalArgumentException] { @@ -540,12 +548,12 @@ class CatalogSuite extends SparkFunSuite { .build() val table = catalog.createTable(testIdent, tableInfo) - assert(table.columns === columns) + assert(stripIds(table.columns) === columns) val updated = catalog.alterTable(testIdent, TableChange.updateColumnType(Array("id"), LongType)) val expectedColumns = Array(Column.create("id", LongType), Column.create("data", StringType)) - assert(updated.columns sameElements expectedColumns) + assert(stripIds(updated.columns) sameElements expectedColumns) } test("alterTable: update column nullability") { @@ -561,14 +569,14 @@ class CatalogSuite extends SparkFunSuite { .build() val table = catalog.createTable(testIdent, tableInfo) - assert(table.columns === originalColumns) + assert(stripIds(table.columns) === originalColumns) val updated = catalog.alterTable(testIdent, TableChange.updateColumnNullability(Array("id"), true)) val expectedColumns = Array( Column.create("id", IntegerType, true), Column.create("data", StringType)) - assert(updated.columns sameElements expectedColumns) + assert(stripIds(updated.columns) sameElements 
expectedColumns) } test("alterTable: update missing column fails") { @@ -581,7 +589,7 @@ class CatalogSuite extends SparkFunSuite { .build() val table = catalog.createTable(testIdent, tableInfo) - assert(table.columns === columns) + assert(stripIds(table.columns) === columns) checkError( exception = intercept[SparkIllegalArgumentException] { @@ -602,7 +610,7 @@ class CatalogSuite extends SparkFunSuite { .build() val table = catalog.createTable(testIdent, tableInfo) - assert(table.columns === columns) + assert(stripIds(table.columns) === columns) val updated = catalog.alterTable(testIdent, TableChange.updateColumnComment(Array("id"), "comment text")) @@ -611,7 +619,7 @@ class CatalogSuite extends SparkFunSuite { Column.create("id", IntegerType, true, "comment text", null), Column.create("data", StringType) ) - assert(updated.columns sameElements expectedColumns) + assert(stripIds(updated.columns) sameElements expectedColumns) } test("alterTable: replace comment") { @@ -624,7 +632,7 @@ class CatalogSuite extends SparkFunSuite { .build() val table = catalog.createTable(testIdent, tableInfo) - assert(table.columns === columns) + assert(stripIds(table.columns) === columns) catalog.alterTable(testIdent, TableChange.updateColumnComment(Array("id"), "comment text")) @@ -635,7 +643,7 @@ class CatalogSuite extends SparkFunSuite { val updated = catalog.alterTable(testIdent, TableChange.updateColumnComment(Array("id"), "replacement comment")) - assert(updated.columns sameElements expectedColumns) + assert(stripIds(updated.columns) sameElements expectedColumns) } test("alterTable: add comment to missing column fails") { @@ -648,7 +656,7 @@ class CatalogSuite extends SparkFunSuite { .build() val table = catalog.createTable(testIdent, tableInfo) - assert(table.columns === columns) + assert(stripIds(table.columns) === columns) checkError( exception = intercept[SparkIllegalArgumentException] { @@ -669,13 +677,13 @@ class CatalogSuite extends SparkFunSuite { .build() val table = 
catalog.createTable(testIdent, tableInfo) - assert(table.columns === columns) + assert(stripIds(table.columns) === columns) val updated = catalog.alterTable(testIdent, TableChange.renameColumn(Array("id"), "some_id")) val expectedColumns = Array( Column.create("some_id", IntegerType), Column.create("data", StringType)) - assert(updated.columns sameElements expectedColumns) + assert(stripIds(updated.columns) sameElements expectedColumns) } test("alterTable: rename nested column") { @@ -691,7 +699,7 @@ class CatalogSuite extends SparkFunSuite { .build() val table = catalog.createTable(testIdent, tableInfo) - assert(table.columns === tableColumns) + assert(stripIds(table.columns) === tableColumns) val updated = catalog.alterTable(testIdent, TableChange.renameColumn(Array("point", "x"), "first")) @@ -699,7 +707,7 @@ class CatalogSuite extends SparkFunSuite { val newPointStruct = new StructType().add("first", DoubleType).add("y", DoubleType) val expectedColumns = columns :+ Column.create("point", newPointStruct) - assert(updated.columns === expectedColumns) + assert(stripIds(updated.columns) === expectedColumns) } test("alterTable: rename struct column") { @@ -715,7 +723,7 @@ class CatalogSuite extends SparkFunSuite { .build() val table = catalog.createTable(testIdent, tableInfo) - assert(table.columns === tableColumns) + assert(stripIds(table.columns) === tableColumns) val updated = catalog.alterTable(testIdent, TableChange.renameColumn(Array("point"), "p")) @@ -723,7 +731,7 @@ class CatalogSuite extends SparkFunSuite { val newPointStruct = new StructType().add("x", DoubleType).add("y", DoubleType) val expectedColumns = columns :+ Column.create("p", newPointStruct) - assert(updated.columns === expectedColumns) + assert(stripIds(updated.columns) === expectedColumns) } test("alterTable: rename missing column fails") { @@ -736,7 +744,7 @@ class CatalogSuite extends SparkFunSuite { .build() val table = catalog.createTable(testIdent, tableInfo) - assert(table.columns === 
columns) + assert(stripIds(table.columns) === columns) checkError( exception = intercept[SparkIllegalArgumentException] { @@ -760,7 +768,7 @@ class CatalogSuite extends SparkFunSuite { .build() val table = catalog.createTable(testIdent, tableInfo) - assert(table.columns === tableColumns) + assert(stripIds(table.columns) === tableColumns) val updated = catalog.alterTable(testIdent, TableChange.renameColumn(Array("point", "x"), "first"), @@ -769,7 +777,7 @@ class CatalogSuite extends SparkFunSuite { val newPointStruct = new StructType().add("first", DoubleType).add("second", DoubleType) val expectedColumns = columns :+ Column.create("point", newPointStruct) - assert(updated.columns() === expectedColumns) + assert(stripIds(updated.columns()) === expectedColumns) } test("alterTable: delete top-level column") { @@ -782,13 +790,13 @@ class CatalogSuite extends SparkFunSuite { .build() val table = catalog.createTable(testIdent, tableInfo) - assert(table.columns === columns) + assert(stripIds(table.columns) === columns) val updated = catalog.alterTable(testIdent, TableChange.deleteColumn(Array("id"), false)) val expectedColumns = Array(Column.create("data", StringType)) - assert(updated.columns sameElements expectedColumns) + assert(stripIds(updated.columns) sameElements expectedColumns) } test("alterTable: delete nested column") { @@ -804,7 +812,7 @@ class CatalogSuite extends SparkFunSuite { .build() val table = catalog.createTable(testIdent, tableInfo) - assert(table.columns === tableColumns) + assert(stripIds(table.columns) === tableColumns) val updated = catalog.alterTable(testIdent, TableChange.deleteColumn(Array("point", "y"), false)) @@ -812,7 +820,7 @@ class CatalogSuite extends SparkFunSuite { val newPointStruct = new StructType().add("x", DoubleType) val expectedColumns = columns :+ Column.create("point", newPointStruct) - assert(updated.columns === expectedColumns) + assert(stripIds(updated.columns) === expectedColumns) } test("alterTable: delete missing column 
fails") { @@ -825,7 +833,7 @@ class CatalogSuite extends SparkFunSuite { .build() val table = catalog.createTable(testIdent, tableInfo) - assert(table.columns === columns) + assert(stripIds(table.columns) === columns) checkError( exception = intercept[SparkIllegalArgumentException] { @@ -836,7 +844,7 @@ class CatalogSuite extends SparkFunSuite { // with if exists it should pass catalog.alterTable(testIdent, TableChange.deleteColumn(Array("missing_col"), true)) - assert(table.columns === columns) + assert(stripIds(table.columns) === columns) } test("alterTable: delete missing nested column fails") { @@ -852,7 +860,7 @@ class CatalogSuite extends SparkFunSuite { .build() val table = catalog.createTable(testIdent, tableInfo) - assert(table.columns === tableColumns) + assert(stripIds(table.columns) === tableColumns) checkError( exception = intercept[SparkIllegalArgumentException] { @@ -863,7 +871,7 @@ class CatalogSuite extends SparkFunSuite { // with if exists it should pass catalog.alterTable(testIdent, TableChange.deleteColumn(Array("point", "z"), true)) - assert(table.columns === tableColumns) + assert(stripIds(table.columns) === tableColumns) } test("alterTable: table does not exist") { diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/ColumnSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/ColumnSuite.scala new file mode 100644 index 0000000000000..62083c4205fbf --- /dev/null +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/ColumnSuite.scala @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.catalog + +import org.apache.spark.SparkFunSuite +import org.apache.spark.SparkIllegalArgumentException +import org.apache.spark.sql.catalyst.parser.CatalystSqlParser +import org.apache.spark.sql.catalyst.plans.logical.ColumnDefinition +import org.apache.spark.sql.catalyst.util.FieldMetadataUtils.FIELD_ID_METADATA_KEY +import org.apache.spark.sql.connector.expressions.LiteralValue +import org.apache.spark.sql.types.{IntegerType, MetadataBuilder, StringType, StructField, StructType} + +class ColumnSuite extends SparkFunSuite { + + private val intLiteral = LiteralValue(42, IntegerType) + private val defaultValue = new ColumnDefaultValue("42", intLiteral) + private val identitySpec = new IdentityColumnSpec(1L, 1L, false) + + // --------------------------------------------------------------------------- + // Column.create factory overloads + // --------------------------------------------------------------------------- + + test("create(name, type) defaults: nullable, no comment, no generation expr, no metadata") { + val col = Column.create("c", IntegerType) + assert(col.name() == "c") + assert(col.dataType() == IntegerType) + assert(col.nullable()) + assert(col.comment() == null) + assert(col.defaultValue() == null) + assert(col.generationExpression() == null) + assert(col.identityColumnSpec() == null) + assert(col.metadataInJSON() == null) + assert(col.id() == null) + } + + test("create(name, type, nullable) controls nullable flag") { + assert(Column.create("c", IntegerType, false).nullable() == false) + 
assert(Column.create("c", IntegerType, true).nullable() == true) + } + + test("create(name, type, nullable, comment, metadataInJSON)") { + val col = Column.create("c", StringType, false, "a comment", """{"key":"val"}""") + assert(col.name() == "c") + assert(col.dataType() == StringType) + assert(col.nullable() == false) + assert(col.comment() == "a comment") + assert(col.metadataInJSON() == """{"key":"val"}""") + assert(col.defaultValue() == null) + assert(col.generationExpression() == null) + assert(col.identityColumnSpec() == null) + } + + test("create(name, type, nullable, comment, defaultValue, metadataInJSON)") { + val col = Column.create("c", IntegerType, true, "doc", defaultValue, null) + assert(col.defaultValue() == defaultValue) + assert(col.generationExpression() == null) + assert(col.identityColumnSpec() == null) + } + + test("create(name, type, nullable, comment, generationExpression, metadataInJSON)") { + val col = Column.create("c", IntegerType, true, null, "a + 1", null) + assert(col.generationExpression() == "a + 1") + assert(col.defaultValue() == null) + assert(col.identityColumnSpec() == null) + } + + test("create(name, type, nullable, comment, identityColumnSpec, metadataInJSON)") { + val col = Column.create("c", IntegerType, false, null, identitySpec, null) + assert(col.identityColumnSpec() == identitySpec) + assert(col.defaultValue() == null) + assert(col.generationExpression() == null) + } + + // --------------------------------------------------------------------------- + // Column.newBuilder / Builder + // --------------------------------------------------------------------------- + + test("builder defaults: nullable=true, everything else null") { + val col = Column.newBuilder("c", IntegerType).build() + assert(col.name() == "c") + assert(col.dataType() == IntegerType) + assert(col.nullable()) + assert(col.comment() == null) + assert(col.defaultValue() == null) + assert(col.generationExpression() == null) + assert(col.identityColumnSpec() == 
null) + assert(col.metadataInJSON() == null) + assert(col.id() == null) + } + + test("builder sets all fields") { + val col = Column.newBuilder("c", IntegerType) + .nullable(false) + .comment("doc") + .defaultValue(defaultValue) + .metadataInJSON("""{"k":"v"}""") + .id("abc-123") + .build() + assert(col.name() == "c") + assert(col.dataType() == IntegerType) + assert(col.nullable() == false) + assert(col.comment() == "doc") + assert(col.defaultValue() == defaultValue) + assert(col.metadataInJSON() == """{"k":"v"}""") + assert(col.id() == "abc-123") + } + + test("builder with generationExpression") { + val col = Column.newBuilder("c", IntegerType) + .generationExpression("a * 2") + .build() + assert(col.generationExpression() == "a * 2") + assert(col.defaultValue() == null) + assert(col.identityColumnSpec() == null) + } + + test("builder with identityColumnSpec") { + val col = Column.newBuilder("c", IntegerType) + .identityColumnSpec(identitySpec) + .build() + assert(col.identityColumnSpec() == identitySpec) + assert(col.defaultValue() == null) + assert(col.generationExpression() == null) + } + + // --------------------------------------------------------------------------- + // Builder invariants: conflicting definitions are rejected + // --------------------------------------------------------------------------- + + test("builder rejects defaultValue + generationExpression") { + val ex = intercept[SparkIllegalArgumentException] { + Column.newBuilder("c", IntegerType) + .defaultValue(defaultValue) + .generationExpression("a + 1") + .build() + } + assert(ex.getMessage.contains("cannot have more than one definition")) + } + + test("builder rejects generationExpression + identityColumnSpec") { + val ex = intercept[SparkIllegalArgumentException] { + Column.newBuilder("c", IntegerType) + .generationExpression("a + 1") + .identityColumnSpec(identitySpec) + .build() + } + assert(ex.getMessage.contains("cannot have more than one definition")) + } + + test("builder rejects 
defaultValue + identityColumnSpec") { + val ex = intercept[SparkIllegalArgumentException] { + Column.newBuilder("c", IntegerType) + .defaultValue(defaultValue) + .identityColumnSpec(identitySpec) + .build() + } + assert(ex.getMessage.contains("cannot have more than one definition")) + } + + test("builder rejects all three definitions set simultaneously") { + val ex = intercept[SparkIllegalArgumentException] { + Column.newBuilder("c", IntegerType) + .defaultValue(defaultValue) + .generationExpression("a + 1") + .identityColumnSpec(identitySpec) + .build() + } + assert(ex.getMessage.contains("cannot have more than one definition")) + } + + test("builder error message names the column") { + val ex = intercept[SparkIllegalArgumentException] { + Column.newBuilder("my_column", IntegerType) + .defaultValue(defaultValue) + .generationExpression("x") + .build() + } + assert(ex.getMessage.contains("my_column")) + } + + // --------------------------------------------------------------------------- + // newBuilder rejects null name / type + // --------------------------------------------------------------------------- + + test("newBuilder rejects null name") { + intercept[NullPointerException] { + Column.newBuilder(null, IntegerType) + } + } + + test("newBuilder rejects null dataType") { + intercept[NullPointerException] { + Column.newBuilder("c", null) + } + } + + // --------------------------------------------------------------------------- + // ColumnDefinition.fromV1Column - metadata cleaning + // --------------------------------------------------------------------------- + + test("fromV1Column strips FIELD_ID_METADATA_KEY from metadata") { + val metadata = new MetadataBuilder() + .putString(FIELD_ID_METADATA_KEY, "42") + .putString("custom_key", "custom_value") + .build() + val field = StructField("col", IntegerType, nullable = true, metadata) + val colDef = ColumnDefinition.fromV1Column(field, CatalystSqlParser) + assert(!colDef.metadata.contains(FIELD_ID_METADATA_KEY)) 
+ assert(colDef.metadata.contains("custom_key")) + } + + test("fromV1Column strips nested field IDs from struct dataType") { + val nestedType = StructType(Array( + StructField("x", IntegerType).withId("source-id"), + StructField("y", IntegerType).withId("source-id"))) + val field = StructField("col", nestedType) + val colDef = ColumnDefinition.fromV1Column(field, CatalystSqlParser) + val resultType = colDef.dataType.asInstanceOf[StructType] + resultType.fields.foreach { f => assert(f.id.isEmpty) } + } +} diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala index 8bfcfc020fa12..67797c83715bc 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala @@ -20,7 +20,6 @@ package org.apache.spark.sql.connector.catalog import java.time.{Instant, ZoneId} import java.time.temporal.ChronoUnit import java.util -import java.util.Locale import java.util.Objects import java.util.OptionalLong import java.util.concurrent.atomic.AtomicLong @@ -75,15 +74,9 @@ abstract class InMemoryBaseTable( // Stores the table version validated during the last `ALTER TABLE ... ADD CONSTRAINT` operation. private var validatedTableVersion: String = null - // Assign column IDs to columns that do not have one. - // This simulates connectors that support column identity tracking. - private var tableColumns: Array[Column] = initialColumns.map { c => - if (c.id() == null) { - c.asInstanceOf[ColumnImpl].copy(id = InMemoryBaseTable.nextColumnId().toString) - } else { - c - } - } + // Assign column IDs to columns that do not have one, including nested struct fields within + // arrays and maps. This simulates connectors that support column identity tracking. 
+ private var tableColumns: Array[Column] = InMemoryBaseTable.assignMissingIds(initialColumns) override def columns(): Array[Column] = tableColumns @@ -780,10 +773,8 @@ abstract class InMemoryBaseTable( val mergedSchema = mergeSchema( oldType = CatalogV2Util.v2ColumnsToStructType(columns()), newType = newSchema) - val newColumns = CatalogV2Util.structTypeToV2Columns(mergedSchema) tableColumns = InMemoryBaseTable.assignMissingIds( - oldColumns = columns(), - newColumns = newColumns) + CatalogV2Util.structTypeToV2Columns(mergedSchema)) writer } @@ -916,32 +907,60 @@ abstract class InMemoryBaseTable( object InMemoryBaseTable { private val columnIdGlobalCounter = new AtomicLong(0) def nextColumnId(): Long = columnIdGlobalCounter.incrementAndGet() - - private def normalize(name: String): String = name.toLowerCase(Locale.ROOT) + def nextColumnIdString(): String = nextColumnId().toString /** - * Preserves column IDs from `oldColumns` when the column name matches, - * and assigns new IDs to columns that do not already have one. + * Assigns fresh IDs to any top-level column or nested struct field that does not already + * have one. Recurses into struct fields within ArrayType and MapType so that every field + * at every depth gets an ID. * - * IDs are preserved across type changes, keeping the same column ID through type - * widening and nested field additions. [[TypeChangeResetsColIdTableCatalog]] overrides - * this behavior for testing scenarios where type changes should produce a new ID. + * Existing IDs are preserved: Column → StructType → Column round-trip encodes them in + * StructField metadata (see StructField.FIELD_ID_METADATA_KEY), so only genuinely new fields + * arrive here without an ID. 
*/ - def assignMissingIds( - oldColumns: Array[Column], - newColumns: Array[Column]): Array[Column] = { - newColumns.map { newCol => - oldColumns.find(c => normalize(c.name()) == normalize(newCol.name())) match { - case Some(oldCol) if oldCol.id() != null => - newCol.asInstanceOf[ColumnImpl].copy(id = oldCol.id()) - case _ if newCol.id() == null => - newCol.asInstanceOf[ColumnImpl].copy(id = nextColumnId().toString) - case _ => - newCol + def assignMissingIds(columns: Array[Column]): Array[Column] = { + columns.map { col => + val impl = col.asInstanceOf[ColumnImpl] + val colWithId = if (col.id == null) impl.copy(id = nextColumnIdString()) else impl + val updatedType = assignFieldIds(colWithId.dataType) + if (updatedType ne colWithId.dataType) { + colWithId.copy(dataType = updatedType) + } else { + colWithId } } } + private def assignFieldIds(dataType: DataType): DataType = dataType match { + case s: StructType => + val newFields = s.fields.map { field => + val fieldWithId = if (field.id.isEmpty) field.withId(nextColumnIdString()) else field + val updatedType = assignFieldIds(fieldWithId.dataType) + if (updatedType ne fieldWithId.dataType) { + fieldWithId.copy(dataType = updatedType) + } else { + fieldWithId + } + } + if (newFields.zip(s.fields).forall { case (n, e) => n eq e }) s else StructType(newFields) + + case a: ArrayType => + val updatedElement = assignFieldIds(a.elementType) + if (updatedElement ne a.elementType) a.copy(elementType = updatedElement) else a + + case m: MapType => + val updatedKeyType = assignFieldIds(m.keyType) + val updatedValueType = assignFieldIds(m.valueType) + if ((updatedKeyType ne m.keyType) || (updatedValueType ne m.valueType)) { + m.copy(keyType = updatedKeyType, valueType = updatedValueType) } + else { + m + } + + case other => + other + } + val SIMULATE_FAILED_WRITE_OPTION = "spark.sql.test.simulateFailedWrite" def extractValue( diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryRowLevelOperationTableCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryRowLevelOperationTableCatalog.scala index affc4e03b4042..a29f1ed29ee8f 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryRowLevelOperationTableCatalog.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryRowLevelOperationTableCatalog.scala @@ -89,8 +89,7 @@ class InMemoryRowLevelOperationTableCatalog } val columnsWithIds = InMemoryBaseTable.assignMissingIds( - oldColumns = table.columns(), - newColumns = CatalogV2Util.structTypeToV2Columns(schema)) + CatalogV2Util.structTypeToV2Columns(schema)) val newTable = InMemoryRowLevelOperationTable.withColumns( name = table.name, diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala index c9a6c4acfa014..bb137ba4830df 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala @@ -207,8 +207,7 @@ class BasicInMemoryTableCatalog extends TableCatalog { table.increaseVersion() val currentVersion = table.version() val columnsWithIds = InMemoryBaseTable.assignMissingIds( - oldColumns = table.columns(), - newColumns = CatalogV2Util.structTypeToV2Columns(schema)) + CatalogV2Util.structTypeToV2Columns(schema)) val newTable = table match { case _: InMemoryTable => new InMemoryTable( diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/NullColumnIdInMemoryTableCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/NullColumnIdInMemoryTableCatalog.scala index c26ce263c1f8b..fead2e6fba79d 100644 --- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/NullColumnIdInMemoryTableCatalog.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/NullColumnIdInMemoryTableCatalog.scala @@ -30,9 +30,9 @@ import org.apache.spark.sql.internal.connector.ColumnImpl * override [[columns]] to strip IDs. Data is copied from the table * created by the parent [[InMemoryTableCatalog]]. * - * When column IDs are null, [[V2TableUtil.validateColumnIds]] + * When field IDs are null, field ID validation in [[org.apache.spark.sql.util.SchemaUtils]] * skips validation entirely, meaning drop/re-add of a column is NOT - * detected via column IDs. + * detected via field IDs. */ class NullColumnIdInMemoryTableCatalog extends InMemoryTableCatalog { diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/NullTableIdInMemoryTableCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/NullTableIdInMemoryTableCatalog.scala index 391eb619535f5..ae079fac17141 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/NullTableIdInMemoryTableCatalog.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/NullTableIdInMemoryTableCatalog.scala @@ -29,7 +29,7 @@ package org.apache.spark.sql.connector.catalog * This is to test the scenario where connectors do not implement * table IDs but do implement column IDs. In this scenario, column * IDs assigned by [[InMemoryBaseTable]] still differ after recreate, - * so [[V2TableUtil.validateColumnIds]] catches the schema change. + * so field ID validation in [[org.apache.spark.sql.util.SchemaUtils]] catches the schema change. 
*/ class NullTableIdInMemoryTableCatalog extends InMemoryTableCatalog { diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/TypeChangeResetsColIdTableCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/TypeChangeResetsColIdTableCatalog.scala index d68f2e62b1365..83effcef2b266 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/TypeChangeResetsColIdTableCatalog.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/TypeChangeResetsColIdTableCatalog.scala @@ -23,9 +23,8 @@ import org.apache.spark.sql.internal.connector.ColumnImpl /** * An [[InMemoryTableCatalog]] that assigns fresh column IDs when the - * column's data type changes. This is the inverse of the default - * [[InMemoryBaseTable.assignMissingIds]] behavior, which preserves IDs - * across type changes. + * column's data type changes, overriding the default behavior where type + * changes preserve the existing column ID. 
* * Use this catalog for tests that need a type change to produce a new * column ID (e.g., verifying that adding a nested field to a container diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/V2TableUtilSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/V2TableUtilSuite.scala index f4db1a94c4d58..11539657fbe36 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/V2TableUtilSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/V2TableUtilSuite.scala @@ -550,6 +550,136 @@ class V2TableUtilSuite extends SparkFunSuite { assert(errors.head == "`person`.`attrs`.`value` type has changed from INT to BIGINT") } + // --------------------------------------------------------------------------- + // Field ID change error messages + // --------------------------------------------------------------------------- + + test("validateCapturedColumns - top-level column ID changed") { + val originCols = Array( + colWithId("id", LongType, nullable = false, id = "1"), + colWithId("name", StringType, nullable = true, id = "2")) + val currentCols = Array( + colWithId("id", LongType, nullable = false, id = "99"), // ID changed + colWithId("name", StringType, nullable = true, id = "2")) + val table = TestTableWithMetadataSupport("test", currentCols) + + val errors = validateCapturedColumns(table, originCols) + assert(errors.size == 1) + // colPath is Seq.empty at top level, so the column name does not appear in the message + assert(errors.head == " field ID has changed from 1 to 99") + } + + test("validateCapturedColumns - nested struct field ID changed") { + val originStruct = StructType(Seq( + StructField("name", StringType).withId("10"), + StructField("age", IntegerType).withId("11"))) + val originCols = Array(col("person", originStruct, nullable = true)) + + val currentStruct = StructType(Seq( + StructField("name", StringType).withId("10"), + StructField("age", 
IntegerType).withId("99"))) // age ID changed + val currentCols = Array(col("person", currentStruct, nullable = true)) + val table = TestTableWithMetadataSupport("test", currentCols) + + val errors = validateCapturedColumns(table, originCols) + assert(errors.size == 1) + // colPath at the check site is Seq("person") — the parent struct path + assert(errors.head == "`person` field ID has changed from 11 to 99") + } + + test("validateCapturedColumns - doubly-nested struct field ID changed") { + val originInner = StructType(Seq(StructField("age", IntegerType).withId("11"))) + val originOuter = StructType(Seq(StructField("info", originInner).withId("10"))) + val originCols = Array(col("person", originOuter, nullable = true)) + + val currentInner = StructType(Seq(StructField("age", IntegerType).withId("99"))) // changed + val currentOuter = StructType(Seq(StructField("info", currentInner).withId("10"))) + val currentCols = Array(col("person", currentOuter, nullable = true)) + val table = TestTableWithMetadataSupport("test", currentCols) + + val errors = validateCapturedColumns(table, originCols) + assert(errors.size == 1) + assert(errors.head == "`person`.`info` field ID has changed from 11 to 99") + } + + test("validateCapturedColumns - multiple nested struct field IDs changed") { + val originStruct = StructType(Seq( + StructField("name", StringType).withId("10"), + StructField("age", IntegerType).withId("11"), + StructField("score", DoubleType).withId("12"))) + val originCols = Array(col("person", originStruct, nullable = true)) + + val currentStruct = StructType(Seq( + StructField("name", StringType).withId("10"), + StructField("age", IntegerType).withId("98"), // changed + StructField("score", DoubleType).withId("99"))) // changed + val currentCols = Array(col("person", currentStruct, nullable = true)) + val table = TestTableWithMetadataSupport("test", currentCols) + + val errors = validateCapturedColumns(table, originCols) + assert(errors.size == 2) + 
assert(errors.exists(_ == "`person` field ID has changed from 11 to 98")) + assert(errors.exists(_ == "`person` field ID has changed from 12 to 99")) + } + + test("validateCapturedColumns - nested field ID changed in array element struct") { + val originElem = StructType(Seq( + StructField("name", StringType).withId("10"), + StructField("price", IntegerType).withId("11"))) + val originCols = Array(col("items", ArrayType(originElem), nullable = true)) + + val currentElem = StructType(Seq( + StructField("name", StringType).withId("10"), + StructField("price", IntegerType).withId("99"))) // price ID changed + val currentCols = Array(col("items", ArrayType(currentElem), nullable = true)) + val table = TestTableWithMetadataSupport("test", currentCols) + + val errors = validateCapturedColumns(table, originCols) + assert(errors.size == 1) + assert(errors.head == "`items`.`element` field ID has changed from 11 to 99") + } + + test("validateCapturedColumns - nested field ID changed in map value struct") { + val originValue = StructType(Seq( + StructField("count", IntegerType).withId("10"), + StructField("label", StringType).withId("11"))) + val originCols = Array(col("props", MapType(StringType, originValue), nullable = true)) + + val currentValue = StructType(Seq( + StructField("count", IntegerType).withId("10"), + StructField("label", StringType).withId("99"))) // label ID changed + val currentCols = Array(col("props", MapType(StringType, currentValue), nullable = true)) + val table = TestTableWithMetadataSupport("test", currentCols) + + val errors = validateCapturedColumns(table, originCols) + assert(errors.size == 1) + assert(errors.head == "`props`.`value` field ID has changed from 11 to 99") + } + + test("validateCapturedColumns - field ID unchanged produces no error") { + val struct = StructType(Seq( + StructField("name", StringType).withId("10"), + StructField("age", IntegerType).withId("11"))) + val cols = Array(col("person", struct, nullable = true)) + val table = 
TestTableWithMetadataSupport("test", cols) + + val errors = validateCapturedColumns(table, cols) + assert(errors.isEmpty) + } + + test("validateCapturedColumns - field ID check disabled") { + val originStruct = StructType(Seq(StructField("age", IntegerType).withId("11"))) + val originCols = Array(col("person", originStruct, nullable = true)) + + val currentStruct = StructType(Seq(StructField("age", IntegerType).withId("99"))) + val currentCols = Array(col("person", currentStruct, nullable = true)) + val table = TestTableWithMetadataSupport("test", currentCols) + + val errors = V2TableUtil.validateCapturedColumns( + table, originCols.toImmutableArraySeq, checkFieldIds = false) + assert(errors.isEmpty, "Disabled field ID check should not report ID changes") + } + test("validateCapturedColumns - ALLOW_NEW_TOP_LEVEL_FIELDS allows top-level additions") { val originCols = Array( col("id", LongType, nullable = false), @@ -630,23 +760,6 @@ class V2TableUtilSuite extends SparkFunSuite { assert(errors.head.contains("`metadata`.`value`.`timestamp` BIGINT has been added")) } - test("validateColumnIds - multiple errors") { - val originalCols = Seq( - colWithId("salary", IntegerType, nullable = true, id = "id-1"), - colWithId("bonus", IntegerType, nullable = true, id = "id-2")) - val currentCols = Array( - colWithId("salary", IntegerType, nullable = true, id = "id-100"), - colWithId("bonus", IntegerType, nullable = true, id = "id-200")) - val table = TestTableWithMetadataSupport("test", currentCols) - - val errors = V2TableUtil.validateColumnIds( - table = table, - originalCapturedCols = originalCols) - assert(errors == Seq( - "`salary` column ID has changed from id-1 to id-100", - "`bonus` column ID has changed from id-2 to id-200")) - } - // simple table without metadata column support private case class TestTable( override val name: String, diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/txns.scala 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/txns.scala index 203aed450a5f7..15d1732d0e696 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/txns.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/txns.scala @@ -116,16 +116,10 @@ class TxnTable( // don't false-positive on the TxnTable wrapper having a different UUID. override val id: String = delegate.id - // The starting version should be the delegate version. - setVersion(delegate.version()) - - // Preserve column IDs from the delegate so that column ID validation can correctly detect - // drop-and-re-add scenarios (different IDs) and pass when columns are unchanged (same IDs). - // Uses assignMissingIds to keep the delegate's IDs for existing columns while assigning - // fresh IDs for any new columns added by schema evolution. - updateColumns(InMemoryBaseTable.assignMissingIds( - oldColumns = delegate.columns(), - newColumns = columns())) + // Column IDs for existing columns are preserved through the StructType round-trip via + // metadata encoding. assignMissingIds assigns fresh IDs to any new columns added by + // schema evolution. + updateColumns(InMemoryBaseTable.assignMissingIds(columns())) alterTableWithData(delegate.data, schema) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateTableLikeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateTableLikeExec.scala index a472f6cf8e14c..5ab39e28c5e15 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateTableLikeExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateTableLikeExec.scala @@ -81,15 +81,16 @@ case class CreateTableLikeExec( Seq.empty } - // Derive target columns from source; for V1Table sources apply CharVarcharUtils to preserve - // CHAR/VARCHAR types as declared rather than collapsed to StringType. 
- private def targetColumns: Array[Column] = - sourceTable match { - case v1: V1Table => - CatalogV2Util.structTypeToV2Columns(CharVarcharUtils.getRawSchema(v1.catalogTable.schema)) - case _ => - sourceTable.columns + // Derive target columns from source without propagating source field IDs to the new table. + // For V1Table sources, apply CharVarcharUtils to preserve CHAR/VARCHAR types as declared + // rather than collapsed to StringType. + private def targetColumns: Array[Column] = { + val rawSchema = sourceTable match { + case v1: V1Table => CharVarcharUtils.getRawSchema(v1.catalogTable.schema) + case _ => CatalogV2Util.v2ColumnsToStructType(sourceTable.columns) } + CatalogV2Util.structTypeToV2Columns(rawSchema, keepFieldIds = false) + } // Source table properties are intentionally excluded; connectors read sourceTable // to clone any additional format-specific or custom state they need. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2TableRefreshUtil.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2TableRefreshUtil.scala index 46359f1fa8a2d..ed5282163c6c8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2TableRefreshUtil.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2TableRefreshUtil.scala @@ -97,7 +97,6 @@ private[sql] object V2TableRefreshUtil extends SQLConfHelper with Logging { } }) validateTableIdentity(currentTable, r) - validateColumnIds(currentTable, r) validateDataColumns(currentTable, r, schemaValidationMode) validateMetadataColumns(currentTable, r, schemaValidationMode) r.copy(table = currentTable) @@ -125,15 +124,6 @@ private[sql] object V2TableRefreshUtil extends SQLConfHelper with Logging { V2TableUtil.validateTableId(relation.name, relation.table.id, currentTable) } - private def validateColumnIds( - currentTable: Table, - relation: DataSourceV2Relation): Unit = { - val errors = 
V2TableUtil.validateColumnIds(currentTable, relation) - if (errors.nonEmpty) { - throw QueryCompilationErrors.columnIdMismatchAfterAnalysis(relation.name, errors) - } - } - private def validateDataColumns( currentTable: Table, relation: DataSourceV2Relation, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala index 33709fbd5f5a7..745b885d31572 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala @@ -967,7 +967,7 @@ private[v2] trait V2CreateTableAsSelectBaseExec extends LeafV2CommandExec { protected def getV2Columns(schema: StructType, forceNullable: Boolean): Array[Column] = { val rawSchema = CharVarcharUtils.getRawSchema(removeInternalMetadata(schema), conf) val tableSchema = if (forceNullable) rawSchema.asNullable else rawSchema - CatalogV2Util.structTypeToV2Columns(tableSchema) + CatalogV2Util.structTypeToV2Columns(tableSchema, keepFieldIds = false) } protected def writeToTable( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala index 71632e07c78b7..4340c71863860 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala @@ -28,7 +28,7 @@ import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SaveMode, SparkS import org.apache.spark.sql.QueryTest.withQueryExecutionsCaptured import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, LogicalPlan, 
ReplaceTableAsSelect} -import org.apache.spark.sql.connector.catalog.{CachingInMemoryTableCatalog, Column, ColumnDefaultValue, ComposedColumnIdTableCatalog, DefaultValue, GenerationExpression, Identifier, InMemoryTableCatalog, MixedColumnIdTableCatalog, NullColumnIdInMemoryTableCatalog, NullTableIdAndNullColumnIdInMemoryTableCatalog, NullTableIdInMemoryTableCatalog, SupportsV1OverwriteWithSaveAsTable, TableCatalog, TableInfo, TypeChangeResetsColIdTableCatalog} +import org.apache.spark.sql.connector.catalog.{CachingInMemoryTableCatalog, Column, ColumnDefaultValue, DefaultValue, GenerationExpression, Identifier, InMemoryTableCatalog, MixedColumnIdTableCatalog, NullColumnIdInMemoryTableCatalog, NullTableIdAndNullColumnIdInMemoryTableCatalog, NullTableIdInMemoryTableCatalog, SupportsV1OverwriteWithSaveAsTable, TableCatalog, TableInfo, TypeChangeResetsColIdTableCatalog} import org.apache.spark.sql.connector.catalog.BasicInMemoryTableCatalog import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, UpdateColumnDefaultValue} import org.apache.spark.sql.connector.catalog.TableChange @@ -77,10 +77,6 @@ class DataSourceV2DataFrameSuite .set("spark.sql.catalog.mixedcolidcat", classOf[MixedColumnIdTableCatalog].getName) .set("spark.sql.catalog.mixedcolidcat.copyOnLoad", "true") - .set("spark.sql.catalog.composedidcat", - classOf[ComposedColumnIdTableCatalog].getName) - .set("spark.sql.catalog.composedidcat.copyOnLoad", "true") - .set(SQLConf.TIME_TYPE_ENABLED.key, "true") after { catalog("cachingcat").asInstanceOf[CachingInMemoryTableCatalog].clearCache() @@ -1156,6 +1152,7 @@ class DataSourceV2DataFrameSuite } } + test("create/replace table with generated columns should have V2 Expression") { val tableName = "testcat.ns1.ns2.tbl" withTable(tableName) { @@ -2045,7 +2042,7 @@ class DataSourceV2DataFrameSuite // // Core behavior: when a DataFrame captures column IDs at analysis time, // and those IDs change before execution, the query is rejected with - // 
COLUMN_ID_MISMATCH. + // COLUMNS_MISMATCH. test("drop+re-add column with same name and type rejects stale DataFrame") { val t = "testcat.ns1.ns2.tbl" @@ -2060,7 +2057,7 @@ class DataSourceV2DataFrameSuite checkError( exception = intercept[AnalysisException] { df.collect() }, - condition = "INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.COLUMN_ID_MISMATCH", + condition = "INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.COLUMNS_MISMATCH", matchPVals = true, parameters = Map("tableName" -> ".*", "errors" -> ".*")) } @@ -2079,7 +2076,7 @@ class DataSourceV2DataFrameSuite checkError( exception = intercept[AnalysisException] { df.collect() }, - condition = "INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.COLUMN_ID_MISMATCH", + condition = "INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.COLUMNS_MISMATCH", matchPVals = true, parameters = Map("tableName" -> ".*", "errors" -> ".*")) } @@ -2098,7 +2095,7 @@ class DataSourceV2DataFrameSuite checkError( exception = intercept[AnalysisException] { df.collect() }, - condition = "INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.COLUMN_ID_MISMATCH", + condition = "INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.COLUMNS_MISMATCH", matchPVals = true, parameters = Map("tableName" -> ".*", "errors" -> ".*")) } @@ -2119,7 +2116,7 @@ class DataSourceV2DataFrameSuite checkError( exception = intercept[AnalysisException] { df.collect() }, - condition = "INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.COLUMN_ID_MISMATCH", + condition = "INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.COLUMNS_MISMATCH", matchPVals = true, parameters = Map("tableName" -> ".*", "errors" -> "(?s).*salary.*bonus.*")) @@ -2161,7 +2158,7 @@ class DataSourceV2DataFrameSuite checkError( exception = intercept[AnalysisException] { df.collect() }, - condition = "INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.COLUMN_ID_MISMATCH", + condition = "INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.COLUMNS_MISMATCH", matchPVals = true, parameters = Map("tableName" -> ".*", "errors" -> ".*")) } @@ -2196,7 +2193,7 @@ class 
DataSourceV2DataFrameSuite checkError( exception = intercept[AnalysisException] { df.collect() }, - condition = "INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.COLUMN_ID_MISMATCH", + condition = "INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.COLUMNS_MISMATCH", matchPVals = true, parameters = Map("tableName" -> ".*", "errors" -> ".*")) } @@ -2214,14 +2211,14 @@ class DataSourceV2DataFrameSuite checkError( exception = intercept[AnalysisException] { df.collect() }, - condition = "INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.COLUMN_ID_MISMATCH", + condition = "INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.COLUMNS_MISMATCH", matchPVals = true, parameters = Map("tableName" -> ".*", "errors" -> ".*")) } } test("drop+re-add nested struct field rejects stale DataFrame") { - val t = "composedidcat.ns1.ns2.tbl" + val t = "testcat.ns1.ns2.tbl" withTable(t) { sql(s"CREATE TABLE $t (id INT, person STRUCT) USING foo") sql(s"INSERT INTO $t VALUES (1, named_struct('name', 'Alice', 'age', 30))") @@ -2232,7 +2229,7 @@ class DataSourceV2DataFrameSuite checkError( exception = intercept[AnalysisException] { df.collect() }, - condition = "INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.COLUMN_ID_MISMATCH", + condition = "INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.COLUMNS_MISMATCH", matchPVals = true, parameters = Map("tableName" -> ".*", "errors" -> ".*")) } @@ -2271,7 +2268,7 @@ class DataSourceV2DataFrameSuite checkError( exception = intercept[AnalysisException] { df.collect() }, - condition = "INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.COLUMN_ID_MISMATCH", + condition = "INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.COLUMNS_MISMATCH", matchPVals = true, parameters = Map("tableName" -> ".*", "errors" -> ".*")) } @@ -2288,7 +2285,7 @@ class DataSourceV2DataFrameSuite checkError( exception = intercept[AnalysisException] { df.collect() }, - condition = "INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.COLUMN_ID_MISMATCH", + condition = "INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.COLUMNS_MISMATCH", matchPVals = true, parameters = 
Map("tableName" -> ".*", "errors" -> ".*")) } @@ -2339,54 +2336,7 @@ class DataSourceV2DataFrameSuite } } - // Column ID tests: Composed nested IDs - // - // ComposedColumnIdTableCatalog encodes nested field IDs into the - // top-level Column.id() string, modeling the recommended adoption - // pattern for connectors with nested IDs. Any nested - // change produces a different encoded string, so validateColumnIds - // detects it even though Spark only compares top-level strings. - - test("composed nested IDs detect drop+re-add of nested field") { - val t = "composedidcat.ns1.ns2.tbl" - withTable(t) { - sql(s"CREATE TABLE $t (id INT, person STRUCT) USING foo") - sql(s"INSERT INTO $t VALUES (1, named_struct('name', 'Alice', 'age', 30))") - val df = spark.table(t) - - sql(s"ALTER TABLE $t DROP COLUMN person.age") - sql(s"ALTER TABLE $t ADD COLUMN person.age INT") - - // The inner age field got a new nested ID on re-add. The composed - // top-level string changes, so COLUMN_ID_MISMATCH fires. - checkError( - exception = intercept[AnalysisException] { df.collect() }, - condition = "INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.COLUMN_ID_MISMATCH", - matchPVals = true, - parameters = Map("tableName" -> ".*", "errors" -> ".*")) - } - } - - test("composed nested IDs tolerate same data inserted into nested column") { - val t = "composedidcat.ns1.ns2.tbl" - withTable(t) { - sql(s"CREATE TABLE $t (id INT, person STRUCT) USING foo") - sql(s"INSERT INTO $t VALUES (1, named_struct('name', 'Alice', 'age', 30))") - val df = spark.table(t) - - // pure data insert, no schema change: composed IDs stay the same - sql(s"INSERT INTO $t VALUES (2, named_struct('name', 'Bob', 'age', 25))") - - checkAnswer(df, Seq( - Row(1, Row("Alice", 30)), - Row(2, Row("Bob", 25)))) - } - } - // Column ID tests: Additional nested coverage - // - // These tests fill specific nested cells that are not covered by the - // coarse (testcat) or composed (composedidcat) groups above. 
// Nested type change with preserved top-level ID: the standard catalog // preserves the parent ID, so schema validation catches the incompatible @@ -2410,10 +2360,8 @@ class DataSourceV2DataFrameSuite } } - // Depth >= 3 nesting with composed IDs: drop+re-add at depth 3 produces - // a different composed ID at the top level. - test("depth 3 nesting with composed IDs detects deep field change") { - val t = "composedidcat.ns1.ns2.tbl" + test("depth 3 nesting detects deep nested field drop+re-add") { + val t = "testcat.ns1.ns2.tbl" withTable(t) { sql(s"CREATE TABLE $t (id INT, a STRUCT>) USING foo") sql(s"INSERT INTO $t VALUES (1, named_struct('b', named_struct('c', 42)))") @@ -2422,11 +2370,9 @@ class DataSourceV2DataFrameSuite sql(s"ALTER TABLE $t DROP COLUMN a.b.c") sql(s"ALTER TABLE $t ADD COLUMN a.b.c INT") - // The deep nested field c got a new ID on re-add, changing the - // composed top-level ID for column a. checkError( exception = intercept[AnalysisException] { df.collect() }, - condition = "INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.COLUMN_ID_MISMATCH", + condition = "INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.COLUMNS_MISMATCH", matchPVals = true, parameters = Map("tableName" -> ".*", "errors" -> ".*")) } @@ -2461,14 +2407,8 @@ class DataSourceV2DataFrameSuite // nested fields are added or dropped (assignMissingIds matches by name // only). These tests verify that behavior using the catalog API. - // Column ID tests: Composed IDs for container types (arrays, maps) - // - // ComposedColumnIdTableCatalog encodes nested field IDs into the - // top-level string. These tests verify detection of nested drop+re-add - // inside array element structs and map value structs. 
- - test("composed nested IDs detect drop+re-add in array element struct") { - val t = "composedidcat.ns1.ns2.tbl" + test("drop+re-add in array element struct detected by field ID") { + val t = "testcat.ns1.ns2.tbl" withTable(t) { sql(s"CREATE TABLE $t (id INT, items ARRAY>) USING foo") sql(s"INSERT INTO $t VALUES (1, array(named_struct('name', 'x', 'price', 10)))") @@ -2477,18 +2417,16 @@ class DataSourceV2DataFrameSuite sql(s"ALTER TABLE $t DROP COLUMN items.element.price") sql(s"ALTER TABLE $t ADD COLUMN items.element.price INT") - // The nested price field got a new ID on re-add. The composed - // top-level ID for items changes, so COLUMN_ID_MISMATCH fires. checkError( exception = intercept[AnalysisException] { df.collect() }, - condition = "INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.COLUMN_ID_MISMATCH", + condition = "INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.COLUMNS_MISMATCH", matchPVals = true, parameters = Map("tableName" -> ".*", "errors" -> ".*")) } } - test("composed nested IDs detect drop+re-add in map value struct") { - val t = "composedidcat.ns1.ns2.tbl" + test("drop+re-add in map value struct detected by field ID") { + val t = "testcat.ns1.ns2.tbl" withTable(t) { sql(s"CREATE TABLE $t (id INT, props MAP>) USING foo") sql(s"INSERT INTO $t VALUES (1, map('k1', named_struct('x', 10, 'y', 20)))") @@ -2497,28 +2435,6 @@ class DataSourceV2DataFrameSuite sql(s"ALTER TABLE $t DROP COLUMN props.value.y") sql(s"ALTER TABLE $t ADD COLUMN props.value.y INT") - // The nested y field got a new ID on re-add. The composed - // top-level ID for props changes, so COLUMN_ID_MISMATCH fires. 
- checkError( - exception = intercept[AnalysisException] { df.collect() }, - condition = "INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.COLUMN_ID_MISMATCH", - matchPVals = true, - parameters = Map("tableName" -> ".*", "errors" -> ".*")) - } - } - - test("composed nested IDs detect rename within struct") { - val t = "composedidcat.ns1.ns2.tbl" - withTable(t) { - sql(s"CREATE TABLE $t (id INT, person STRUCT) USING foo") - sql(s"INSERT INTO $t VALUES (1, named_struct('name', 'Alice', 'age', 30))") - val df = spark.table(t) - - sql(s"ALTER TABLE $t RENAME COLUMN person.name TO first_name") - - // With position-based keys, the renamed field stays at position 0 - // and keeps its nested ID. The composed string is unchanged, so - // schema validation catches the struct type difference instead. checkError( exception = intercept[AnalysisException] { df.collect() }, condition = "INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.COLUMNS_MISMATCH", @@ -2527,13 +2443,13 @@ class DataSourceV2DataFrameSuite } } - test("composed nested IDs: reorder preserves composed column ID") { - val t = "composedidcat.ns1.ns2.tbl" + test("reorder preserves top-level column ID") { + val t = "testcat.ns1.ns2.tbl" val ident = Identifier.of(Array("ns1", "ns2"), "tbl") withTable(t) { sql(s"CREATE TABLE $t (id INT, person STRUCT) USING foo") - val cat = catalog("composedidcat") + val cat = catalog("testcat") val personBefore = cat.loadTable(ident).columns().find(_.name() == "person").get val idBefore = personBefore.id() val typeBefore = personBefore.dataType() @@ -2552,15 +2468,14 @@ class DataSourceV2DataFrameSuite assert(typeAfter.toString.startsWith("StructType(StructField(age"), s"age should be first field after reorder, got: $typeAfter") - // Position-based keys: each ordinal position keeps its old ID after - // reorder, so the composed string is unchanged despite the schema change. + // The top-level column ID is preserved after reorder. 
assert(idBefore == idAfter, - s"Composed ID should be unchanged after reorder: $idBefore vs $idAfter") + s"Top-level column ID should be unchanged after reorder: $idBefore vs $idAfter") } } - test("composed nested IDs tolerate nested field reorder end-to-end") { - val t = "composedidcat.ns1.ns2.tbl" + test("nested field reorder does not trigger column ID mismatch") { + val t = "testcat.ns1.ns2.tbl" withTable(t) { sql(s"CREATE TABLE $t (id INT, person STRUCT) USING foo") sql(s"INSERT INTO $t VALUES (1, named_struct('name', 'Alice', 'age', 30))") @@ -2571,13 +2486,13 @@ class DataSourceV2DataFrameSuite // InMemoryTable does not actually reorder nested struct fields in stored // data, so the read still returns the original field order. This is fine // because the purpose of this test is to verify that the column ID check - // passes (no COLUMN_ID_MISMATCH) after a nested field reorder. + // passes (no COLUMNS_MISMATCH) after a nested field reorder. checkAnswer(df, Seq(Row(1, Row("Alice", 30)))) } } - test("composed nested IDs detect drop+re-add in map key struct") { - val t = "composedidcat.ns1.ns2.tbl" + test("drop+re-add in map key struct detected by field ID") { + val t = "testcat.ns1.ns2.tbl" withTable(t) { sql(s"CREATE TABLE $t " + s"(id INT, coords MAP, STRING>) USING foo") @@ -2588,11 +2503,9 @@ class DataSourceV2DataFrameSuite sql(s"ALTER TABLE $t DROP COLUMN coords.key.y") sql(s"ALTER TABLE $t ADD COLUMN coords.key.y INT") - // The nested y field in the map key struct got a new ID on re-add. - // The composed top-level ID for coords changes. 
checkError( exception = intercept[AnalysisException] { df.collect() }, - condition = "INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.COLUMN_ID_MISMATCH", + condition = "INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.COLUMNS_MISMATCH", matchPVals = true, parameters = Map("tableName" -> ".*", "errors" -> ".*")) } @@ -2693,7 +2606,7 @@ class DataSourceV2DataFrameSuite exception = intercept[AnalysisException] { df1.join(df2, df1("id") === df2("id")).collect() }, - condition = "INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.COLUMN_ID_MISMATCH", + condition = "INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.COLUMNS_MISMATCH", matchPVals = true, parameters = Map("tableName" -> ".*", "errors" -> ".*")) } @@ -2718,7 +2631,7 @@ class DataSourceV2DataFrameSuite checkError( exception = intercept[AnalysisException] { df.filter("salary > 50").collect() }, - condition = "INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.COLUMN_ID_MISMATCH", + condition = "INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.COLUMNS_MISMATCH", matchPVals = true, parameters = Map("tableName" -> ".*", "errors" -> ".*")) } @@ -2739,7 +2652,7 @@ class DataSourceV2DataFrameSuite exception = intercept[AnalysisException] { df.groupBy("id").agg(sum("salary")).collect() }, - condition = "INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.COLUMN_ID_MISMATCH", + condition = "INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.COLUMNS_MISMATCH", matchPVals = true, parameters = Map("tableName" -> ".*", "errors" -> ".*")) } @@ -2758,7 +2671,7 @@ class DataSourceV2DataFrameSuite checkError( exception = intercept[AnalysisException] { df.orderBy("salary").collect() }, - condition = "INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.COLUMN_ID_MISMATCH", + condition = "INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.COLUMNS_MISMATCH", matchPVals = true, parameters = Map("tableName" -> ".*", "errors" -> ".*")) } @@ -2777,7 +2690,7 @@ class DataSourceV2DataFrameSuite checkError( exception = intercept[AnalysisException] { df.select("salary").collect() }, - condition = 
"INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.COLUMN_ID_MISMATCH", + condition = "INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.COLUMNS_MISMATCH", matchPVals = true, parameters = Map("tableName" -> ".*", "errors" -> ".*")) } @@ -2801,7 +2714,7 @@ class DataSourceV2DataFrameSuite checkError( exception = intercept[AnalysisException] { df.collect() }, - condition = "INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.COLUMN_ID_MISMATCH", + condition = "INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.COLUMNS_MISMATCH", matchPVals = true, parameters = Map("tableName" -> ".*", "errors" -> ".*")) } @@ -2853,7 +2766,7 @@ class DataSourceV2DataFrameSuite checkError( exception = intercept[AnalysisException] { df.collect() }, - condition = "INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.COLUMN_ID_MISMATCH", + condition = "INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.COLUMNS_MISMATCH", matchPVals = true, parameters = Map("tableName" -> ".*", "errors" -> ".*")) } @@ -2874,7 +2787,7 @@ class DataSourceV2DataFrameSuite // stale DataFrame detects salary ID mismatch checkError( exception = intercept[AnalysisException] { df.collect() }, - condition = "INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.COLUMN_ID_MISMATCH", + condition = "INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.COLUMNS_MISMATCH", matchPVals = true, parameters = Map("tableName" -> ".*", "errors" -> ".*salary.*")) @@ -2905,7 +2818,7 @@ class DataSourceV2DataFrameSuite // reset-id catalog assigns a new ID for the widened column checkError( exception = intercept[AnalysisException] { df.collect() }, - condition = "INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.COLUMN_ID_MISMATCH", + condition = "INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.COLUMNS_MISMATCH", matchPVals = true, parameters = Map("tableName" -> ".*", "errors" -> ".*")) } @@ -3008,7 +2921,7 @@ class DataSourceV2DataFrameSuite // [[commandExecuted]] phase, before the refresh phase runs. As a result, // column ID validation does not apply to the source DataFrame in a // [[writeTo]] path. 
The append succeeds without throwing a - // COLUMN_ID_MISMATCH error. + // COLUMNS_MISMATCH error. test("writeTo().append() does not throw column ID mismatch after drop+re-add column") { val t = "testcat.ns1.ns2.tbl" withTable(t) { @@ -3020,7 +2933,7 @@ class DataSourceV2DataFrameSuite sql(s"ALTER TABLE $t ADD COLUMN salary INT") // Command is eagerly executed before the refresh phase validates - // column IDs. No COLUMN_ID_MISMATCH exception is thrown. + // column IDs. No COLUMNS_MISMATCH exception is thrown. source.writeTo(t).append() } } @@ -3039,7 +2952,7 @@ class DataSourceV2DataFrameSuite exception = intercept[AnalysisException] { source.write.format(v2Format).insertInto(t) }, - condition = "INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.COLUMN_ID_MISMATCH", + condition = "INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.COLUMNS_MISMATCH", matchPVals = true, parameters = Map("tableName" -> ".*", "errors" -> "(?s).*")) } @@ -3067,7 +2980,7 @@ class DataSourceV2DataFrameSuite checkError( exception = intercept[AnalysisException] { df.collect() }, - condition = "INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.COLUMN_ID_MISMATCH", + condition = "INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.COLUMNS_MISMATCH", matchPVals = true, parameters = Map("tableName" -> ".*", "errors" -> "(?s).*")) } @@ -3110,7 +3023,7 @@ class DataSourceV2DataFrameSuite sql(s"ALTER TABLE $t ADD COLUMN bonus INT") // The stale DataFrame has only [id, salary] while the table now has - // [id, salary, bonus]. Since column IDs are null, no COLUMN_ID_MISMATCH + // [id, salary, bonus]. Since column IDs are null, no COLUMNS_MISMATCH // error is thrown; new columns are tolerated for read queries. 
checkAnswer(df, Seq(Row(1, 100))) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2ExtSessionColumnIdSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2ExtSessionColumnIdSuite.scala index ecc1dcccec1d7..759bb5f01e683 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2ExtSessionColumnIdSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2ExtSessionColumnIdSuite.scala @@ -133,7 +133,7 @@ class DataSourceV2ExtSessionColumnIdSuite extends SharedSparkSession { exception = intercept[AnalysisException] { df.collect() }, - condition = "INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.COLUMN_ID_MISMATCH", + condition = "INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.COLUMNS_MISMATCH", matchPVals = true, parameters = Map( "tableName" -> ".*", "errors" -> ".*")) @@ -156,7 +156,7 @@ class DataSourceV2ExtSessionColumnIdSuite extends SharedSparkSession { // NullTableIdInMemoryTableCatalog), so column ID check catches it checkError( exception = intercept[AnalysisException] { df.collect() }, - condition = "INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.COLUMN_ID_MISMATCH", + condition = "INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.COLUMNS_MISMATCH", matchPVals = true, parameters = Map("tableName" -> ".*", "errors" -> "(?s).*")) } @@ -201,7 +201,7 @@ class DataSourceV2ExtSessionColumnIdSuite extends SharedSparkSession { // both column ID mismatches are detected checkError( exception = intercept[AnalysisException] { df.collect() }, - condition = "INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.COLUMN_ID_MISMATCH", + condition = "INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.COLUMNS_MISMATCH", matchPVals = true, parameters = Map("tableName" -> ".*", "errors" -> "(?s).*salary.*bonus.*")) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala index 752c8ed6c22e0..c111bf0dc3199 
100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala @@ -50,7 +50,7 @@ import org.apache.spark.sql.execution.streaming.runtime.MemoryStream import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf} import org.apache.spark.sql.internal.SQLConf.{PARTITION_OVERWRITE_MODE, PartitionOverwriteMode, V2_SESSION_CATALOG_IMPLEMENTATION} import org.apache.spark.sql.sources.SimpleScanSource -import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructType} +import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructField, StructType} import org.apache.spark.unsafe.types.UTF8String abstract class DataSourceV2SQLSuite @@ -457,6 +457,44 @@ class DataSourceV2SQLSuiteV1Filter } } + test("CreateTableAsSelect: field IDs in query schema are not propagated to table columns") { + val basicCatalog = catalog("testcat").asTableCatalog + val atomicCatalog = catalog("testcat_atomic").asTableCatalog + val basicIdentifier = "testcat.table_name" + val atomicIdentifier = "testcat_atomic.table_name" + + // Use a non-numeric marker so it can never clash with InMemoryTable's sequential numeric IDs. 
+ val sourceId = "source-id" + val nestedType = new StructType(Array(StructField("value", LongType).withId(sourceId))) + val schema = new StructType(Array( + StructField("id", LongType).withId(sourceId), + StructField("nested", nestedType).withId(sourceId))) + val sourceWithIds = spark.createDataFrame( + spark.sparkContext.parallelize(Seq(Row(1L, Row(42L)), Row(2L, Row(43L)))), schema) + + withTempView("source_with_ids") { + sourceWithIds.createOrReplaceTempView("source_with_ids") + Seq((basicCatalog, basicIdentifier), (atomicCatalog, atomicIdentifier)).foreach { + case (cat, identifier) => + withTable(identifier) { + spark.sql(s"CREATE TABLE $identifier USING foo AS SELECT * FROM source_with_ids") + val table = cat.loadTable(Identifier.of(Array(), "table_name")) + table.columns.foreach { col => + assert(col.metadataInJSON == null) + assert(col.id != sourceId) + col.dataType match { + case s: StructType => + s.fields.foreach { f => + assert(f.id.forall(_ != sourceId)) + } + case _ => + } + } + } + } + } + } + test("CreateTableAsSelect: do not double execute on collect(), take() and other queries") { val basicCatalog = catalog("testcat").asTableCatalog val atomicCatalog = catalog("testcat_atomic").asTableCatalog @@ -615,6 +653,44 @@ class DataSourceV2SQLSuiteV1Filter } } + test("ReplaceTableAsSelect: field IDs in query schema are not propagated to table columns") { + val basicCatalog = catalog("testcat").asTableCatalog + val atomicCatalog = catalog("testcat_atomic").asTableCatalog + val basicIdentifier = "testcat.table_name" + val atomicIdentifier = "testcat_atomic.table_name" + + val sourceId = "source-id" + val nestedType = new StructType(Array(StructField("value", LongType).withId(sourceId))) + val schema = new StructType(Array( + StructField("id", LongType).withId(sourceId), + StructField("nested", nestedType).withId(sourceId))) + val sourceWithIds = spark.createDataFrame( + spark.sparkContext.parallelize(Seq(Row(1L, Row(42L)), Row(2L, Row(43L)))), schema) + + 
withTempView("source_with_ids") { + sourceWithIds.createOrReplaceTempView("source_with_ids") + Seq((basicCatalog, basicIdentifier), (atomicCatalog, atomicIdentifier)).foreach { + case (cat, identifier) => + withTable(identifier) { + spark.sql(s"CREATE TABLE $identifier USING foo AS SELECT * FROM source_with_ids") + spark.sql(s"REPLACE TABLE $identifier USING foo AS SELECT * FROM source_with_ids") + val table = cat.loadTable(Identifier.of(Array(), "table_name")) + table.columns.foreach { col => + assert(col.metadataInJSON == null) + assert(col.id != sourceId) + col.dataType match { + case s: StructType => + s.fields.foreach { f => + assert(f.id.forall(_ != sourceId)) + } + case _ => + } + } + } + } + } + } + Seq("REPLACE", "CREATE OR REPLACE").foreach { cmd => test(s"ReplaceTableAsSelect: do not double execute $cmd on collect()") { val basicCatalog = catalog("testcat").asTableCatalog diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoDataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoDataFrameSuite.scala index cb59ce80328d8..7e17a71da9cd7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoDataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoDataFrameSuite.scala @@ -180,7 +180,7 @@ class MergeIntoDataFrameSuite extends RowLevelOperationSuiteBase { .update(Map("salary" -> targetTableCol("salary").plus(1))) .merge() }, - condition = "INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.COLUMN_ID_MISMATCH", + condition = "INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.COLUMNS_MISMATCH", matchPVals = true, parameters = Map("tableName" -> ".*", "errors" -> ".*")) From d303eb9e986c3c5020277b859fcd663f8efc737c Mon Sep 17 00:00:00 2001 From: Anton Okolnychyi Date: Tue, 23 Jun 2026 22:59:11 -0700 Subject: [PATCH 2/3] Address feedback --- .../catalyst/util/FieldMetadataUtils.scala | 2 +- .../spark/sql/connector/catalog/Column.java | 10 +- 
.../spark/sql/catalyst/util/package.scala | 2 + .../apache/spark/sql/util/SchemaUtils.scala | 8 +- .../ComposedColumnIdTableCatalog.scala | 290 ------------------ .../connector/catalog/InMemoryBaseTable.scala | 2 +- ...dAndNullColumnIdInMemoryTableCatalog.scala | 2 +- .../connector/catalog/V2TableUtilSuite.scala | 16 +- ...v2IncrementallyConstructedQueryTests.scala | 22 +- .../DataSourceV2DataFrameSuite.scala | 16 + .../command/v2/ShowCreateTableSuite.scala | 14 + 11 files changed, 62 insertions(+), 322 deletions(-) delete mode 100644 sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/ComposedColumnIdTableCatalog.scala diff --git a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/FieldMetadataUtils.scala b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/FieldMetadataUtils.scala index c3ac1db9dc587..c27aef033d5b1 100644 --- a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/FieldMetadataUtils.scala +++ b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/FieldMetadataUtils.scala @@ -19,5 +19,5 @@ package org.apache.spark.sql.catalyst.util object FieldMetadataUtils { // Metadata key for the field ID used to track column identity across schema evolution - val FIELD_ID_METADATA_KEY = "FIELD_ID" + val FIELD_ID_METADATA_KEY = "__FIELD_ID" } diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/Column.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/Column.java index 3557779c06b08..04a81368b795f 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/Column.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/Column.java @@ -60,7 +60,7 @@ static Column create( return newBuilder(name, dataType) .nullable(nullable) .comment(comment) - .metadataInJSON(metadataInJSON) + .metadata(metadataInJSON) .build(); } @@ -75,7 +75,7 @@ static Column create( .nullable(nullable) .comment(comment) .defaultValue(defaultValue) - 
.metadataInJSON(metadataInJSON) + .metadata(metadataInJSON) .build(); } @@ -90,7 +90,7 @@ static Column create( .nullable(nullable) .comment(comment) .generationExpression(generationExpression) - .metadataInJSON(metadataInJSON) + .metadata(metadataInJSON) .build(); } @@ -105,7 +105,7 @@ static Column create( .nullable(nullable) .comment(comment) .identityColumnSpec(identityColumnSpec) - .metadataInJSON(metadataInJSON) + .metadata(metadataInJSON) .build(); } @@ -267,7 +267,7 @@ public Builder identityColumnSpec(IdentityColumnSpec identityColumnSpec) { return this; } - public Builder metadataInJSON(String metadataInJSON) { + public Builder metadata(String metadataInJSON) { this.metadataInJSON = metadataInJSON; return this; } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala index 2e7e88633bfa0..5dc3962821a03 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala @@ -23,6 +23,7 @@ import java.nio.charset.StandardCharsets.UTF_8 import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.analysis.TempResolvedColumn import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.util.FieldMetadataUtils.FIELD_ID_METADATA_KEY import org.apache.spark.sql.connector.catalog.MetadataColumn import org.apache.spark.sql.types.{MetadataBuilder, NumericType, StringType, StructType} import org.apache.spark.unsafe.types.UTF8String @@ -241,6 +242,7 @@ package object util extends Logging { AUTO_GENERATED_ALIAS, METADATA_COL_ATTR_KEY, QUALIFIED_ACCESS_ONLY, + FIELD_ID_METADATA_KEY, FileSourceMetadataAttribute.FILE_SOURCE_METADATA_COL_ATTR_KEY, FileSourceConstantMetadataStructField.FILE_SOURCE_CONSTANT_METADATA_COL_ATTR_KEY, FileSourceGeneratedMetadataStructField.FILE_SOURCE_GENERATED_METADATA_COL_ATTR_KEY, diff 
--git a/sql/catalyst/src/main/scala/org/apache/spark/sql/util/SchemaUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/util/SchemaUtils.scala index 8ae2d2d3043ff..2b966df0da354 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/util/SchemaUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/util/SchemaUtils.scala @@ -448,9 +448,10 @@ private[spark] object SchemaUtils { fieldsByName.foreach { case (normalizedName, field) => otherFieldsByName.get(normalizedName) match { case Some(otherField) => + val nameParts = colPath :+ field.name if (checkFieldIds) { for (id <- field.id; otherId <- otherField.id if id != otherId) { - errors += s"${colPath.fullyQuoted} field ID has changed from $id to $otherId" + errors += s"${nameParts.fullyQuoted} field ID has changed from $id to $otherId" } } validateTypeCompatibility( @@ -458,7 +459,7 @@ private[spark] object SchemaUtils { otherField.dataType, field.nullable, otherField.nullable, - colPath :+ field.name, + nameParts, resolver, mode, checkFieldIds, @@ -536,8 +537,7 @@ private[spark] object SchemaUtils { } /** - * Recursively removes field IDs from a data type. Only allocates new objects when a field ID - * is actually present, returning the original instance unchanged when there is nothing to strip. + * Recursively removes field IDs from a data type. */ def removeFieldIds(dataType: DataType): DataType = dataType match { case s: StructType => diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/ComposedColumnIdTableCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/ComposedColumnIdTableCatalog.scala deleted file mode 100644 index 64488a76db7f3..0000000000000 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/ComposedColumnIdTableCatalog.scala +++ /dev/null @@ -1,290 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.connector.catalog - -import java.util.Locale -import java.util.concurrent.ConcurrentHashMap - -import scala.collection.mutable - -import org.apache.spark.sql.internal.connector.ColumnImpl -import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructType} - -/** - * An [[InMemoryTableCatalog]] that tracks IDs at every nesting level - * (struct fields, array elements, map keys/values) and encodes the full - * subtree into each top-level [[Column.id]] string. - * - * This demonstrates how a connector that wants to detect nested changes - * can encode nested IDs into the top-level [[Column.id]] string. - * Any nested change (drop+re-add a struct field, etc.) produces a - * different encoded top-level string, so [[V2TableUtil.validateColumnIds]] - * detects it without Spark needing to traverse below the top level. - * - * Nested positions are keyed by ordinal path (`Seq[Int]`), not by field - * name. This matches Delta/Iceberg semantics where rename preserves the - * column ID: a renamed field stays at the same ordinal position, so the - * composed string is unchanged and schema validation catches the rename - * via the differing [[StructType]]. 
- * - * Example: for a column `person STRUCT<name: STRING, age: INT>` with - * root ID 5 and nested field IDs position 0 (name) = 10, - * position 1 (age) = 11, the composed [[Column.id]] string is - * `"5[0:10,1:11]"`. If `age` is dropped and re-added, the new age gets - * ID 12, producing `"5[0:10,1:12]"`. Spark sees different strings and - * fires `COLUMN_ID_MISMATCH`. - */ -class ComposedColumnIdTableCatalog extends InMemoryTableCatalog { - - // Per-table nested ID maps. - // Structure: tableIdentifier -> (columnName -> nestedFieldIdMap) - // where nestedFieldIdMap maps an ordinal path to its assigned ID. - // - // For column `person STRUCT<name: STRING, addr: STRUCT<city: STRING>>`: - // "person" -> { - // Seq(0) -> 10, // name - // Seq(1) -> 11, // addr - // Seq(1, 0) -> 12 // addr.city - // } - private val nestedIdMaps = - new ConcurrentHashMap[Identifier, mutable.Map[String, mutable.Map[Seq[Int], Long]]]() - - // Bare (uncomposed) root IDs, tracked separately to avoid double-encoding. - // Structure: tableIdentifier -> (columnName -> bareRootIdString) - private val rootIds = - new ConcurrentHashMap[Identifier, mutable.Map[String, String]]() - - override def createTable( - ident: Identifier, info: TableInfo): Table = { - val table = super.createTable(ident, info).asInstanceOf[InMemoryTable] - val allColumnNestedIds = mutable.Map[String, mutable.Map[Seq[Int], Long]]() - val allRootIds = mutable.Map[String, String]() - - val composedColumns: Array[Column] = table.columns().map { column => - val nestedFieldIds = mutable.Map[Seq[Int], Long]() - assignNestedIds(column.dataType(), parentPath = Seq.empty, nestedFieldIds) - val columnName = column.name().toLowerCase(Locale.ROOT) - allColumnNestedIds(columnName) = nestedFieldIds - allRootIds(columnName) = column.id() - val composedId = encodeComposedId(column.id(), nestedFieldIds) - column.asInstanceOf[ColumnImpl].copy(id = composedId): Column - } - - nestedIdMaps.put(ident, allColumnNestedIds) - rootIds.put(ident, allRootIds) - - val composedTable = new InMemoryTable( -
table.name, - composedColumns, - table.partitioning, - table.properties, - table.constraints, - id = table.id) - composedTable.alterTableWithData(table.data, table.schema) - composedTable.setVersionAndValidatedVersionFrom(table) - tables.put(ident, composedTable) - composedTable - } - - override def alterTable(ident: Identifier, changes: TableChange*): Table = { - val oldTable = loadTable(ident).asInstanceOf[InMemoryTable] - val oldColumnNestedIds = Option(nestedIdMaps.get(ident)) - .getOrElse(mutable.Map[String, mutable.Map[Seq[Int], Long]]()) - val oldRootIds = Option(rootIds.get(ident)) - .getOrElse(mutable.Map[String, String]()) - - val alteredTable = super.alterTable(ident, changes: _*).asInstanceOf[InMemoryTable] - - val allColumnNestedIds = mutable.Map[String, mutable.Map[Seq[Int], Long]]() - val allRootIds = mutable.Map[String, String]() - val composedColumns: Array[Column] = alteredTable.columns().map { newColumn => - val columnName = newColumn.name().toLowerCase(Locale.ROOT) - val oldNestedFieldIds = - oldColumnNestedIds.getOrElse(columnName, mutable.Map[Seq[Int], Long]()) - - // Find the old column to compare data types for merging nested IDs - val oldColumnOpt = oldTable.columns() - .find(oldCol => oldCol.name().toLowerCase(Locale.ROOT) == columnName) - - val newNestedFieldIds = oldColumnOpt match { - case Some(oldColumn) => - // Column existed before: preserve IDs for positions that still exist, - // assign fresh IDs for new positions (e.g. a re-added nested field) - mergeNestedIds(oldNestedFieldIds, oldColumn.dataType(), newColumn.dataType()) - case None => - // Brand new column: assign fresh IDs to all nested positions - val freshIds = mutable.Map[Seq[Int], Long]() - assignNestedIds(newColumn.dataType(), parentPath = Seq.empty, freshIds) - freshIds - } - - allColumnNestedIds(columnName) = newNestedFieldIds - - // super.alterTable preserves IDs by name, so newColumn.id() is - // the previously composed string (e.g. "5[0:10,1:11]"). 
Passing - // that to encodeComposedId would produce "5[0:10,1:11][0:10,1:12]" - // instead of "5[0:10,1:12]". Use the original root ID (e.g. "5") - // from rootIds instead; fall back to newColumn.id() only for - // genuinely new columns whose ID is a fresh numeric string. - val rootId = oldRootIds.getOrElse(columnName, newColumn.id()) - allRootIds(columnName) = rootId - val composedId = encodeComposedId(rootId, newNestedFieldIds) - newColumn.asInstanceOf[ColumnImpl].copy(id = composedId): Column - } - - nestedIdMaps.put(ident, allColumnNestedIds) - rootIds.put(ident, allRootIds) - - val composedTable = new InMemoryTable( - alteredTable.name, - composedColumns, - alteredTable.partitioning, - alteredTable.properties, - alteredTable.constraints, - id = alteredTable.id) - composedTable.alterTableWithData(alteredTable.data, alteredTable.schema) - composedTable.setVersionAndValidatedVersionFrom(alteredTable) - tables.put(ident, composedTable) - composedTable - } - - /** - * Recursively assigns fresh IDs to every nested position in a data type: - * struct fields, array elements, map keys, and map values. 
- * - * Each position is identified by an ordinal path from the column root: - * - * `STRUCT<name: STRING, addr: STRUCT<city: STRING>>` produces: - * - Seq(0) -> id1 (name, position 0) - * - Seq(1) -> id2 (addr, position 1) - * - Seq(1, 0) -> id3 (addr.city, position 0 within addr) - * - * `ARRAY<STRUCT<x: INT>>` produces: - * - Seq(0) -> id1 (element, position 0) - * - Seq(0, 0) -> id2 (element.x, position 0 within element) - * - * `MAP<STRING, STRUCT<v: INT>>` produces: - * - Seq(0) -> id1 (key, position 0) - * - Seq(1) -> id2 (value, position 1) - * - Seq(1, 0) -> id3 (value.v, position 0 within value) - */ - private def assignNestedIds( - dataType: DataType, - parentPath: Seq[Int], - nestedFieldIds: mutable.Map[Seq[Int], Long]): Unit = { - dataType match { - case structType: StructType => - structType.fields.zipWithIndex.foreach { case (field, idx) => - val fieldPath = parentPath :+ idx - nestedFieldIds(fieldPath) = InMemoryBaseTable.nextColumnId() - assignNestedIds(field.dataType, fieldPath, nestedFieldIds) - } - case ArrayType(elementType, _) => - val elementPath = parentPath :+ 0 - nestedFieldIds(elementPath) = InMemoryBaseTable.nextColumnId() - assignNestedIds(elementType, elementPath, nestedFieldIds) - case MapType(keyType, valueType, _) => - val keyPath = parentPath :+ 0 - nestedFieldIds(keyPath) = InMemoryBaseTable.nextColumnId() - assignNestedIds(keyType, keyPath, nestedFieldIds) - val valuePath = parentPath :+ 1 - nestedFieldIds(valuePath) = InMemoryBaseTable.nextColumnId() - assignNestedIds(valueType, valuePath, nestedFieldIds) - case _ => // primitive types have no nested structure - } - } - - /** - * Merges nested IDs from old to new: preserves IDs for ordinal positions - * that exist in both old and new types, assigns fresh IDs for new positions. - * - * For example, if the old type is `STRUCT<name: STRING, age: INT>` with - * IDs {Seq(0)->10, Seq(1)->11}, and the new type is - * `STRUCT<name: STRING, age: INT>` after drop+re-add of `age`, then `age` - * gets a fresh ID 12 because its position was removed and re-added, while - * `name` keeps ID 10.
- */ - private def mergeNestedIds( - oldFieldIds: mutable.Map[Seq[Int], Long], - oldType: DataType, - newType: DataType): mutable.Map[Seq[Int], Long] = { - val mergedFieldIds = mutable.Map[Seq[Int], Long]() - walkAndMerge(newType, parentPath = Seq.empty, mergedFieldIds, oldFieldIds) - mergedFieldIds - } - - /** - * Walks the new data type and for each nested position, either preserves - * the old ID (if the ordinal path existed before) or assigns a fresh one. - */ - private def walkAndMerge( - dataType: DataType, - parentPath: Seq[Int], - mergedFieldIds: mutable.Map[Seq[Int], Long], - oldFieldIds: mutable.Map[Seq[Int], Long]): Unit = { - dataType match { - case structType: StructType => - structType.fields.zipWithIndex.foreach { case (field, idx) => - val fieldPath = parentPath :+ idx - mergedFieldIds(fieldPath) = - oldFieldIds.getOrElse(fieldPath, InMemoryBaseTable.nextColumnId()) - walkAndMerge(field.dataType, fieldPath, mergedFieldIds, oldFieldIds) - } - case ArrayType(elementType, _) => - val elementPath = parentPath :+ 0 - mergedFieldIds(elementPath) = - oldFieldIds.getOrElse(elementPath, InMemoryBaseTable.nextColumnId()) - walkAndMerge(elementType, elementPath, mergedFieldIds, oldFieldIds) - case MapType(keyType, valueType, _) => - val keyPath = parentPath :+ 0 - mergedFieldIds(keyPath) = - oldFieldIds.getOrElse(keyPath, InMemoryBaseTable.nextColumnId()) - walkAndMerge(keyType, keyPath, mergedFieldIds, oldFieldIds) - val valuePath = parentPath :+ 1 - mergedFieldIds(valuePath) = - oldFieldIds.getOrElse(valuePath, InMemoryBaseTable.nextColumnId()) - walkAndMerge(valueType, valuePath, mergedFieldIds, oldFieldIds) - case _ => - } - } - - /** - * Encodes a root ID and its nested field IDs into a single deterministic string. - * Format: `rootId[path1:id1,path2:id2,...]` with paths sorted - * lexicographically by their dot-joined ordinal representation. 
- * - * Example: column `person STRUCT<name: STRING, age: INT>` with root ID "5" - * and nested field IDs {Seq(0)->10, Seq(1)->11} encodes as: - * `"5[0:10,1:11]"` - * - * If the column has no nested fields (e.g. `INT`), returns just the root ID. - */ - private def encodeComposedId( - rootId: String, - nestedFieldIds: mutable.Map[Seq[Int], Long]): String = { - if (nestedFieldIds.isEmpty) { - rootId - } else { - val sortedEntries = nestedFieldIds.toSeq.sortBy(_._1.mkString(".")) - val encoded = sortedEntries.map { case (fieldPath, fieldId) => - s"${fieldPath.mkString(".")}:$fieldId" - }.mkString(",") - s"$rootId[$encoded]" - } - } -} diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala index 67797c83715bc..dfb8b39d25a16 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala @@ -914,7 +914,7 @@ object InMemoryBaseTable { * have one. Recurses into struct fields within ArrayType and MapType so that every field * at every depth gets an ID. * - * Existing IDs are preserved: Column → StructType → Column round-trip encodes them in + * Existing IDs are preserved: Column -> StructType -> Column round-trip encodes them in * StructField metadata (see StructField.FIELD_ID_METADATA_KEY), so only genuinely new fields * arrive here without an ID.
*/ diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/NullTableIdAndNullColumnIdInMemoryTableCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/NullTableIdAndNullColumnIdInMemoryTableCatalog.scala index df7964f63b855..c1d45e42fc467 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/NullTableIdAndNullColumnIdInMemoryTableCatalog.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/NullTableIdAndNullColumnIdInMemoryTableCatalog.scala @@ -27,7 +27,7 @@ import org.apache.spark.sql.internal.connector.ColumnImpl * connectors that support neither table nor column identity tracking. * * When both IDs are null, neither the table identity check in [[V2TableRefreshUtil]] - * nor [[V2TableUtil.validateColumnIds]] fires, so drop/recreate of a table or + * nor the column schema check fires, so drop/recreate of a table or * drop/re-add of a column goes undetected. */ class NullTableIdAndNullColumnIdInMemoryTableCatalog extends InMemoryTableCatalog { diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/V2TableUtilSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/V2TableUtilSuite.scala index 11539657fbe36..cb9dad8263b68 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/V2TableUtilSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/V2TableUtilSuite.scala @@ -565,8 +565,7 @@ class V2TableUtilSuite extends SparkFunSuite { val errors = validateCapturedColumns(table, originCols) assert(errors.size == 1) - // colPath is Seq.empty at top level, so the column name does not appear in the message - assert(errors.head == " field ID has changed from 1 to 99") + assert(errors.head == "`id` field ID has changed from 1 to 99") } test("validateCapturedColumns - nested struct field ID changed") { @@ -583,8 +582,7 @@ class V2TableUtilSuite extends SparkFunSuite { val 
errors = validateCapturedColumns(table, originCols) assert(errors.size == 1) - // colPath at the check site is Seq("person") — the parent struct path - assert(errors.head == "`person` field ID has changed from 11 to 99") + assert(errors.head == "`person`.`age` field ID has changed from 11 to 99") } test("validateCapturedColumns - doubly-nested struct field ID changed") { @@ -599,7 +597,7 @@ class V2TableUtilSuite extends SparkFunSuite { val errors = validateCapturedColumns(table, originCols) assert(errors.size == 1) - assert(errors.head == "`person`.`info` field ID has changed from 11 to 99") + assert(errors.head == "`person`.`info`.`age` field ID has changed from 11 to 99") } test("validateCapturedColumns - multiple nested struct field IDs changed") { @@ -618,8 +616,8 @@ class V2TableUtilSuite extends SparkFunSuite { val errors = validateCapturedColumns(table, originCols) assert(errors.size == 2) - assert(errors.exists(_ == "`person` field ID has changed from 11 to 98")) - assert(errors.exists(_ == "`person` field ID has changed from 12 to 99")) + assert(errors.exists(_ == "`person`.`age` field ID has changed from 11 to 98")) + assert(errors.exists(_ == "`person`.`score` field ID has changed from 12 to 99")) } test("validateCapturedColumns - nested field ID changed in array element struct") { @@ -636,7 +634,7 @@ class V2TableUtilSuite extends SparkFunSuite { val errors = validateCapturedColumns(table, originCols) assert(errors.size == 1) - assert(errors.head == "`items`.`element` field ID has changed from 11 to 99") + assert(errors.head == "`items`.`element`.`price` field ID has changed from 11 to 99") } test("validateCapturedColumns - nested field ID changed in map value struct") { @@ -653,7 +651,7 @@ class V2TableUtilSuite extends SparkFunSuite { val errors = validateCapturedColumns(table, originCols) assert(errors.size == 1) - assert(errors.head == "`props`.`value` field ID has changed from 11 to 99") + assert(errors.head == "`props`.`value`.`label` field ID 
has changed from 11 to 99") } test("validateCapturedColumns - field ID unchanged produces no error") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DSv2IncrementallyConstructedQueryTests.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DSv2IncrementallyConstructedQueryTests.scala index 1dbaad18e3e71..5ad7fdd51c370 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DSv2IncrementallyConstructedQueryTests.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DSv2IncrementallyConstructedQueryTests.scala @@ -234,7 +234,7 @@ trait DSv2IncrementallyConstructedQueryTests extends DSv2ExternalMutationTestBas // --------------------------------------------------------------------------- // Scenario 4: external drop and recreate table. // 4a: table ID detects it, TABLE_ID_MISMATCH in classic, succeeds in Connect - // 4b: column IDs detect it, COLUMN_ID_MISMATCH in classic, succeeds in Connect + // 4b: column IDs detect it, COLUMNS_MISMATCH in classic, succeeds in Connect // 4c: no IDs, goes undetected, join succeeds (both modes) // --------------------------------------------------------------------------- @@ -321,7 +321,7 @@ trait DSv2IncrementallyConstructedQueryTests extends DSv2ExternalMutationTestBas exception = intercept[AnalysisException] { df1.join(df2, df1("id") === df2("id")).collect() }, - condition = "INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.COLUMN_ID_MISMATCH", + condition = "INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.COLUMNS_MISMATCH", matchPVals = true, parameters = Map("tableName" -> ".*", "errors" -> "(?s).*")) } @@ -364,7 +364,7 @@ trait DSv2IncrementallyConstructedQueryTests extends DSv2ExternalMutationTestBas df1.join(df2, df1("id") === df2("id")), Seq(Row(2, 200, 2, 200))) } else { - // Classic: neither TABLE_ID_MISMATCH nor COLUMN_ID_MISMATCH fires, so the + // Classic: neither TABLE_ID_MISMATCH nor COLUMNS_MISMATCH fires, so the // drop and recreate goes undetected. 
df1 keeps its pre-drop snapshot // (1, 100) while df2 reads the recreated table (2, 200), so the join finds // no matching ids and returns no rows. @@ -378,7 +378,7 @@ trait DSv2IncrementallyConstructedQueryTests extends DSv2ExternalMutationTestBas // --------------------------------------------------------------------------- // Scenario 5: external drop+re-add column. - // 5a: column IDs detect it, COLUMN_ID_MISMATCH in classic, succeeds in Connect + // 5a: column IDs detect it, COLUMNS_MISMATCH in classic, succeeds in Connect // 5b: no IDs, goes undetected, join succeeds (both modes) // --------------------------------------------------------------------------- @@ -411,7 +411,7 @@ trait DSv2IncrementallyConstructedQueryTests extends DSv2ExternalMutationTestBas exception = intercept[AnalysisException] { df1.join(df2, df1("id") === df2("id")).collect() }, - condition = "INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.COLUMN_ID_MISMATCH", + condition = "INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.COLUMNS_MISMATCH", matchPVals = true, parameters = Map("tableName" -> ".*", "errors" -> "(?s).*")) } @@ -438,7 +438,7 @@ trait DSv2IncrementallyConstructedQueryTests extends DSv2ExternalMutationTestBas val df2 = session.table(nullBothT) - // Neither TABLE_ID_MISMATCH nor COLUMN_ID_MISMATCH fires. + // Neither TABLE_ID_MISMATCH nor COLUMNS_MISMATCH fires. // The change goes undetected and the join succeeds. checkRows( df1.join(df2, df1("id") === df2("id")), @@ -449,10 +449,10 @@ trait DSv2IncrementallyConstructedQueryTests extends DSv2ExternalMutationTestBas // --------------------------------------------------------------------------- // Scenario 6: external type change (drop INT column, add STRING column). - // The delete removes the old column ID and the add assigns a fresh one, - // so the column ID check fires (COLUMN_ID_MISMATCH) in classic before schema - // validation gets a chance to compare data types. - // Connect re-resolves both sides with the new column ID. 
+ // The delete removes the old column and the add assigns a fresh one, + // so COLUMNS_MISMATCH fires in classic before schema validation gets a + // chance to compare data types. + // Connect re-resolves both sides with the new column. // --------------------------------------------------------------------------- test(s"${testPrefix}SPARK-54157: join after external drop+re-add different-type column" + @@ -485,7 +485,7 @@ trait DSv2IncrementallyConstructedQueryTests extends DSv2ExternalMutationTestBas exception = intercept[AnalysisException] { df1.join(df2, df1("id") === df2("id")).collect() }, - condition = "INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.COLUMN_ID_MISMATCH", + condition = "INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.COLUMNS_MISMATCH", matchPVals = true, parameters = Map("tableName" -> ".*", "errors" -> "(?s).*")) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala index 4340c71863860..f755f269fca90 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala @@ -3092,6 +3092,22 @@ class DataSourceV2DataFrameSuite } } + test("SPARK-57544: V1 CTAS from DSv2 scan does not persist column IDs in catalog schema") { + val v2Src = "testcat.ns1.ns2.v2_src" + val v1Dst = "v1_dst" + withTable(v2Src, v1Dst) { + sql(s"CREATE TABLE $v2Src (id INT, salary INT) USING foo") + sql(s"INSERT INTO $v2Src VALUES (1, 100)") + + // DSv2 source carries column IDs on its output attributes. + assert(spark.table(v2Src).schema.fields.forall(_.id.isDefined)) + + // V1 CTAS from a DSv2 scan: column IDs must not be stored in the V1 catalog schema. 
+ sql(s"CREATE TABLE $v1Dst USING parquet AS SELECT * FROM $v2Src") + assert(spark.table(v1Dst).schema.fields.forall(_.id.isEmpty)) + } + } + test("SPARK-53924: temp view on DSv2 table allows top-level column additions") { val t = "testcat.ns1.ns2.tbl" withTable(t) { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/ShowCreateTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/ShowCreateTableSuite.scala index e2ce378f21cc5..94ec4920a3943 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/ShowCreateTableSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/ShowCreateTableSuite.scala @@ -200,6 +200,20 @@ class ShowCreateTableSuite extends command.ShowCreateTableSuiteBase with Command } } + test("SPARK-57544: show create table does not expose column IDs; schema does") { + withNamespaceAndTable(ns, table) { t => + sql(s"CREATE TABLE $t (id INT, salary INT) $defaultUsing") + + // Column IDs assigned by the catalog must NOT appear in SHOW CREATE TABLE output. + val showDDL = getShowCreateDDL(t) + assert(!showDDL.exists(_.contains("__FIELD_ID")), s"command must not expose column IDs") + + // Column IDs must be accessible via df.schema. 
+ val fields = spark.table(t).schema.fields + assert(fields.forall(_.id.isDefined)) + } + } + test("show table constraints") { withNamespaceAndTable("ns", "tbl", nonPartitionCatalog) { t => withNamespaceAndTable("ns", "other_table", nonPartitionCatalog) { otherTable => From 6017c460fbdab9e92f4d4ce17f9d47941a46c7c2 Mon Sep 17 00:00:00 2001 From: Anton Okolnychyi Date: Wed, 24 Jun 2026 08:58:29 -0700 Subject: [PATCH 3/3] Fix compilation --- .../spark/sql/connector/catalog/CatalogV2Util.scala | 8 ++++---- .../apache/spark/sql/connector/catalog/ColumnSuite.scala | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala index 2533dab1796d5..b83aff7e920ef 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala @@ -773,7 +773,7 @@ private[sql] object CatalogV2Util { .nullable(f.nullable) .comment(comment) .defaultValue(defaultValue) - .metadataInJSON(metadataAsJson(cleanedMetadata)) + .metadata(metadataAsJson(cleanedMetadata)) .id(id) .build() } else if (isGeneratedColumn) { @@ -782,7 +782,7 @@ private[sql] object CatalogV2Util { .nullable(f.nullable) .comment(comment) .generationExpression(GeneratedColumn.getGenerationExpression(f).get) - .metadataInJSON(metadataAsJson(cleanedMetadata)) + .metadata(metadataAsJson(cleanedMetadata)) .id(id) .build() } else if (isIdentityColumn) { @@ -791,7 +791,7 @@ private[sql] object CatalogV2Util { .nullable(f.nullable) .comment(comment) .identityColumnSpec(IdentityColumn.getIdentityInfo(f).get) - .metadataInJSON(metadataAsJson(cleanedMetadata)) + .metadata(metadataAsJson(cleanedMetadata)) .id(id) .build() } else { @@ -799,7 +799,7 @@ private[sql] object CatalogV2Util { Column.newBuilder(f.name, dataType) 
.nullable(f.nullable) .comment(comment) - .metadataInJSON(metadataAsJson(cleanedMetadata)) + .metadata(metadataAsJson(cleanedMetadata)) .id(id) .build() } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/ColumnSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/ColumnSuite.scala index 62083c4205fbf..4d2c986ebc042 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/ColumnSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/ColumnSuite.scala @@ -108,7 +108,7 @@ class ColumnSuite extends SparkFunSuite { .nullable(false) .comment("doc") .defaultValue(defaultValue) - .metadataInJSON("""{"k":"v"}""") + .metadata("""{"k":"v"}""") .id("abc-123") .build() assert(col.name() == "c")