Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions common/utils/src/main/resources/error/error-conditions.json
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,12 @@
],
"sqlState" : "42703"
},
"AUTOCDC_EMPTY_KEYS" : {
"message" : [
"AutoCDC requires at least one key column to identify rows, but received an empty key set."
],
"sqlState" : "22023"
},
"AUTOCDC_MULTIPART_COLUMN_IDENTIFIER" : {
"message" : [
"Expected a single column identifier; got the multi-part identifier <columnName> (parts: <nameParts>)."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,4 +156,22 @@ case class ChangeArgs(
storedAsScdType: ScdType,
deleteCondition: Option[Column] = None,
columnSelection: Option[ColumnSelection] = None
)
) {
ChangeArgs.validateNonEmptyKeys(keys)
}

object ChangeArgs {
  /**
   * Guards the invariant that [[ChangeArgs.keys]] holds at least one column.
   *
   * Rows are identified by their key columns under both SCD1 and SCD2, so an empty key set is
   * rejected here. Because the check runs when a [[ChangeArgs]] is constructed, code that
   * receives an instance may assume `keys.nonEmpty` without checking again.
   *
   * @param keys the key columns supplied to [[ChangeArgs]].
   * @throws AnalysisException with error class `AUTOCDC_EMPTY_KEYS` when `keys` is empty.
   */
  private def validateNonEmptyKeys(keys: Seq[UnqualifiedColumnName]): Unit =
    if (keys.isEmpty) throw emptyKeysError()

  /** Builds the exception reported for an empty key set; the error takes no parameters. */
  private def emptyKeysError(): AnalysisException =
    new AnalysisException(
      errorClass = "AUTOCDC_EMPTY_KEYS",
      messageParameters = Map.empty[String, String]
    )
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.pipelines.autocdc

import org.apache.spark.sql.{functions => F}
import org.apache.spark.sql.catalyst.util.QuotingUtils
import org.apache.spark.sql.classic.DataFrame
import org.apache.spark.util.ArrayImplicits._

/**
 * Per-microbatch processor for SCD Type 1 AutoCDC flows, complying with the specified
 * [[changeArgs]] configuration.
 *
 * @param changeArgs supplies the key columns the microbatch is grouped by and the sequencing
 *                   expression used to pick the winning event within each group.
 */
case class Scd1BatchProcessor(changeArgs: ChangeArgs) {
  /**
   * Deduplicate the incoming CDC microbatch by key, keeping the most recent event per key
   * as ordered by [[ChangeArgs.sequencing]].
   *
   * For SCD1 we only care about the most recent (by sequence value) event per key. When
   * multiple events share the same key and the same sequence value, the row selected is
   * non-deterministic and undefined.
   *
   * @param validatedMicrobatch A microbatch that has already been validated such that the
   *                            sequencing column should not contain null values, and its data
   *                            type should support ordering.
   * @return The deduplicated microbatch. The schema of the returned dataframe matches the
   *         schema of the microbatch exactly.
   */
  def deduplicateMicrobatch(validatedMicrobatch: DataFrame): DataFrame = {
    // The `max_by` API can only return a single column, so pack/unpack the entire row into a
    // temporary column before and after the `max_by` operation.
    val winningRowCol = Scd1BatchProcessor.winningRowColName

    // Quote every column name so each one is referenced as a single identifier when the row
    // is packed into the struct.
    val allMicrobatchColumns =
      validatedMicrobatch.columns
        .map(colName => F.col(QuotingUtils.quoteIdentifier(colName)))
        .toImmutableArraySeq

    val keyColumns = changeArgs.keys.map(k => F.col(k.quoted))

    validatedMicrobatch
      .groupBy(keyColumns: _*)
      .agg(
        F.max_by(F.struct(allMicrobatchColumns: _*), changeArgs.sequencing)
          .as(winningRowCol)
      )
      // Expand the packed struct back into the original top-level columns.
      .select(F.col(s"$winningRowCol.*"))
  }
}

object Scd1BatchProcessor {
  // Name of the temporary struct column that carries the winning row through the `max_by`
  // aggregation. The `__spark_autocdc_` prefix is reserved for internal SDP AutoCDC processing.
  private[autocdc] val winningRowColName: String = "__spark_autocdc_winning_row"
}
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,21 @@ class ChangeArgsSuite extends SparkFunSuite with SharedSparkSession {
)
}

test("ChangeArgs rejects an empty key list") {
  // Construction itself is expected to fail when no key columns are supplied.
  val thrown = intercept[AnalysisException] {
    ChangeArgs(
      keys = Seq.empty,
      sequencing = F.col("seq"),
      storedAsScdType = ScdType.Type1
    )
  }

  // The error must carry the dedicated condition, its SQLSTATE, and no message parameters.
  checkError(
    exception = thrown,
    condition = "AUTOCDC_EMPTY_KEYS",
    sqlState = "22023",
    parameters = Map.empty
  )
}

test("UnqualifiedColumnName lets a ParseException from the SQL parser propagate") {
checkError(
exception = intercept[ParseException] {
Expand Down
Loading