diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json index 889ecf9f7b08a..f009f947248b5 100644 --- a/common/utils/src/main/resources/error/error-conditions.json +++ b/common/utils/src/main/resources/error/error-conditions.json @@ -191,6 +191,18 @@ ], "sqlState" : "0A000" }, + "AUTOCDC_COLUMNS_NOT_FOUND_IN_SCHEMA" : { + "message" : [ + "Using <caseSensitivity> column name comparison, the following columns are not present in the <schemaName> schema: <missingColumns>. Available columns: <availableColumns>." + ], + "sqlState" : "42703" + }, + "AUTOCDC_MULTIPART_COLUMN_IDENTIFIER" : { + "message" : [ + "Expected a single column identifier; got the multi-part identifier <columnName> (parts: <nameParts>)." + ], + "sqlState" : "42703" + }, "AVRO_CANNOT_WRITE_NULL_FIELD" : { "message" : [ "Cannot write null value for field defined as non-null Avro data type .", diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgs.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgs.scala new file mode 100644 index 0000000000000..5774781b8ab9f --- /dev/null +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgs.scala @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.pipelines.autocdc
+
+import org.apache.spark.sql.{AnalysisException, Column}
+import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
+import org.apache.spark.sql.catalyst.util.QuotingUtils
+import org.apache.spark.sql.types.StructType
+
+/**
+ * Name of a single top-level column: one identifier part, with no table/alias qualifier and no
+ * nested-field path. The companion's `apply` parses its input, so enclosing backticks are not
+ * kept: "`a.b`" is stored as "a.b" in [[name]]. Compare [[name]] directly with schema field
+ * names; pass [[quoted]] to APIs that parse identifier strings again.
+ */
+case class UnqualifiedColumnName private (name: String) {
+  /** [[name]] as returned by `QuotingUtils.quoteIdentifier`. */
+  def quoted: String = QuotingUtils.quoteIdentifier(name)
+}
+
+object UnqualifiedColumnName {
+  /**
+   * Parses `input` as a multipart identifier and accepts it only if it has exactly one part;
+   * otherwise throws AUTOCDC_MULTIPART_COLUMN_IDENTIFIER. Parser errors are not caught here.
+   */
+  def apply(input: String): UnqualifiedColumnName =
+    CatalystSqlParser.parseMultipartIdentifier(input) match {
+      case Seq(single) => new UnqualifiedColumnName(single)
+      case parts => throw multipartColumnIdentifierError(input, parts)
+    }
+
+  private def multipartColumnIdentifierError(
+      columnName: String,
+      nameParts: Seq[String]): AnalysisException =
+    new AnalysisException(
+      errorClass = "AUTOCDC_MULTIPART_COLUMN_IDENTIFIER",
+      messageParameters =
+        Map("columnName" -> columnName, "nameParts" -> nameParts.mkString(", ")))
+}
+
+/** A user-provided choice of columns, given either as a list to keep or a list to drop. */
+sealed trait ColumnSelection
+
+object ColumnSelection {
+  /** Retain only the listed columns. */
+  case class IncludeColumns(columns: Seq[UnqualifiedColumnName]) extends ColumnSelection
+  /** Retain every column except the listed ones. */
+  case class ExcludeColumns(columns: Seq[UnqualifiedColumnName]) extends ColumnSelection
+
+  /**
+   * Filters `schema` according to `columnSelection`. Retained fields keep the relative order
+   * they have in `schema`, whatever order the selection lists them in.
+   *
+   * @param schemaName Label for the schema (e.g. "microbatch", "target") used in error messages.
+   * @param schema The schema to filter.
+   * @param columnSelection The selection to apply. With `None`, `schema` is returned as-is.
+   * @param caseSensitive Whether names are matched case-sensitively against the schema.
+   *                      Callers should take this from the session, e.g.
+   *                      `session.sessionState.conf.caseSensitiveAnalysis`, so that matching
+   *                      agrees with `spark.sql.caseSensitive`.
+   */
+  def applyToSchema(
+      schemaName: String,
+      schema: StructType,
+      columnSelection: Option[ColumnSelection],
+      caseSensitive: Boolean): StructType = {
+    // Projects the schema onto the field positions accepted by `keep`, in schema order.
+    def fieldsWhere(keep: Int => Boolean): StructType =
+      StructType(schema.fields.zipWithIndex.collect { case (field, i) if keep(i) => field })
+
+    columnSelection.fold(schema) {
+      case IncludeColumns(wanted) =>
+        val selected = lookupFieldIndices(schemaName, schema, wanted, caseSensitive)
+        fieldsWhere(selected.contains)
+      case ExcludeColumns(unwanted) =>
+        val rejected = lookupFieldIndices(schemaName, schema, unwanted, caseSensitive)
+        fieldsWhere(i => !rejected.contains(i))
+    }
+  }
+
+  /**
+   * Resolves every requested column to its position in `schema`. If any name is unresolved,
+   * throws AUTOCDC_COLUMNS_NOT_FOUND_IN_SCHEMA naming each unresolved column once, in request
+   * order; otherwise returns the set of resolved positions.
+   */
+  private def lookupFieldIndices(
+      schemaName: String,
+      schema: StructType,
+      fields: Seq[UnqualifiedColumnName],
+      caseSensitive: Boolean): Set[Int] = {
+    def indexOf(name: String): Option[Int] =
+      if (caseSensitive) schema.getFieldIndex(name) else schema.getFieldIndexCaseInsensitive(name)
+
+    val lookups = fields.map(column => column.name -> indexOf(column.name))
+    val unresolved = lookups.collect { case (name, None) => name }.distinct
+    if (unresolved.nonEmpty) {
+      throw new AnalysisException(
+        errorClass = "AUTOCDC_COLUMNS_NOT_FOUND_IN_SCHEMA",
+        messageParameters = Map(
+          "caseSensitivity" -> CaseSensitivityLabels.of(caseSensitive),
+          "schemaName" -> schemaName,
+          "missingColumns" -> unresolved.mkString(", "),
+          "availableColumns" -> schema.fieldNames.mkString(", ")))
+    }
+    lookups.flatMap { case (_, index) => index }.toSet
+  }
+}
+
+/** Labels used in AutoCDC error messages to say how column names were compared. */
+private[autocdc] object CaseSensitivityLabels {
+  val CaseSensitive: String = "case-sensitive"
+  val CaseInsensitive: String = "case-insensitive"
+
+  /** Returns the label for the given comparison mode. */
+  def of(caseSensitive: Boolean): String = if (caseSensitive) CaseSensitive else CaseInsensitive
+}
+
+/** The SCD (Slowly Changing Dimension) strategy used by a CDC flow. */
+sealed trait ScdType
+
+object ScdType {
+  /** The standard SCD1 strategy. */
+  case object Type1 extends ScdType
+  /** The standard SCD2 strategy. */
+  case object Type2 extends ScdType
+}
+
+/**
+ * Settings for one AutoCDC flow.
+ *
+ * @param keys Column(s) that uniquely identify a row in the source data.
+ * @param sequencing Expression that orders CDC events so out-of-order arrivals resolve
+ *                   correctly. Must be of a sortable type.
+ * @param storedAsScdType The SCD strategy these arguments are applied to.
+ * @param deleteCondition Expression marking a source row as a DELETE. With `None`, all rows
+ *                        are treated as upserts.
+ * @param columnSelection Source columns selected into the target table; `None` means all.
+ */
+case class ChangeArgs(
+    keys: Seq[UnqualifiedColumnName],
+    sequencing: Column,
+    storedAsScdType: ScdType,
+    deleteCondition: Option[Column] = None,
+    columnSelection: Option[ColumnSelection] = None)
diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgsSuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgsSuite.scala
new file mode 100644
index 0000000000000..816338cb677e8
--- /dev/null
+++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgsSuite.scala
@@ -0,0 +1,374 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.
See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.pipelines.autocdc
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.{functions => F, AnalysisException, Row}
+import org.apache.spark.sql.catalyst.parser.ParseException
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
+
+/** Tests for [[UnqualifiedColumnName]] and [[ColumnSelection.applyToSchema]]. */
+class ChangeArgsSuite extends SparkFunSuite with SharedSparkSession {
+  private val baseSchema = new StructType()
+    .add("id", IntegerType, nullable = false)
+    .add("Name", StringType)
+    .add("age", IntegerType)
+
+  test("ColumnSelection None leaves schema unchanged") {
+    val actual = ColumnSelection.applyToSchema(
+      schemaName = "test",
+      schema = baseSchema,
+      columnSelection = None,
+      caseSensitive = true
+    )
+    assert(actual == baseSchema)
+  }
+
+  test("ColumnSelection IncludeColumns(Seq()) returns an empty schema") {
+    // Unlike None, an explicitly empty include-list asks for zero columns, so the result is
+    // an empty StructType rather than the input schema.
+    val actual = ColumnSelection.applyToSchema(
+      schemaName = "test",
+      schema = baseSchema,
+      columnSelection = Some(ColumnSelection.IncludeColumns(Nil)),
+      caseSensitive = true
+    )
+    assert(actual == new StructType())
+  }
+
+  test("ColumnSelection ExcludeColumns(Seq()) leaves schema unchanged") {
+    // With nothing listed for exclusion there is nothing to drop, so the input schema comes
+    // back as-is (observably the same outcome as None).
+    val actual = ColumnSelection.applyToSchema(
+      schemaName = "test",
+      schema = baseSchema,
+      columnSelection = Some(ColumnSelection.ExcludeColumns(Nil)),
+      caseSensitive = true
+    )
+    assert(actual == baseSchema)
+  }
+
+  test("ColumnSelection IncludeColumns filters by exact name in schema order") {
+    val selection = ColumnSelection.IncludeColumns(
+      Seq(UnqualifiedColumnName("age"), UnqualifiedColumnName("Name"))
+    )
+    val expected = new StructType()
+      .add("Name", StringType)
+      .add("age", IntegerType)
+
+    val actual = ColumnSelection.applyToSchema(
+      schemaName = "test",
+      schema = baseSchema,
+      columnSelection = Some(selection),
+      caseSensitive = true
+    )
+    assert(actual == expected)
+  }
+
+  test("ColumnSelection ExcludeColumns filters by exact name") {
+    val selection = ColumnSelection.ExcludeColumns(Seq(UnqualifiedColumnName("id")))
+    val expected = new StructType()
+      .add("Name", StringType)
+      .add("age", IntegerType)
+
+    val actual = ColumnSelection.applyToSchema(
+      schemaName = "test",
+      schema = baseSchema,
+      columnSelection = Some(selection),
+      caseSensitive = true
+    )
+    assert(actual == expected)
+  }
+
+  test("ColumnSelection IncludeColumns fails for columns not present in schema") {
+    // With caseSensitive = true, "name" does not match the schema field "Name".
+    val selection = ColumnSelection.IncludeColumns(
+      Seq(UnqualifiedColumnName("name"), UnqualifiedColumnName("missing"))
+    )
+    val error = intercept[AnalysisException] {
+      ColumnSelection.applyToSchema(
+        schemaName = "test",
+        schema = baseSchema,
+        columnSelection = Some(selection),
+        caseSensitive = true
+      )
+    }
+    checkError(
+      exception = error,
+      condition = "AUTOCDC_COLUMNS_NOT_FOUND_IN_SCHEMA",
+      sqlState = "42703",
+      parameters = Map(
+        "caseSensitivity" -> CaseSensitivityLabels.CaseSensitive,
+        "schemaName" -> "test",
+        "missingColumns" -> "name, missing",
+        "availableColumns" -> "id, Name, age"
+      )
+    )
+  }
+
+  test("ColumnSelection ExcludeColumns fails for columns not present in schema") {
+    // With caseSensitive = true, "NAME" does not match the schema field "Name".
+    val selection = ColumnSelection.ExcludeColumns(
+      Seq(UnqualifiedColumnName("NAME"), UnqualifiedColumnName("missing"))
+    )
+    val error = intercept[AnalysisException] {
+      ColumnSelection.applyToSchema(
+        schemaName = "test",
+        schema = baseSchema,
+        columnSelection = Some(selection),
+        caseSensitive = true
+      )
+    }
+    checkError(
+      exception = error,
+      condition = "AUTOCDC_COLUMNS_NOT_FOUND_IN_SCHEMA",
+      sqlState = "42703",
+      parameters = Map(
+        "caseSensitivity" -> CaseSensitivityLabels.CaseSensitive,
+        "schemaName" -> "test",
+        "missingColumns" -> "NAME, missing",
+        "availableColumns" -> "id, Name, age"
+      )
+    )
+  }
+
+  test("ColumnSelection IncludeColumns matches case-insensitively under caseSensitive=false") {
+    // Neither "NAME" nor "AGE" equals a schema field name exactly ("Name", "age"); they are
+    // expected to match only because caseSensitive = false.
+    val selection = ColumnSelection.IncludeColumns(
+      Seq(UnqualifiedColumnName("AGE"), UnqualifiedColumnName("NAME"))
+    )
+    // Retained fields carry the schema's casing, not the casing the user typed.
+    val expected = new StructType()
+      .add("Name", StringType)
+      .add("age", IntegerType)
+
+    val actual = ColumnSelection.applyToSchema(
+      schemaName = "test",
+      schema = baseSchema,
+      columnSelection = Some(selection),
+      caseSensitive = false
+    )
+    assert(actual == expected)
+  }
+
+  test("ColumnSelection deduplicates user-provided columns that normalize to the same name") {
+    // With caseSensitive = false, "name" and "NAME" both refer to the schema field "Name".
+    // The result must contain that field once, not twice, with the schema's casing and
+    // ordering rather than the user's.
+    val selection = ColumnSelection.IncludeColumns(
+      Seq(UnqualifiedColumnName("name"), UnqualifiedColumnName("NAME"))
+    )
+    val expected = new StructType().add("Name", StringType)
+
+    val actual = ColumnSelection.applyToSchema(
+      schemaName = "test",
+      schema = baseSchema,
+      columnSelection = Some(selection),
+      caseSensitive = false
+    )
+    assert(actual == expected)
+  }
+
+  test("ColumnSelection ExcludeColumns matches case-insensitively under caseSensitive=false") {
+    val selection = ColumnSelection.ExcludeColumns(Seq(UnqualifiedColumnName("name")))
+    val expected = new StructType()
+      .add("id", IntegerType, nullable = false)
+      .add("age", IntegerType)
+
+    val actual = ColumnSelection.applyToSchema(
+      schemaName = "test",
+      schema = baseSchema,
+      columnSelection = Some(selection),
+      caseSensitive = false
+    )
+    assert(actual == expected)
+  }
+
+  test("ColumnSelection missing-column error under caseSensitive=false preserves user casing") {
+    // "NAME" matches "Name" because caseSensitive = false, whereas "Missing" matches nothing.
+    // The error reports the missing column with the casing the user supplied and the
+    // available columns with the casing stored in the schema.
+    val selection = ColumnSelection.IncludeColumns(
+      Seq(UnqualifiedColumnName("NAME"), UnqualifiedColumnName("Missing"))
+    )
+    val error = intercept[AnalysisException] {
+      ColumnSelection.applyToSchema(
+        schemaName = "test",
+        schema = baseSchema,
+        columnSelection = Some(selection),
+        caseSensitive = false
+      )
+    }
+    checkError(
+      exception = error,
+      condition = "AUTOCDC_COLUMNS_NOT_FOUND_IN_SCHEMA",
+      sqlState = "42703",
+      parameters = Map(
+        "caseSensitivity" -> CaseSensitivityLabels.CaseInsensitive,
+        "schemaName" -> "test",
+        "missingColumns" -> "Missing",
+        "availableColumns" -> "id, Name, age"
+      )
+    )
+  }
+
+  test("UnqualifiedColumnName accepts a simple single-part identifier") {
+    val column = UnqualifiedColumnName("col")
+    // `quoted` always adds back-ticks, even when the input had none.
+    assert((column.name, column.quoted) == ("col", "`col`"))
+  }
+
+  test("UnqualifiedColumnName accepts a backtick-quoted name containing a literal dot") {
+    // Inside back-ticks the dot belongs to a single name part, so validation passes. `name`
+    // holds the parsed (unquoted) form, i.e. the spelling of the actual schema field, and
+    // `quoted` wraps it in back-ticks again, giving back the original input.
+    val column = UnqualifiedColumnName("`a.b`")
+    assert((column.name, column.quoted) == ("a.b", "`a.b`"))
+  }
+
+  test("UnqualifiedColumnName accepts redundant backticks around a single-part name") {
+    // Back-ticks around a name that is already single-part are purely decorative: the parser
+    // strips them, so `name` has none, and `quoted` puts them back, giving back the original
+    // input.
+    val column = UnqualifiedColumnName("`col`")
+    assert((column.name, column.quoted) == ("col", "`col`"))
+  }
+
+  test("UnqualifiedColumnName.quoted is safe to pass to functions.col for literal-dot names") {
+    val rows = Seq(Row(1, 2), Row(3, 4))
+    val schema = new StructType()
+      .add("a.b", IntegerType)
+      .add("c", IntegerType)
+
+    val df = spark.createDataFrame(spark.sparkContext.parallelize(rows), schema)
+    val key = UnqualifiedColumnName("`a.b`")
+
+    // Sanity check: the raw `name` must not be handed to `functions.col`. The string gets
+    // parsed again, the dot is read as a nested-field separator, and the analyzer cannot
+    // resolve `a`.`b` among the top-level columns.
+    val error = intercept[AnalysisException] {
+      df.select(F.col(key.name)).collect()
+    }
+
+    checkError(
+      exception = error,
+      condition = "UNRESOLVED_COLUMN.WITH_SUGGESTION",
+      sqlState = "42703",
+      parameters = Map(
+        "objectName" -> "`a`.`b`",
+        "proposal" -> "`a.b`, `c`"
+      ),
+      context = ExpectedContext(
+        fragment = "col",
+        callSitePattern = ""
+      )
+    )
+
+    // `quoted` wraps the name in back-ticks, so the second parse sees one identifier and
+    // resolves it to the top-level "a.b" column.
+    val quotedResult = df.select(F.col(key.quoted)).collect().toSeq
+    assert(quotedResult == Seq(Row(1), Row(3)))
+  }
+
+  test("IncludeColumns correctly matches a backtick-quoted literal-dot column") {
+    val schema = new StructType()
+      .add("a.b", IntegerType)
+      .add("c", StringType)
+
+    // The user writes `a.b` to mean the schema column whose name contains a literal dot,
+    // "a.b". Once constructed, the [[UnqualifiedColumnName]] holds "a.b", which equals the
+    // field name exactly, so the column is kept in the filtered schema.
+    val selection = ColumnSelection.IncludeColumns(Seq(UnqualifiedColumnName("`a.b`")))
+    val expected = new StructType().add("a.b", IntegerType)
+
+    val actual = ColumnSelection.applyToSchema(
+      schemaName = "test",
+      schema = schema,
+      columnSelection = Some(selection),
+      caseSensitive = true
+    )
+    assert(actual == expected)
+  }
+
+  test("IncludeColumns correctly matches a backtick-quoted mixed-case column") {
+    val selection = ColumnSelection.IncludeColumns(Seq(UnqualifiedColumnName("`Name`")))
+    val expected = new StructType().add("Name", StringType)
+
+    val actual = ColumnSelection.applyToSchema(
+      schemaName = "test",
+      schema = baseSchema,
+      columnSelection = Some(selection),
+      caseSensitive = true
+    )
+    assert(actual == expected)
+  }
+
+  test("UnqualifiedColumnName rejects a dotted (multi-part) identifier") {
+    val error = intercept[AnalysisException] {
+      UnqualifiedColumnName("a.b")
+    }
+
+    // Without back-ticks the dot separates two name parts.
+    checkError(
+      exception = error,
+      condition = "AUTOCDC_MULTIPART_COLUMN_IDENTIFIER",
+      sqlState = "42703",
+      parameters = Map("columnName" -> "a.b", "nameParts" -> "a, b")
+    )
+  }
+
+  test("UnqualifiedColumnName rejects a qualified column reference") {
+    val error = intercept[AnalysisException] {
+      UnqualifiedColumnName("src.x")
+    }
+
+    // A table/alias-qualified reference parses into two parts as well.
+    checkError(
+      exception = error,
+      condition = "AUTOCDC_MULTIPART_COLUMN_IDENTIFIER",
+      sqlState = "42703",
+      parameters = Map("columnName" -> "src.x", "nameParts" -> "src, x")
+    )
+  }
+
+  test("UnqualifiedColumnName rejects an identifier with three or more parts") {
+    val error = intercept[AnalysisException] {
+      UnqualifiedColumnName("a.b.c")
+    }
+
+    // Any part count other than one is rejected, not only two.
+    checkError(
+      exception = error,
+      condition = "AUTOCDC_MULTIPART_COLUMN_IDENTIFIER",
+      sqlState = "42703",
+      parameters = Map("columnName" -> "a.b.c", "nameParts" -> "a, b, c")
+    )
+  }
+
+  test("UnqualifiedColumnName lets a ParseException from the SQL parser propagate") {
+    // The empty string fails inside the parser, before the single-part check is reached.
+    val error = intercept[ParseException](UnqualifiedColumnName(""))
+    checkError(
+      exception = error,
+      condition = "PARSE_EMPTY_STATEMENT",
+      sqlState = Some("42617")
+    )
+  }
+}