diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json index 81ef749e19f81..0c8bfcc11b25c 100644 --- a/common/utils/src/main/resources/error/error-conditions.json +++ b/common/utils/src/main/resources/error/error-conditions.json @@ -197,6 +197,12 @@ ], "sqlState" : "42703" }, + "AUTOCDC_EMPTY_KEYS" : { + "message" : [ + "AutoCDC requires at least one key column to identify rows, but received an empty key set." + ], + "sqlState" : "22023" + }, "AUTOCDC_MULTIPART_COLUMN_IDENTIFIER" : { "message" : [ "Expected a single column identifier; got the multi-part identifier (parts: )." diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgs.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgs.scala index 5774781b8ab9f..c17c89967baa5 100644 --- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgs.scala +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgs.scala @@ -156,4 +156,22 @@ case class ChangeArgs( storedAsScdType: ScdType, deleteCondition: Option[Column] = None, columnSelection: Option[ColumnSelection] = None -) +) { + ChangeArgs.validateNonEmptyKeys(keys) +} + +object ChangeArgs { + /** + * Validates that [[ChangeArgs.keys]] is non-empty. Both SCD1 and SCD2 semantics require at + * least one key column to identify rows; rejecting empty key sets at construction lets + * downstream consumers rely on `keys.nonEmpty` without re-validating. + */ + private def validateNonEmptyKeys(keys: Seq[UnqualifiedColumnName]): Unit = { + if (keys.isEmpty) { + throw new AnalysisException( + errorClass = "AUTOCDC_EMPTY_KEYS", + messageParameters = Map.empty + ) + } + } +} diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala new file mode 100644 index 0000000000000..f87a4a1da53d4 --- /dev/null +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.pipelines.autocdc + +import org.apache.spark.sql.{functions => F} +import org.apache.spark.sql.catalyst.util.QuotingUtils +import org.apache.spark.sql.classic.DataFrame +import org.apache.spark.util.ArrayImplicits._ + +/** + * Per-microbatch processor for SCD Type 1 AutoCDC flows, complying to the specified [[changeArgs]] + * configuration. + */ +case class Scd1BatchProcessor(changeArgs: ChangeArgs) { + /** + * Deduplicate the incoming CDC microbatch by key, keeping the most recent event per key + * as ordered by [[ChangeArgs.sequencing]]. + * + * For SCD1 we only care about the most recent (by sequence value) event per key. When + * multiple events share the same key and the same sequence value, the row selected is + * non-deterministic and undefined. + * + * @param validatedMicrobatch A microbatch that has already been validated such that the + * sequencing column should not contain null values, and its data type + * should support ordering. + * + * The schema of the returned dataframe matches the schema of the microbatch exactly. + */ + def deduplicateMicrobatch(validatedMicrobatch: DataFrame): DataFrame = { + // The `max_by` API can only return a single column, so pack/unpack the entire row into a + // temporary column before and after the `max_by` operation. + val winningRowCol = Scd1BatchProcessor.winningRowColName + + val allMicrobatchColumns = + validatedMicrobatch.columns + .map(colName => F.col(QuotingUtils.quoteIdentifier(colName))) + .toImmutableArraySeq + + validatedMicrobatch + .groupBy(changeArgs.keys.map(k => F.col(k.quoted)): _*) + .agg( + F.max_by(F.struct(allMicrobatchColumns: _*), changeArgs.sequencing) + .as(winningRowCol) + ) + .select(F.col(s"$winningRowCol.*")) + } +} + +object Scd1BatchProcessor { + // Columns prefixed with `__spark_autocdc_` are reserved for internal SDP AutoCDC processing. + private[autocdc] val winningRowColName = "__spark_autocdc_winning_row" +} diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgsSuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgsSuite.scala index 816338cb677e8..1de2120a8f915 100644 --- a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgsSuite.scala +++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgsSuite.scala @@ -362,6 +362,21 @@ class ChangeArgsSuite extends SparkFunSuite with SharedSparkSession { ) } + test("ChangeArgs rejects an empty key list") { + checkError( + exception = intercept[AnalysisException] { + ChangeArgs( + keys = Seq.empty, + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type1 + ) + }, + condition = "AUTOCDC_EMPTY_KEYS", + sqlState = "22023", + parameters = Map.empty + ) + } + test("UnqualifiedColumnName lets a ParseException from the SQL parser propagate") { checkError( exception = intercept[ParseException] { diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala new file mode 100644 index 0000000000000..208c0aa1e4c59 --- /dev/null +++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala @@ -0,0 +1,434 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.pipelines.autocdc + +import org.apache.spark.sql.{functions => F, AnalysisException, QueryTest, Row} +import org.apache.spark.sql.classic.DataFrame +import org.apache.spark.sql.test.SharedSparkSession +import org.apache.spark.sql.types._ + +class Scd1BatchProcessorSuite extends QueryTest with SharedSparkSession { + + /** Build a microbatch [[DataFrame]] from explicit rows and an explicit schema. */ + private def microbatchOf(schema: StructType)(rows: Row*): DataFrame = + spark.createDataFrame(spark.sparkContext.parallelize(rows), schema) + + /** + * Returns the `(name, dataType)` pairs of `schema`'s fields. Used to compare two schemas for + * structural equivalence while deliberately ignoring nullability and metadata, which can shift + * benignly when columns are unpacked from a struct. + */ + private def columnNamesAndDataTypes(schema: StructType): Seq[(String, DataType)] = + schema.fields.map(f => (f.name, f.dataType)).toSeq + + test("deduplicateMicrobatch keeps only the row with the largest sequence value per key") { + val schema = new StructType() + .add("id", IntegerType) + .add("seq", LongType) + .add("value", StringType) + + val batch = microbatchOf(schema)( + Row(1, 10L, "first"), + Row(1, 30L, "winner"), + Row(1, 20L, "middle") + ) + + val processor = Scd1BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type1 + ) + ) + + checkAnswer( + df = processor.deduplicateMicrobatch(batch), + expectedAnswer = Row(1, 30L, "winner") + ) + } + + test("deduplicateMicrobatch is no-op if there's a single event for a key") { + val schema = new StructType() + .add("id", IntegerType) + .add("seq", LongType) + .add("value", StringType) + + val batch = microbatchOf(schema)( + Row(1, 10L, "only-row") + ) + + val processor = Scd1BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type1 + ) + ) + + checkAnswer( + df = processor.deduplicateMicrobatch(batch), + expectedAnswer = Row(1, 10L, "only-row") + ) + } + + test("deduplicateMicrobatch handles equal sequencing values for the same key") { + val schema = new StructType() + .add("id", IntegerType) + .add("seq", LongType) + .add("value", StringType) + + val batch = microbatchOf(schema)( + Row(1, 10L, "first-tied-row"), + Row(1, 10L, "second-tied-row") + ) + + val processor = Scd1BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type1 + ) + ) + + // On equal sequence number events for the same key we provide no guarantee on which event will + // survive, but the contract is _one_ event will survive - assert that below. + val result = processor.deduplicateMicrobatch(batch).collect() + assert(result.length == 1) + assert(result.head.getInt(0) == 1) + assert(result.head.getLong(1) == 10L) + assert(Set("first-tied-row", "second-tied-row").contains(result.head.getString(2))) + } + + test("deduplicateMicrobatch ignores rows with null sequencing when a non-null value exists") { + val schema = new StructType() + .add("id", IntegerType) + .add("seq", LongType) + .add("value", StringType) + + val batch = microbatchOf(schema)( + // In production the expectation is the microbatch will have been validated to not contain + // any null sequence values, but demonstrate that null sequence rows are de-prioritized in + // deduplication. + Row(1, null, "null-sequence"), + Row(1, 10L, "non-null-sequence") + ) + + val processor = Scd1BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type1 + ) + ) + + checkAnswer( + df = processor.deduplicateMicrobatch(batch), + expectedAnswer = Row(1, 10L, "non-null-sequence") + ) + } + + test( + "deduplicateMicrobatch returns a null row when all sequencing values for a key are null" + ) { + val schema = new StructType() + .add("id", IntegerType) + .add("seq", LongType) + .add("value", StringType) + val batch = microbatchOf(schema)( + // In production the expectation is the microbatch will have been validated to not contain + // any null sequence values, but demonstrate that a null row will be returned by + // deduplication if all rows contain a null sequence in the microbatch. + Row(1, null, "null-sequence") + ) + val processor = Scd1BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type1 + ) + ) + checkAnswer( + df = processor.deduplicateMicrobatch(batch), + expectedAnswer = Row(null, null, null) + ) + } + + test("deduplicateMicrobatch processes multiple keys independently") { + val schema = new StructType() + .add("id", IntegerType) + .add("seq", LongType) + .add("value", StringType) + + val batch = microbatchOf(schema)( + Row(1, 10L, "a1"), + Row(2, 50L, "b1-winner"), + Row(1, 20L, "a2-winner"), + Row(2, 40L, "b2-loser"), + Row(3, 1L, "c1-only") + ) + + val processor = Scd1BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type1 + ) + ) + + checkAnswer( + df = processor.deduplicateMicrobatch(batch), + expectedAnswer = Seq( + Row(1, 20L, "a2-winner"), + Row(2, 50L, "b1-winner"), + Row(3, 1L, "c1-only") + ) + ) + } + + test("deduplicateMicrobatch carries non-key, non-sequence columns from the winning row") { + val schema = new StructType() + .add("id", IntegerType) + .add("seq", LongType) + .add("name", StringType) + .add("amount", IntegerType) + + val batch = microbatchOf(schema)( + Row(1, 10L, "old-name", 100), + Row(1, 20L, "winning-name", 200) + ) + + val processor = Scd1BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type1 + ) + ) + + // All non-key columns must come from the row with the largest sequence value, never + // a mix of values from multiple rows. + checkAnswer( + df = processor.deduplicateMicrobatch(batch), + expectedAnswer = Row(1, 20L, "winning-name", 200) + ) + } + + test("deduplicateMicrobatch carries nested columns correctly from the winning row") { + val payloadType = new StructType() + .add("name", StringType) + .add("amount", IntegerType) + val schema = new StructType() + .add("id", IntegerType) + .add("seq", LongType) + .add("payload", payloadType) + + val batch = microbatchOf(schema)( + Row(1, 10L, Row("old", 100)), + Row(1, 20L, Row("new", 200)) + ) + + val processor = Scd1BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type1 + ) + ) + + checkAnswer( + df = processor.deduplicateMicrobatch(batch), + expectedAnswer = Row(1, 20L, Row("new", 200)) + ) + } + + test("deduplicateMicrobatch supports composite (multi-column) keys") { + val schema = new StructType() + .add("region", StringType) + .add("customer_id", IntegerType) + .add("seq", LongType) + .add("value", StringType) + + val batch = microbatchOf(schema)( + Row("US", 1, 10L, "us1-old"), + Row("US", 1, 20L, "us1-new"), + // Same customer_id as above but different region: independent group. + Row("EU", 1, 5L, "eu1-only"), + // Same region as above but different customer_id: independent group. + Row("US", 2, 99L, "us2-only") + ) + + val processor = Scd1BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("region"), UnqualifiedColumnName("customer_id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type1 + ) + ) + + checkAnswer( + df = processor.deduplicateMicrobatch(batch), + expectedAnswer = Seq( + Row("US", 1, 20L, "us1-new"), + Row("EU", 1, 5L, "eu1-only"), + Row("US", 2, 99L, "us2-only") + ) + ) + } + + test("deduplicateMicrobatch supports an arbitrary sequencing expression") { + val schema = new StructType() + .add("id", IntegerType) + .add("seq", LongType) + .add("alt_seq", LongType) + .add("value", StringType) + + // The sequencing expression is a function call referencing multiple columns, not a bare + // identifier. Locks in that `max_by(..., changeArgs.sequencing)` evaluates the full + // expression per-row rather than treating `sequencing` as a single column reference. + val batch = microbatchOf(schema)( + // greatest(10, 30) = 30 - winner under the expression. + Row(1, 10L, 30L, "winner"), + // greatest(25, 20) = 25 - would win under `seq` alone, but loses under `greatest`. + Row(1, 25L, 20L, "would-win-on-seq-alone"), + Row(1, 15L, 15L, "always-loses") + ) + + val processor = Scd1BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.greatest(F.col("seq"), F.col("alt_seq")), + storedAsScdType = ScdType.Type1 + ) + ) + + checkAnswer( + df = processor.deduplicateMicrobatch(batch), + expectedAnswer = Row(1, 10L, 30L, "winner") + ) + } + + test("deduplicateMicrobatch supports literal-dot column names") { + val schema = new StructType() + .add("user.id", IntegerType) + .add("seq", LongType) + .add("event.value", StringType) + + val batch = microbatchOf(schema)( + Row(1, 10L, "old"), + Row(1, 20L, "new") + ) + + val processor = Scd1BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("`user.id`")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type1 + ) + ) + + checkAnswer( + df = processor.deduplicateMicrobatch(batch), + expectedAnswer = Row(1, 20L, "new") + ) + } + + test( + "deduplicateMicrobatch fails when a key column collides with the reserved name" + ) { + val reservedColName = Scd1BatchProcessor.winningRowColName + + val schema = new StructType() + .add(reservedColName, StringType) + .add("seq", LongType) + .add("value", StringType) + + val batch = microbatchOf(schema)( + Row("k1", 10L, "loser"), + Row("k1", 20L, "winner") + ) + + val processor = Scd1BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName(reservedColName)), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type1 + ) + ) + + checkError( + exception = intercept[AnalysisException] { + processor.deduplicateMicrobatch(batch).collect() + }, + condition = "AMBIGUOUS_REFERENCE", + sqlState = "42704", + parameters = Map( + "name" -> s"`$reservedColName`", + "referenceNames" -> s"[`$reservedColName`, `$reservedColName`]" + ), + context = ExpectedContext(fragment = "col", callSitePattern = "") + ) + } + + test("deduplicateMicrobatch preserves the input column names, types, and ordering") { + val schema = new StructType() + .add("a", StringType) + .add("id", IntegerType) + .add("z", DoubleType) + .add("seq", LongType) + .add("flag", BooleanType) + + val batch = microbatchOf(schema)( + Row("a1", 1, 1.5, 10L, true), + Row("a2", 1, 2.5, 20L, false) + ) + + val processor = Scd1BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type1 + ) + ) + + // Field names and dataTypes must match the input exactly, in the original order. + assert( + columnNamesAndDataTypes(processor.deduplicateMicrobatch(batch).schema) == + columnNamesAndDataTypes(schema)) + } + + test("deduplicateMicrobatch returns an empty DataFrame with preserved schema") { + val schema = new StructType() + .add("id", IntegerType) + .add("seq", LongType) + .add("value", StringType) + + val batch = microbatchOf(schema)() + + val processor = Scd1BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type1 + ) + ) + + val result = processor.deduplicateMicrobatch(batch) + assert(result.collect().isEmpty) + assert(columnNamesAndDataTypes(result.schema) == columnNamesAndDataTypes(schema)) + } +}