-
Notifications
You must be signed in to change notification settings - Fork 29.3k
[SPARK-57544][SQL] Rework column ID validation for nested fields in DSv2 #56619
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,23 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.apache.spark.sql.catalyst.util | ||
|
|
||
| object FieldMetadataUtils { | ||
| // Metadata key for the field ID used to track column identity across schema evolution | ||
| val FIELD_ID_METADATA_KEY = "__FIELD_ID" | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Went for |
||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -26,6 +26,7 @@ import org.json4s.JsonDSL._ | |
| import org.apache.spark.SparkException | ||
| import org.apache.spark.annotation.Stable | ||
| import org.apache.spark.sql.catalyst.util.{CollationFactory, QuotingUtils, StringConcat} | ||
| import org.apache.spark.sql.catalyst.util.FieldMetadataUtils.FIELD_ID_METADATA_KEY | ||
| import org.apache.spark.sql.catalyst.util.ResolveDefaultColumnsUtils.{CURRENT_DEFAULT_COLUMN_METADATA_KEY, EXISTS_DEFAULT_COLUMN_METADATA_KEY} | ||
| import org.apache.spark.util.SparkSchemaUtils | ||
|
|
||
|
|
@@ -243,6 +244,43 @@ case class StructField( | |
| metadata.contains(EXISTS_DEFAULT_COLUMN_METADATA_KEY) | ||
| } | ||
|
|
||
| /** | ||
| * Updates the field with an ID for column identity tracking. | ||
| */ | ||
| def withId(id: String): StructField = { | ||
| val newMetadata = new MetadataBuilder() | ||
| .withMetadata(metadata) | ||
| .putString(FIELD_ID_METADATA_KEY, id) | ||
| .build() | ||
| copy(metadata = newMetadata) | ||
| } | ||
|
|
||
| /** | ||
| * Returns the ID of this field, if set. | ||
| */ | ||
| def id: Option[String] = { | ||
| if (metadata.contains(FIELD_ID_METADATA_KEY)) { | ||
| Some(metadata.getString(FIELD_ID_METADATA_KEY)) | ||
| } else { | ||
| None | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Returns a copy of this field with the field ID removed, or this field if no ID is set. | ||
| */ | ||
| def clearId(): StructField = { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Since currently, we return the field itself, would
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. StructField is immutable by design and can't be modified. |
||
| if (metadata.contains(FIELD_ID_METADATA_KEY)) { | ||
| val newMetadata = new MetadataBuilder() | ||
| .withMetadata(metadata) | ||
| .remove(FIELD_ID_METADATA_KEY) | ||
| .build() | ||
| copy(metadata = newMetadata) | ||
| } else { | ||
| this | ||
| } | ||
| } | ||
|
|
||
| private def getDDLDefault = getDefault() | ||
| .orElse(getCurrentDefaultValue()) | ||
| .map(" DEFAULT " + _) | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -18,12 +18,17 @@ | |
| package org.apache.spark.sql.connector.catalog; | ||
|
|
||
| import java.util.Map; | ||
| import java.util.Objects; | ||
| import java.util.stream.Stream; | ||
| import javax.annotation.Nullable; | ||
|
|
||
| import org.apache.spark.SparkIllegalArgumentException; | ||
| import org.apache.spark.annotation.Evolving; | ||
| import org.apache.spark.sql.connector.expressions.Transform; | ||
| import org.apache.spark.sql.internal.connector.ColumnImpl; | ||
| import org.apache.spark.sql.types.DataType; | ||
| import org.apache.spark.sql.types.StructField; | ||
| import org.apache.spark.sql.util.SchemaUtils; | ||
|
|
||
| /** | ||
| * An interface representing a column of a {@link Table}. It defines basic properties of a column, | ||
|
|
@@ -40,11 +45,11 @@ | |
| public interface Column { | ||
|
|
||
| static Column create(String name, DataType dataType) { | ||
| return create(name, dataType, true); | ||
| return builderFor(name, dataType).build(); | ||
| } | ||
|
|
||
| static Column create(String name, DataType dataType, boolean nullable) { | ||
| return create(name, dataType, nullable, null, null); | ||
| return builderFor(name, dataType).nullable(nullable).build(); | ||
| } | ||
|
|
||
| static Column create( | ||
|
|
@@ -53,16 +58,11 @@ static Column create( | |
| boolean nullable, | ||
| String comment, | ||
| String metadataInJSON) { | ||
| return new ColumnImpl( | ||
| name, | ||
| dataType, | ||
| nullable, | ||
| comment, | ||
| /* defaultValue = */ null, | ||
| /* generationExpression = */ null, | ||
| /* identityColumnSpec = */ null, | ||
| metadataInJSON, | ||
| /* id = */ null); | ||
| return builderFor(name, dataType) | ||
| .nullable(nullable) | ||
| .comment(comment) | ||
| .metadata(metadataInJSON) | ||
| .build(); | ||
| } | ||
|
|
||
| static Column create( | ||
|
|
@@ -72,88 +72,72 @@ static Column create( | |
| String comment, | ||
| ColumnDefaultValue defaultValue, | ||
| String metadataInJSON) { | ||
| return new ColumnImpl( | ||
| name, | ||
| dataType, | ||
| nullable, | ||
| comment, | ||
| defaultValue, | ||
| /* generationExpression = */ null, | ||
| /* identityColumnSpec = */ null, | ||
| metadataInJSON, | ||
| /* id = */ null); | ||
| return builderFor(name, dataType) | ||
| .nullable(nullable) | ||
| .comment(comment) | ||
| .defaultValue(defaultValue) | ||
| .metadata(metadataInJSON) | ||
| .build(); | ||
| } | ||
|
|
||
| /** | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Switching newly added create methods to a builder. Hasn't been released yet. |
||
| * Creates a column with a generation expression in SQL string form. | ||
| * | ||
| * @since 4.3.0 | ||
| * @deprecated Use | ||
| * {@link #create(String, DataType, boolean, String, GenerationExpression, String)} instead. | ||
| */ | ||
| @Deprecated | ||
| static Column create( | ||
| String name, | ||
| DataType dataType, | ||
| boolean nullable, | ||
| String comment, | ||
| String generationExpression, | ||
| String metadataInJSON) { | ||
| GenerationExpression genExpr = generationExpression != null | ||
| ? new GenerationExpression(generationExpression) : null; | ||
| return new ColumnImpl( | ||
| name, | ||
| dataType, | ||
| nullable, | ||
| comment, | ||
| /* defaultValue = */ null, | ||
| genExpr, | ||
| /* identityColumnSpec = */ null, | ||
| metadataInJSON, | ||
| /* id = */ null); | ||
| return builderFor(name, dataType) | ||
| .nullable(nullable) | ||
| .comment(comment) | ||
| .generationExpression(generationExpression) | ||
| .metadata(metadataInJSON) | ||
| .build(); | ||
| } | ||
|
|
||
| /** | ||
| * Creates a column with a generation expression object. | ||
| * | ||
| * @since 4.3.0 | ||
| */ | ||
| static Column create( | ||
| String name, | ||
| DataType dataType, | ||
| boolean nullable, | ||
| String comment, | ||
| GenerationExpression generationExpression, | ||
| IdentityColumnSpec identityColumnSpec, | ||
| String metadataInJSON) { | ||
| return new ColumnImpl( | ||
| name, | ||
| dataType, | ||
| nullable, | ||
| comment, | ||
| /* defaultValue = */ null, | ||
| generationExpression, | ||
| /* identityColumnSpec = */ null, | ||
| metadataInJSON, | ||
| /* id = */ null); | ||
| return builderFor(name, dataType) | ||
| .nullable(nullable) | ||
| .comment(comment) | ||
| .identityColumnSpec(identityColumnSpec) | ||
| .metadata(metadataInJSON) | ||
| .build(); | ||
| } | ||
|
|
||
| static Column create( | ||
| String name, | ||
| DataType dataType, | ||
| boolean nullable, | ||
| String comment, | ||
| IdentityColumnSpec identityColumnSpec, | ||
| String metadataInJSON) { | ||
| return new ColumnImpl( | ||
| name, | ||
| dataType, | ||
| nullable, | ||
| comment, | ||
| /* defaultValue = */ null, | ||
| /* generationExpression = */ null, | ||
| identityColumnSpec, | ||
| metadataInJSON, | ||
| /* id = */ null); | ||
| /** | ||
| * Creates a builder for a new column with the given name and data type. | ||
| * | ||
| * @param name the name of the column | ||
| * @param dataType the data type of the column | ||
| * @return a new builder | ||
| * @since 4.2.0 | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Intentional to be cherry-picked into 4.2. |
||
| */ | ||
| static Builder builderFor(String name, DataType dataType) { | ||
| return new Builder(name, dataType); | ||
| } | ||
|
|
||
| /** | ||
| * Creates a builder with pre-populated info from an existing column. | ||
| * | ||
| * @param column the source column | ||
| * @return a new builder seeded with the column's current state | ||
| * @since 4.2.0 | ||
| */ | ||
| static Builder builderFrom(Column column) { | ||
| return new Builder(column.name(), column.dataType()) | ||
| .nullable(column.nullable()) | ||
| .comment(column.comment()) | ||
| .defaultValue(column.defaultValue()) | ||
| .generationExpression(column.columnGenerationExpression()) | ||
| .identityColumnSpec(column.identityColumnSpec()) | ||
| .metadata(column.metadataInJSON()) | ||
| .id(column.id()); | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -243,12 +227,104 @@ default GenerationExpression columnGenerationExpression() { | |
| * others. | ||
| * <p> | ||
| * This API covers top-level columns only. Nested struct fields, array elements, and map | ||
| * keys/values do not have separate IDs. Connectors that track nested field IDs can encode | ||
| * them into the returned top-level Column ID string to detect nested changes, since Spark | ||
| * only compares string equality. | ||
| * keys/values carry their own IDs in struct field metadata. Spark validates both top-level and | ||
| * nested struct field IDs as part of schema compatibility checks (array elements and map/key | ||
| * values' validation is not supported yet). See {@link StructField#id()}. | ||
| */ | ||
| @Nullable | ||
| default String id() { | ||
| return null; | ||
| } | ||
|
|
||
| /** | ||
| * A builder for {@link Column}. | ||
| * | ||
| * @since 4.2.0 | ||
| */ | ||
| class Builder { | ||
| private final String name; | ||
| private DataType dataType; | ||
| private boolean nullable = true; | ||
| private String comment = null; | ||
| private ColumnDefaultValue defaultValue = null; | ||
| private GenerationExpression genExpr = null; | ||
| private IdentityColumnSpec identityColumnSpec = null; | ||
| private String metadataInJSON = null; | ||
| private String id = null; | ||
|
|
||
| private Builder(String name, DataType dataType) { | ||
| this.name = Objects.requireNonNull(name, "name must not be null"); | ||
| this.dataType = Objects.requireNonNull(dataType, "dataType must not be null"); | ||
| } | ||
|
|
||
| public Builder nullable(boolean nullable) { | ||
| this.nullable = nullable; | ||
| return this; | ||
| } | ||
|
|
||
| public Builder comment(String comment) { | ||
| this.comment = comment; | ||
| return this; | ||
| } | ||
|
|
||
| public Builder defaultValue(ColumnDefaultValue defaultValue) { | ||
| this.defaultValue = defaultValue; | ||
| return this; | ||
| } | ||
|
|
||
| public Builder generationExpression(String sql) { | ||
| this.genExpr = sql != null ? new GenerationExpression(sql) : null; | ||
| return this; | ||
| } | ||
|
|
||
| public Builder generationExpression(GenerationExpression generationExpr) { | ||
| this.genExpr = generationExpr; | ||
| return this; | ||
| } | ||
|
|
||
| public Builder identityColumnSpec(IdentityColumnSpec identityColumnSpec) { | ||
| this.identityColumnSpec = identityColumnSpec; | ||
| return this; | ||
| } | ||
|
|
||
| public Builder metadata(String metadataInJSON) { | ||
| this.metadataInJSON = metadataInJSON; | ||
| return this; | ||
| } | ||
|
|
||
| public Builder id(String id) { | ||
| this.id = id; | ||
| return this; | ||
| } | ||
|
|
||
| public Builder clearIds() { | ||
| this.id = null; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is it a problem if the
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is in the builder where we mutate the temp state before creating |
||
| this.dataType = SchemaUtils.clearFieldIds(dataType); | ||
| return this; | ||
| } | ||
|
|
||
| public Column build() { | ||
| validateState(); | ||
| return new ColumnImpl( | ||
| name, dataType, nullable, comment, defaultValue, | ||
| genExpr, identityColumnSpec, metadataInJSON, id); | ||
| } | ||
|
|
||
| private void validateState() { | ||
| if (hasConflictingDefinitions()) { | ||
| throw new SparkIllegalArgumentException( | ||
| "INTERNAL_ERROR", | ||
| Map.of("message", | ||
| "Column '" + name + "' cannot have more than one definition of: " + | ||
| "default value, generation expression, identity column spec")); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I assume in reality, this error shouldn't happen, so we cannot test it right?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Shouldn't happen in practice. We do test this in |
||
| } | ||
| } | ||
|
|
||
| private boolean hasConflictingDefinitions() { | ||
| long definitionCount = Stream.of(defaultValue, genExpr, identityColumnSpec) | ||
| .filter(Objects::nonNull) | ||
| .count(); | ||
| return definitionCount > 1; | ||
| } | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shall we have in-memory-table testings like in #55376 also where we test connector have implemented various scenarios?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What cases do you mean?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Like we have ComposedColumnIdTableCatalog, sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/TypeChangeResetsColIdTableCatalog.scala,.... where we mock various connector scenarios, e.g. Connector that provide top-level column ID only, no struct column ID; Connector that provides both top-level col ID and struct col ID,....
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We replaced the approach in
ComposedColumnIdTableCatalog, so I am not sure it is valid to keep. The rest of the tests should be in place.