From ada1bdb6af60ceee0a80f32501b34441fa98c3cb Mon Sep 17 00:00:00 2001 From: Anish Mahto Date: Thu, 14 May 2026 22:18:55 +0000 Subject: [PATCH 1/9] Implement deduplicateMicrobatch --- .../autocdc/OutOfOrderCdcMergeUtils.scala | 33 +++ .../autocdc/Scd1BatchProcessor.scala | 57 +++++ .../autocdc/Scd1BatchProcessorSuite.scala | 232 ++++++++++++++++++ 3 files changed, 322 insertions(+) create mode 100644 sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/OutOfOrderCdcMergeUtils.scala create mode 100644 sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala create mode 100644 sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/OutOfOrderCdcMergeUtils.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/OutOfOrderCdcMergeUtils.scala new file mode 100644 index 0000000000000..50b635c4ba2b8 --- /dev/null +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/OutOfOrderCdcMergeUtils.scala @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.pipelines.autocdc + +/** Shared helpers for the out-of-order CDC merge implementations (SCD Type 1 and Type 2). */ +private[autocdc] object OutOfOrderCdcMergeUtils { + + /** + * Build a synthetic column name with a UUID suffix so it cannot collide with any user + * column. Intended for transient columns attached during merge processing (e.g. holding + * intermediate aggregation outputs, carrying per-key state through a join, etc.). + * + * Each invocation produces a fresh name, so callers should remember the returned string if + * they need to reference the same column from multiple sites within a single merge plan. + */ + def tempColName(prefix: String): String = + s"${prefix}_${java.util.UUID.randomUUID().toString.replace("-", "_")}" +} diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala new file mode 100644 index 0000000000000..c8f10bb96938b --- /dev/null +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.pipelines.autocdc + +import org.apache.spark.sql.{functions => F} +import org.apache.spark.sql.catalyst.util.QuotingUtils +import org.apache.spark.sql.classic.DataFrame +import org.apache.spark.util.ArrayImplicits._ + +/** + * Per-microbatch processor for SCD Type 1 AutoCDC flows, adhering to the specified [[changeArgs]] + * configuration. + */ +case class Scd1BatchProcessor(changeArgs: ChangeArgs) { + + /** + * Deduplicate the incoming CDC microbatch by key, keeping the most recent event per key + * as ordered by [[ChangeArgs.sequencing]]. + * + * For SCD1 we only care about the most recent (by sequence value) event per key. When + * multiple events share the same key and the same sequence value, the row selected is + * non-deterministic and undefined. + */ + def deduplicateMicrobatch(microbatchDf: DataFrame): DataFrame = { + // The `max_by` API can only return a single column, so pack/unpack the entire row into a + // temporary column before and after the `max_by` operation. + val WinningRowCol = OutOfOrderCdcMergeUtils.tempColName("__winning_row") + + val allMicrobatchColumns = + microbatchDf.columns + .map(colName => F.col(QuotingUtils.quoteIdentifier(colName))) + .toImmutableArraySeq + + microbatchDf + .groupBy(changeArgs.keys.map(k => F.col(k.quoted)): _*) + .agg( + F.max_by(F.struct(allMicrobatchColumns: _*), changeArgs.sequencing) + .as(WinningRowCol) + ) + .select(F.col(s"$WinningRowCol.*")) + } +} diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala new file mode 100644 index 0000000000000..0545946935ed6 --- /dev/null +++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.pipelines.autocdc + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.{functions => F, Row} +import org.apache.spark.sql.classic.DataFrame +import org.apache.spark.sql.test.SharedSparkSession +import org.apache.spark.sql.types._ + +class Scd1BatchProcessorSuite extends SparkFunSuite with SharedSparkSession { + + /** Build a microbatch [[DataFrame]] from explicit rows and an explicit schema. */ + private def microbatchOf(schema: StructType)(rows: Row*): DataFrame = + spark.createDataFrame(spark.sparkContext.parallelize(rows), schema) + + /** + * Returns the `(name, dataType)` pairs of `schema`'s fields. Used to compare two schemas for + * structural equivalence while deliberately ignoring nullability and metadata, which can shift + * benignly when columns are unpacked from a struct. 
+ */ + private def columnNamesAndDataTypes(schema: StructType): Seq[(String, DataType)] = + schema.fields.map(f => (f.name, f.dataType)).toSeq + + test("deduplicateMicrobatch keeps only the row with the largest sequence value per key") { + val schema = new StructType() + .add("id", IntegerType) + .add("seq", LongType) + .add("value", StringType) + + val batch = microbatchOf(schema)( + Row(1, 10L, "first"), + Row(1, 30L, "winner"), + Row(1, 20L, "middle") + ) + + val processor = Scd1BatchProcessor( + ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type1 + ) + ) + + checkAnswer( + df = processor.deduplicateMicrobatch(batch), + expectedAnswer = Row(1, 30L, "winner") + ) + } + + test("deduplicateMicrobatch processes multiple keys independently") { + val schema = new StructType() + .add("id", IntegerType) + .add("seq", LongType) + .add("value", StringType) + + val batch = microbatchOf(schema)( + Row(1, 10L, "a1"), + Row(2, 50L, "b1-winner"), + Row(1, 20L, "a2-winner"), + Row(2, 40L, "b2-loser"), + Row(3, 1L, "c1-only") + ) + + val processor = Scd1BatchProcessor( + ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type1 + ) + ) + + checkAnswer( + df = processor.deduplicateMicrobatch(batch), + expectedAnswer = Seq( + Row(1, 20L, "a2-winner"), + Row(2, 50L, "b1-winner"), + Row(3, 1L, "c1-only") + ) + ) + } + + test("deduplicateMicrobatch carries non-key, non-sequence columns from the winning row") { + val schema = new StructType() + .add("id", IntegerType) + .add("seq", LongType) + .add("name", StringType) + .add("amount", IntegerType) + + val batch = microbatchOf(schema)( + Row(1, 10L, "old-name", 100), + Row(1, 20L, "winning-name", 200) + ) + + val processor = Scd1BatchProcessor( + ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type1 + ) + ) + + // All non-key columns must come from 
the row with the largest sequence value, never + // a mix of values from multiple rows. + checkAnswer( + df = processor.deduplicateMicrobatch(batch), + expectedAnswer = Row(1, 20L, "winning-name", 200) + ) + } + + test("deduplicateMicrobatch supports composite (multi-column) keys") { + val schema = new StructType() + .add("region", StringType) + .add("customer_id", IntegerType) + .add("seq", LongType) + .add("value", StringType) + + val batch = microbatchOf(schema)( + Row("US", 1, 10L, "us1-old"), + Row("US", 1, 20L, "us1-new"), + // Same customer_id as above but different region: independent group. + Row("EU", 1, 5L, "eu1-only"), + // Same region as above but different customer_id: independent group. + Row("US", 2, 99L, "us2-only") + ) + + val processor = Scd1BatchProcessor( + ChangeArgs( + keys = Seq(UnqualifiedColumnName("region"), UnqualifiedColumnName("customer_id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type1 + ) + ) + + checkAnswer( + df = processor.deduplicateMicrobatch(batch), + expectedAnswer = Seq( + Row("US", 1, 20L, "us1-new"), + Row("EU", 1, 5L, "eu1-only"), + Row("US", 2, 99L, "us2-only") + ) + ) + } + + test("deduplicateMicrobatch supports literal-dot column names") { + val schema = new StructType() + .add("user.id", IntegerType) + .add("seq", LongType) + .add("event.value", StringType) + + val batch = microbatchOf(schema)( + Row(1, 10L, "old"), + Row(1, 20L, "new") + ) + + val processor = Scd1BatchProcessor( + ChangeArgs( + keys = Seq(UnqualifiedColumnName("`user.id`")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type1 + ) + ) + + checkAnswer( + df = processor.deduplicateMicrobatch(batch), + expectedAnswer = Row(1, 20L, "new") + ) + } + + test("deduplicateMicrobatch preserves the input column names, types, and ordering") { + val schema = new StructType() + .add("a", StringType) + .add("id", IntegerType) + .add("z", DoubleType) + .add("seq", LongType) + .add("flag", BooleanType) + + val batch = microbatchOf(schema)( 
+ Row("a1", 1, 1.5, 10L, true), + Row("a2", 1, 2.5, 20L, false) + ) + + val processor = Scd1BatchProcessor( + ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type1 + ) + ) + + // Field names and dataTypes must match the input exactly, in the original order. + assert( + columnNamesAndDataTypes(processor.deduplicateMicrobatch(batch).schema) == + columnNamesAndDataTypes(schema)) + } + + test("deduplicateMicrobatch returns an empty DataFrame with preserved schema") { + val schema = new StructType() + .add("id", IntegerType) + .add("seq", LongType) + .add("value", StringType) + + val batch = microbatchOf(schema)() + + val processor = Scd1BatchProcessor( + ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type1 + ) + ) + + val result = processor.deduplicateMicrobatch(batch) + assert(result.collect().isEmpty) + assert(columnNamesAndDataTypes(result.schema) == columnNamesAndDataTypes(schema)) + } +} From a1a0e7b3407158ed6ffbe94569ab08e494799dff Mon Sep 17 00:00:00 2001 From: Anish Mahto Date: Fri, 15 May 2026 00:38:05 +0000 Subject: [PATCH 2/9] indenting cleanup --- .../autocdc/Scd1BatchProcessorSuite.scala | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala index 0545946935ed6..cb4df5e0d6c1f 100644 --- a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala +++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala @@ -50,7 +50,7 @@ class Scd1BatchProcessorSuite extends SparkFunSuite with SharedSparkSession { ) val processor = Scd1BatchProcessor( - ChangeArgs( + changeArgs = ChangeArgs( keys = Seq(UnqualifiedColumnName("id")), sequencing 
= F.col("seq"), storedAsScdType = ScdType.Type1 @@ -78,7 +78,7 @@ class Scd1BatchProcessorSuite extends SparkFunSuite with SharedSparkSession { ) val processor = Scd1BatchProcessor( - ChangeArgs( + changeArgs = ChangeArgs( keys = Seq(UnqualifiedColumnName("id")), sequencing = F.col("seq"), storedAsScdType = ScdType.Type1 @@ -108,7 +108,7 @@ class Scd1BatchProcessorSuite extends SparkFunSuite with SharedSparkSession { ) val processor = Scd1BatchProcessor( - ChangeArgs( + changeArgs = ChangeArgs( keys = Seq(UnqualifiedColumnName("id")), sequencing = F.col("seq"), storedAsScdType = ScdType.Type1 @@ -140,7 +140,7 @@ class Scd1BatchProcessorSuite extends SparkFunSuite with SharedSparkSession { ) val processor = Scd1BatchProcessor( - ChangeArgs( + changeArgs = ChangeArgs( keys = Seq(UnqualifiedColumnName("region"), UnqualifiedColumnName("customer_id")), sequencing = F.col("seq"), storedAsScdType = ScdType.Type1 @@ -169,7 +169,7 @@ class Scd1BatchProcessorSuite extends SparkFunSuite with SharedSparkSession { ) val processor = Scd1BatchProcessor( - ChangeArgs( + changeArgs = ChangeArgs( keys = Seq(UnqualifiedColumnName("`user.id`")), sequencing = F.col("seq"), storedAsScdType = ScdType.Type1 @@ -196,7 +196,7 @@ class Scd1BatchProcessorSuite extends SparkFunSuite with SharedSparkSession { ) val processor = Scd1BatchProcessor( - ChangeArgs( + changeArgs = ChangeArgs( keys = Seq(UnqualifiedColumnName("id")), sequencing = F.col("seq"), storedAsScdType = ScdType.Type1 @@ -218,7 +218,7 @@ class Scd1BatchProcessorSuite extends SparkFunSuite with SharedSparkSession { val batch = microbatchOf(schema)() val processor = Scd1BatchProcessor( - ChangeArgs( + changeArgs = ChangeArgs( keys = Seq(UnqualifiedColumnName("id")), sequencing = F.col("seq"), storedAsScdType = ScdType.Type1 From 434f6ad4a7a4b8a2a702e02542a772e7ddee161c Mon Sep 17 00:00:00 2001 From: Anish Mahto Date: Fri, 15 May 2026 00:40:40 +0000 Subject: [PATCH 3/9] schema comment --- 
.../apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala index c8f10bb96938b..024e46d585e39 100644 --- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala @@ -35,6 +35,8 @@ case class Scd1BatchProcessor(changeArgs: ChangeArgs) { * For SCD1 we only care about the most recent (by sequence value) event per key. When * multiple events share the same key and the same sequence value, the row selected is * non-deterministic and undefined. + * + * The schema of the returned dataframe matches the schema of the microbatch exactly. */ def deduplicateMicrobatch(microbatchDf: DataFrame): DataFrame = { // The `max_by` API can only return a single column, so pack/unpack the entire row into a From 022a95c4270db207b52e66ebca383366dca8a6c4 Mon Sep 17 00:00:00 2001 From: Anish Mahto Date: Fri, 15 May 2026 20:14:14 +0000 Subject: [PATCH 4/9] casing --- .../spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala index 024e46d585e39..340ca087c6bac 100644 --- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala @@ -41,7 +41,7 @@ case class Scd1BatchProcessor(changeArgs: ChangeArgs) { def deduplicateMicrobatch(microbatchDf: DataFrame): DataFrame = { // The `max_by` API can only return a single column, so pack/unpack 
the entire row into a // temporary column before and after the `max_by` operation. - val WinningRowCol = OutOfOrderCdcMergeUtils.tempColName("__winning_row") + val winningRowCol = OutOfOrderCdcMergeUtils.tempColName("__winning_row") val allMicrobatchColumns = microbatchDf.columns @@ -52,8 +52,8 @@ case class Scd1BatchProcessor(changeArgs: ChangeArgs) { .groupBy(changeArgs.keys.map(k => F.col(k.quoted)): _*) .agg( F.max_by(F.struct(allMicrobatchColumns: _*), changeArgs.sequencing) - .as(WinningRowCol) + .as(winningRowCol) ) - .select(F.col(s"$WinningRowCol.*")) + .select(F.col(s"$winningRowCol.*")) } } From f92f1e327e12918bb20917f81f9c213cc017624a Mon Sep 17 00:00:00 2001 From: Anish Mahto Date: Tue, 19 May 2026 17:08:16 +0000 Subject: [PATCH 5/9] linting --- .../apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala index 340ca087c6bac..9beea0508b911 100644 --- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala @@ -35,7 +35,7 @@ case class Scd1BatchProcessor(changeArgs: ChangeArgs) { * For SCD1 we only care about the most recent (by sequence value) event per key. When * multiple events share the same key and the same sequence value, the row selected is * non-deterministic and undefined. - * + * * The schema of the returned dataframe matches the schema of the microbatch exactly. 
*/ def deduplicateMicrobatch(microbatchDf: DataFrame): DataFrame = { From 04a38f247ace2ad9530fd935d153043522b726b5 Mon Sep 17 00:00:00 2001 From: Anish Mahto Date: Tue, 19 May 2026 21:37:54 +0000 Subject: [PATCH 6/9] PR feedback --- .../autocdc/Scd1BatchProcessor.scala | 11 +- .../autocdc/Scd1BatchProcessorSuite.scala | 110 +++++++++++++++++- 2 files changed, 115 insertions(+), 6 deletions(-) diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala index 9beea0508b911..edeca022a27a3 100644 --- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala @@ -27,7 +27,6 @@ import org.apache.spark.util.ArrayImplicits._ * configuration. */ case class Scd1BatchProcessor(changeArgs: ChangeArgs) { - /** * Deduplicate the incoming CDC microbatch by key, keeping the most recent event per key * as ordered by [[ChangeArgs.sequencing]]. @@ -36,19 +35,23 @@ case class Scd1BatchProcessor(changeArgs: ChangeArgs) { * multiple events share the same key and the same sequence value, the row selected is * non-deterministic and undefined. * + * @param validatedMicrobatch A microbatch that has already been validated so that the + * sequencing column contains no null values and its data type + * supports ordering. + * * The schema of the returned dataframe matches the schema of the microbatch exactly. */ - def deduplicateMicrobatch(microbatchDf: DataFrame): DataFrame = { + def deduplicateMicrobatch(validatedMicrobatch: DataFrame): DataFrame = { // The `max_by` API can only return a single column, so pack/unpack the entire row into a // temporary column before and after the `max_by` operation. 
val winningRowCol = OutOfOrderCdcMergeUtils.tempColName("__winning_row") val allMicrobatchColumns = - microbatchDf.columns + validatedMicrobatch.columns .map(colName => F.col(QuotingUtils.quoteIdentifier(colName))) .toImmutableArraySeq - microbatchDf + validatedMicrobatch .groupBy(changeArgs.keys.map(k => F.col(k.quoted)): _*) .agg( F.max_by(F.struct(allMicrobatchColumns: _*), changeArgs.sequencing) diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala index cb4df5e0d6c1f..1125440f42bbf 100644 --- a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala +++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala @@ -17,13 +17,13 @@ package org.apache.spark.sql.pipelines.autocdc -import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.QueryTest import org.apache.spark.sql.{functions => F, Row} import org.apache.spark.sql.classic.DataFrame import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types._ -class Scd1BatchProcessorSuite extends SparkFunSuite with SharedSparkSession { +class Scd1BatchProcessorSuite extends QueryTest with SharedSparkSession { /** Build a microbatch [[DataFrame]] from explicit rows and an explicit schema. 
*/ private def microbatchOf(schema: StructType)(rows: Row*): DataFrame = @@ -63,6 +63,112 @@ class Scd1BatchProcessorSuite extends SparkFunSuite with SharedSparkSession { ) } + test("deduplicateMicrobatch is no-op if there's a single event for a key") { + val schema = new StructType() + .add("id", IntegerType) + .add("seq", LongType) + .add("value", StringType) + + val batch = microbatchOf(schema)( + Row(1, 10L, "only-row") + ) + + val processor = Scd1BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type1 + ) + ) + + checkAnswer( + df = processor.deduplicateMicrobatch(batch), + expectedAnswer = Row(1, 10L, "only-row") + ) + } + + test("deduplicateMicrobatch handles equal sequencing values for the same key") { + val schema = new StructType() + .add("id", IntegerType) + .add("seq", LongType) + .add("value", StringType) + + val batch = microbatchOf(schema)( + Row(1, 10L, "first-tied-row"), + Row(1, 10L, "second-tied-row") + ) + + val processor = Scd1BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type1 + ) + ) + + // On equal sequence number events for the same key we provide no guarantee on which event will + // survive, but the contract is _one_ event will survive - assert that below. 
+ val result = processor.deduplicateMicrobatch(batch).collect() + assert(result.length == 1) + assert(result.head.getInt(0) == 1) + assert(result.head.getLong(1) == 10L) + assert(Set("first-tied-row", "second-tied-row").contains(result.head.getString(2))) + } + + test("deduplicateMicrobatch ignores rows with null sequencing when a non-null value exists") { + val schema = new StructType() + .add("id", IntegerType) + .add("seq", LongType) + .add("value", StringType) + + val batch = microbatchOf(schema)( + // In production the expectation is the microbatch will have been validated to not contain + // any null sequence values, but demonstrate that null sequence rows are de-prioritized in + // deduplication. + Row(1, null, "null-sequence"), + Row(1, 10L, "non-null-sequence") + ) + + val processor = Scd1BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type1 + ) + ) + + checkAnswer( + df = processor.deduplicateMicrobatch(batch), + expectedAnswer = Row(1, 10L, "non-null-sequence") + ) + } + + test( + "deduplicateMicrobatch returns a null row when all sequencing values for a key are null" + ) { + val schema = new StructType() + .add("id", IntegerType) + .add("seq", LongType) + .add("value", StringType) + val batch = microbatchOf(schema)( + // In production the expectation is the microbatch will have been validated to not contain + // any null sequence values, but demonstrate that a null row will be returned by + // deduplication if all rows contain a null sequence in the microbatch. 
+ Row(1, null, "null-sequence") + ) + val processor = Scd1BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type1 + ) + ) + checkAnswer( + df = processor.deduplicateMicrobatch(batch), + expectedAnswer = Row(null, null, null) + ) + } + test("deduplicateMicrobatch processes multiple keys independently") { val schema = new StructType() .add("id", IntegerType) From 19d9040b6e796e8e8dd59024bfc51ae3dd2d6b48 Mon Sep 17 00:00:00 2001 From: Anish Mahto Date: Tue, 19 May 2026 21:49:54 +0000 Subject: [PATCH 7/9] use reserved __spark_autocdc* prefix --- .../autocdc/OutOfOrderCdcMergeUtils.scala | 33 ------------------- .../autocdc/Scd1BatchProcessor.scala | 7 +++- 2 files changed, 6 insertions(+), 34 deletions(-) delete mode 100644 sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/OutOfOrderCdcMergeUtils.scala diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/OutOfOrderCdcMergeUtils.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/OutOfOrderCdcMergeUtils.scala deleted file mode 100644 index 50b635c4ba2b8..0000000000000 --- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/OutOfOrderCdcMergeUtils.scala +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.pipelines.autocdc - -/** Shared helpers for the out-of-order CDC merge implementations (SCD Type 1 and Type 2). */ -private[autocdc] object OutOfOrderCdcMergeUtils { - - /** - * Build a synthetic column name with a UUID suffix so it cannot collide with any user - * column. Intended for transient columns attached during merge processing (e.g. holding - * intermediate aggregation outputs, carrying per-key state through a join, etc.). - * - * Each invocation produces a fresh name, so callers should remember the returned string if - * they need to reference the same column from multiple sites within a single merge plan. - */ - def tempColName(prefix: String): String = - s"${prefix}_${java.util.UUID.randomUUID().toString.replace("-", "_")}" -} diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala index edeca022a27a3..732930c93630d 100644 --- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala @@ -44,7 +44,7 @@ case class Scd1BatchProcessor(changeArgs: ChangeArgs) { def deduplicateMicrobatch(validatedMicrobatch: DataFrame): DataFrame = { // The `max_by` API can only return a single column, so pack/unpack the entire row into a // temporary column before and after the `max_by` operation. 
- val winningRowCol = OutOfOrderCdcMergeUtils.tempColName("__winning_row") + val winningRowCol = Scd1BatchProcessor.winningRowColName val allMicrobatchColumns = validatedMicrobatch.columns @@ -60,3 +60,8 @@ case class Scd1BatchProcessor(changeArgs: ChangeArgs) { .select(F.col(s"$winningRowCol.*")) } } + +object Scd1BatchProcessor { + // Columns prefixed with `__spark_autocdc_` are reserved for internal SDP AutoCDC processing. + private val winningRowColName = "__spark_autocdc_winning_row" +} From 1e8b86c0e33859c28a4cc5333ded52e6c77d72f0 Mon Sep 17 00:00:00 2001 From: Anish Mahto Date: Tue, 19 May 2026 23:05:58 +0000 Subject: [PATCH 8/9] Add deduplicate test when row contains nested columns --- .../autocdc/Scd1BatchProcessorSuite.scala | 28 +++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala index 1125440f42bbf..a82323a842deb 100644 --- a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala +++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala @@ -229,6 +229,34 @@ class Scd1BatchProcessorSuite extends QueryTest with SharedSparkSession { ) } + test("deduplicateMicrobatch carries nested columns correctly from the winning row") { + val payloadType = new StructType() + .add("name", StringType) + .add("amount", IntegerType) + val schema = new StructType() + .add("id", IntegerType) + .add("seq", LongType) + .add("payload", payloadType) + + val batch = microbatchOf(schema)( + Row(1, 10L, Row("old", 100)), + Row(1, 20L, Row("new", 200)) + ) + + val processor = Scd1BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type1 + ) + ) + + checkAnswer( + df = 
processor.deduplicateMicrobatch(batch), + expectedAnswer = Row(1, 20L, Row("new", 200)) + ) + } + test("deduplicateMicrobatch supports composite (multi-column) keys") { val schema = new StructType() .add("region", StringType) From 0b498a084f64af6c89f273b4dcc91ae3e5cae749 Mon Sep 17 00:00:00 2001 From: Anish Mahto Date: Wed, 20 May 2026 20:22:27 +0000 Subject: [PATCH 9/9] PR feedback --- .../resources/error/error-conditions.json | 6 ++ .../sql/pipelines/autocdc/ChangeArgs.scala | 20 +++++- .../autocdc/Scd1BatchProcessor.scala | 2 +- .../pipelines/autocdc/ChangeArgsSuite.scala | 15 ++++ .../autocdc/Scd1BatchProcessorSuite.scala | 72 ++++++++++++++++++- 5 files changed, 111 insertions(+), 4 deletions(-) diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json index 81ef749e19f81..0c8bfcc11b25c 100644 --- a/common/utils/src/main/resources/error/error-conditions.json +++ b/common/utils/src/main/resources/error/error-conditions.json @@ -197,6 +197,12 @@ ], "sqlState" : "42703" }, + "AUTOCDC_EMPTY_KEYS" : { + "message" : [ + "AutoCDC requires at least one key column to identify rows, but received an empty key set." + ], + "sqlState" : "22023" + }, "AUTOCDC_MULTIPART_COLUMN_IDENTIFIER" : { "message" : [ "Expected a single column identifier; got the multi-part identifier (parts: )." 
diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgs.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgs.scala index 5774781b8ab9f..c17c89967baa5 100644 --- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgs.scala +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgs.scala @@ -156,4 +156,22 @@ case class ChangeArgs( storedAsScdType: ScdType, deleteCondition: Option[Column] = None, columnSelection: Option[ColumnSelection] = None -) +) { + ChangeArgs.validateNonEmptyKeys(keys) +} + +object ChangeArgs { + /** + * Validates that [[ChangeArgs.keys]] is non-empty. Both SCD1 and SCD2 semantics require at + * least one key column to identify rows; rejecting empty key sets at construction lets + * downstream consumers rely on `keys.nonEmpty` without re-validating. + */ + private def validateNonEmptyKeys(keys: Seq[UnqualifiedColumnName]): Unit = { + if (keys.isEmpty) { + throw new AnalysisException( + errorClass = "AUTOCDC_EMPTY_KEYS", + messageParameters = Map.empty + ) + } + } +} diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala index 732930c93630d..f87a4a1da53d4 100644 --- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala @@ -63,5 +63,5 @@ case class Scd1BatchProcessor(changeArgs: ChangeArgs) { object Scd1BatchProcessor { // Columns prefixed with `__spark_autocdc_` are reserved for internal SDP AutoCDC processing. 
- private val winningRowColName = "__spark_autocdc_winning_row" + private[autocdc] val winningRowColName = "__spark_autocdc_winning_row" } diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgsSuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgsSuite.scala index 816338cb677e8..1de2120a8f915 100644 --- a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgsSuite.scala +++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgsSuite.scala @@ -362,6 +362,21 @@ class ChangeArgsSuite extends SparkFunSuite with SharedSparkSession { ) } + test("ChangeArgs rejects an empty key list") { + checkError( + exception = intercept[AnalysisException] { + ChangeArgs( + keys = Seq.empty, + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type1 + ) + }, + condition = "AUTOCDC_EMPTY_KEYS", + sqlState = "22023", + parameters = Map.empty + ) + } + test("UnqualifiedColumnName lets a ParseException from the SQL parser propagate") { checkError( exception = intercept[ParseException] { diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala index a82323a842deb..208c0aa1e4c59 100644 --- a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala +++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala @@ -17,8 +17,7 @@ package org.apache.spark.sql.pipelines.autocdc -import org.apache.spark.sql.QueryTest -import org.apache.spark.sql.{functions => F, Row} +import org.apache.spark.sql.{functions => F, AnalysisException, QueryTest, Row} import org.apache.spark.sql.classic.DataFrame import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types._ @@ -291,6 +290,38 @@ class Scd1BatchProcessorSuite extends 
QueryTest with SharedSparkSession { ) } + test("deduplicateMicrobatch supports an arbitrary sequencing expression") { + val schema = new StructType() + .add("id", IntegerType) + .add("seq", LongType) + .add("alt_seq", LongType) + .add("value", StringType) + + // The sequencing expression is a function call referencing multiple columns, not a bare + // identifier. Locks in that `max_by(..., changeArgs.sequencing)` evaluates the full + // expression per-row rather than treating `sequencing` as a single column reference. + val batch = microbatchOf(schema)( + // greatest(10, 30) = 30 - winner under the expression. + Row(1, 10L, 30L, "winner"), + // greatest(25, 20) = 25 - would win under `seq` alone, but loses under `greatest`. + Row(1, 25L, 20L, "would-win-on-seq-alone"), + Row(1, 15L, 15L, "always-loses") + ) + + val processor = Scd1BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.greatest(F.col("seq"), F.col("alt_seq")), + storedAsScdType = ScdType.Type1 + ) + ) + + checkAnswer( + df = processor.deduplicateMicrobatch(batch), + expectedAnswer = Row(1, 10L, 30L, "winner") + ) + } + test("deduplicateMicrobatch supports literal-dot column names") { val schema = new StructType() .add("user.id", IntegerType) @@ -316,6 +347,43 @@ class Scd1BatchProcessorSuite extends QueryTest with SharedSparkSession { ) } + test( + "deduplicateMicrobatch fails when a key column collides with the reserved name" + ) { + val reservedColName = Scd1BatchProcessor.winningRowColName + + val schema = new StructType() + .add(reservedColName, StringType) + .add("seq", LongType) + .add("value", StringType) + + val batch = microbatchOf(schema)( + Row("k1", 10L, "loser"), + Row("k1", 20L, "winner") + ) + + val processor = Scd1BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName(reservedColName)), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type1 + ) + ) + + checkError( + exception = intercept[AnalysisException] 
{ + processor.deduplicateMicrobatch(batch).collect() + }, + condition = "AMBIGUOUS_REFERENCE", + sqlState = "42704", + parameters = Map( + "name" -> s"`$reservedColName`", + "referenceNames" -> s"[`$reservedColName`, `$reservedColName`]" + ), + context = ExpectedContext(fragment = "col", callSitePattern = "") + ) + } + test("deduplicateMicrobatch preserves the input column names, types, and ordering") { val schema = new StructType() .add("a", StringType)