From a76911413645864cecb8ccbd6b1ed4f5fdb18fb9 Mon Sep 17 00:00:00 2001 From: Sandro Speh Date: Thu, 23 Apr 2026 09:43:37 +0000 Subject: [PATCH 1/5] [SPARK-55668][SQL] Add ChangelogTable schema validation and INVALID_CHANGELOG_SCHEMA error class Validate the CDC metadata columns, row identity and row versioning returned by a `Changelog` connector at relation construction time, and introduce a dedicated error class to report the failure at analysis time rather than later at execution time with a less helpful error. - `ChangelogTable.validateSchema`: fail-fast checks that the connector schema contains the required metadata columns (`_change_type`, `_commit_version`, `_commit_timestamp`), and that, when the connector advertises a capability requiring it, `rowId()` and `rowVersion()` are declared and the row version column is a non-nullable top-level column. Invoked from the `ChangelogTable` constructor. - New error class `INVALID_CHANGELOG_SCHEMA` with sub-classes `MISSING_COLUMN`, `INVALID_COLUMN_TYPE`, `MISSING_ROW_ID`, `MISSING_ROW_VERSION`, `NESTED_ROW_VERSION`, `NULLABLE_ROW_VERSION`. - `QueryCompilationErrors` helpers for each sub-class. - Tests: `ChangelogResolutionSuite` schema-validation cases using a `TestChangelog` fixture that returns hand-crafted schemas. --- .../resources/error/error-conditions.json | 38 +++++ .../sql/errors/QueryCompilationErrors.scala | 53 ++++++ .../datasources/v2/ChangelogTable.scala | 70 ++++++++ .../connector/ChangelogResolutionSuite.scala | 161 +++++++++++++++++- 4 files changed, 320 insertions(+), 2 deletions(-) diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json index ff34214e2ad95..741143f370f1f 100644 --- a/common/utils/src/main/resources/error/error-conditions.json +++ b/common/utils/src/main/resources/error/error-conditions.json @@ -3310,6 +3310,44 @@ }, "sqlState" : "42K03" }, + "INVALID_CHANGELOG_SCHEMA" : { + "message" : [ + "The Change Data Capture (CDC) schema returned by connector is invalid." + ], + "subClass" : { + "INVALID_COLUMN_TYPE" : { + "message" : [ + "Column `` has type , expected ." + ] + }, + "MISSING_COLUMN" : { + "message" : [ + "Required column `` is missing." + ] + }, + "MISSING_ROW_ID" : { + "message" : [ + "Connector advertises one or more post-processing properties (`containsCarryoverRows`, `representsUpdateAsDeleteAndInsert`, `containsIntermediateChanges`) that require row identity, but `Changelog.rowId()` returned an empty array. Either set all three to `false`, or return at least one row-id `NamedReference`." + ] + }, + "MISSING_ROW_VERSION" : { + "message" : [ + "Connector advertises `containsCarryoverRows` or `representsUpdateAsDeleteAndInsert` is `true`, but `Changelog.rowVersion()` is not implemented. Override `rowVersion()` to return a `NamedReference` pointing to a non-nullable column in `Changelog.columns()`." + ] + }, + "NESTED_ROW_VERSION" : { + "message" : [ + "Row version `` references a nested column. Spark requires `Changelog.rowVersion()` to point to a top-level column of `Changelog.columns()`." + ] + }, + "NULLABLE_ROW_VERSION" : { + "message" : [ + "The row version column `` is nullable. Spark requires a non-nullable row version column when `containsCarryoverRows` or `representsUpdateAsDeleteAndInsert` is `true`, to safely distinguish copy-on-write carry-over pairs from real updates. Mark the column non-nullable in `Changelog.columns()`." + ] + } + }, + "sqlState" : "42K03" + }, "INVALID_CLONE_SESSION_REQUEST" : { "message" : [ "Invalid session clone request." diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala index 02e9f188e0fa4..c730eaac785cd 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala @@ -3881,6 +3881,59 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat messageParameters = Map("changelogName" -> changelogName)) } + def changelogMissingColumnError( + changelogName: String, columnName: String): AnalysisException = { + new AnalysisException( + errorClass = "INVALID_CHANGELOG_SCHEMA.MISSING_COLUMN", + messageParameters = Map( + "changelogName" -> changelogName, + "columnName" -> columnName)) + } + + def changelogInvalidColumnTypeError( + changelogName: String, + columnName: String, + expectedType: String, + actualType: String): AnalysisException = { + new AnalysisException( + errorClass = "INVALID_CHANGELOG_SCHEMA.INVALID_COLUMN_TYPE", + messageParameters = Map( + "changelogName" -> changelogName, + "columnName" -> columnName, + "expectedType" -> expectedType, + "actualType" -> actualType)) + } + + def changelogNullableRowVersionError( + changelogName: String, columnName: String): AnalysisException = { + new AnalysisException( + errorClass = "INVALID_CHANGELOG_SCHEMA.NULLABLE_ROW_VERSION", + messageParameters = Map( + "changelogName" -> changelogName, + "columnName" -> columnName)) + } + + def changelogMissingRowIdError(changelogName: String): AnalysisException = { + new AnalysisException( + errorClass = "INVALID_CHANGELOG_SCHEMA.MISSING_ROW_ID", + messageParameters = Map("changelogName" -> changelogName)) + } + + def changelogMissingRowVersionError(changelogName: String): AnalysisException = { + new AnalysisException( + errorClass = "INVALID_CHANGELOG_SCHEMA.MISSING_ROW_VERSION", + messageParameters = Map("changelogName" -> changelogName)) + } + + def changelogNestedRowVersionError( + changelogName: String, reference: String): AnalysisException = { + new AnalysisException( + errorClass = "INVALID_CHANGELOG_SCHEMA.NESTED_ROW_VERSION", + messageParameters = Map( + "changelogName" -> changelogName, + "reference" -> reference)) + } + def invalidCdcOptionConflictingRangeTypes(): Throwable = { new AnalysisException( errorClass = "INVALID_CDC_OPTION.CONFLICTING_RANGE_TYPES", diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ChangelogTable.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ChangelogTable.scala index bb5a03f64990d..bf49523655e73 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ChangelogTable.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ChangelogTable.scala @@ -21,7 +21,10 @@ import java.util.{EnumSet => JEnumSet, Set => JSet} import org.apache.spark.sql.connector.catalog.{Changelog, ChangelogInfo, Column, SupportsRead, Table, TableCapability} import org.apache.spark.sql.connector.catalog.TableCapability.{BATCH_READ, MICRO_BATCH_READ} +import org.apache.spark.sql.connector.expressions.NamedReference import org.apache.spark.sql.connector.read.ScanBuilder +import org.apache.spark.sql.errors.QueryCompilationErrors +import org.apache.spark.sql.types.{DataType, StringType, TimestampType} import org.apache.spark.sql.util.CaseInsensitiveStringMap /** @@ -36,6 +39,11 @@ case class ChangelogTable( changelogInfo: ChangelogInfo, resolved: Boolean = false) extends Table with SupportsRead { + // Validate the connector returned a schema with the required CDC metadata columns + // and correct types. `_commit_version` is connector-defined per the Changelog contract, + // so its type is not checked. + ChangelogTable.validateSchema(changelog) + override def name: String = changelog.name override def columns: Array[Column] = changelog.columns @@ -46,3 +54,65 @@ case class ChangelogTable( override def capabilities: JSet[TableCapability] = JEnumSet.of(BATCH_READ, MICRO_BATCH_READ) } + +object ChangelogTable { + + def validateSchema(cl: Changelog): Unit = { + val byName = cl.columns.map(c => c.name -> c).toMap + def check(name: String, expected: DataType*): Unit = { + val col = byName.getOrElse(name, + throw QueryCompilationErrors.changelogMissingColumnError(cl.name, name)) + if (expected.nonEmpty && col.dataType != expected.head) { + throw QueryCompilationErrors.changelogInvalidColumnTypeError( + cl.name, name, expected.head.sql, col.dataType.sql) + } + } + check("_change_type", StringType) + check("_commit_version") // connector-defined, any type accepted + check("_commit_timestamp", TimestampType) + + // `rowId()` / `rowVersion()` default to throwing UnsupportedOperationException for + // connectors that haven't opted in. Translate that into "not declared" so we can + // reason about it as Option/empty-array below. + val rowIds: Array[NamedReference] = try cl.rowId() catch { + case _: UnsupportedOperationException => Array.empty + } + val rowVersionRef: Option[NamedReference] = try Some(cl.rowVersion()) catch { + case _: UnsupportedOperationException => None + } + + // Capability-driven presence checks: a connector that advertises a capability which + // requires row identity or row versioning must actually expose those references. + // Otherwise post-processing would crash with an UnsupportedOperationException at + // runtime instead of producing a clean AnalysisException here. + val needsRowId = cl.containsCarryoverRows() || + cl.representsUpdateAsDeleteAndInsert() || + cl.containsIntermediateChanges() + if (needsRowId && (rowIds == null || rowIds.isEmpty)) { + throw QueryCompilationErrors.changelogMissingRowIdError(cl.name) + } + + val needsRowVersion = cl.containsCarryoverRows() || + cl.representsUpdateAsDeleteAndInsert() + if (needsRowVersion && rowVersionRef.isEmpty) { + throw QueryCompilationErrors.changelogMissingRowVersionError(cl.name) + } + + // Schema constraints on rowVersion: must be a top-level non-nullable column. + // Nullable rowVersions break carry-over detection (NULL = NULL is unknown, so a + // delete+insert pair would be misclassified as a real update). + rowVersionRef.foreach { ref => + val fieldNames = ref.fieldNames() + if (fieldNames.length != 1) { + throw QueryCompilationErrors.changelogNestedRowVersionError( + cl.name, fieldNames.mkString(".")) + } + val columnName = fieldNames(0) + val col = byName.getOrElse(columnName, + throw QueryCompilationErrors.changelogMissingColumnError(cl.name, columnName)) + if (col.nullable) { + throw QueryCompilationErrors.changelogNullableRowVersionError(cl.name, columnName) + } + } + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/ChangelogResolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/ChangelogResolutionSuite.scala index d403db1e62bf9..46d9c72f80e81 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/ChangelogResolutionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/ChangelogResolutionSuite.scala @@ -24,10 +24,12 @@ import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2 import org.apache.spark.sql.connector.catalog._ import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ import org.apache.spark.sql.connector.catalog.ChangelogRange -import org.apache.spark.sql.connector.expressions.Transform +import org.apache.spark.sql.connector.expressions.{FieldReference, NamedReference, Transform} +import org.apache.spark.sql.connector.read.ScanBuilder import org.apache.spark.sql.execution.datasources.v2.{ChangelogTable, DataSourceV2Relation} import org.apache.spark.sql.test.SharedSparkSession -import org.apache.spark.sql.types.{LongType, StringType} +import org.apache.spark.sql.types.{IntegerType, LongType, StringType, TimestampType} +import org.apache.spark.sql.util.CaseInsensitiveStringMap /** * Tests for the CDC (Change Data Capture) analyzer resolution path: @@ -269,4 +271,159 @@ class ChangelogResolutionSuite extends SharedSparkSession { condition = "INVALID_CDC_OPTION.STREAMING_POST_PROCESSING_NOT_SUPPORTED", parameters = Map("changelogName" -> s"$cdcCatalogName.test_table_changelog")) } + + // =========================================================================== + // Generic changelog schema validation + // =========================================================================== + + private def stubInfo(): ChangelogInfo = new ChangelogInfo( + new ChangelogRange.VersionRange("1", java.util.Optional.of("2"), true, true), + ChangelogInfo.DeduplicationMode.DROP_CARRYOVERS, + false) + + private def cl(name: String, cols: (String, org.apache.spark.sql.types.DataType)*) + : TestChangelog = { + new TestChangelog(name, cols.map { case (n, t) => Column.create(n, t) }.toArray) + } + + private def missing(columnName: String): Map[String, String] = + Map("changelogName" -> "bad_cl", "columnName" -> columnName) + + private def wrongType(columnName: String, expected: String, actual: String) + : Map[String, String] = Map( + "changelogName" -> "bad_cl", + "columnName" -> columnName, + "expectedType" -> expected, + "actualType" -> actual) + + // Valid metadata tuples; tests swap one of these out to create broken schemas. + private val validChangeType = "_change_type" -> StringType + private val validVersion = "_commit_version" -> LongType + private val validTimestamp = "_commit_timestamp" -> TimestampType + + test("ChangelogTable - missing _change_type column throws") { + checkError( + intercept[AnalysisException] { + ChangelogTable(cl("bad_cl", validVersion, validTimestamp), stubInfo()) + }, + condition = "INVALID_CHANGELOG_SCHEMA.MISSING_COLUMN", + parameters = missing("_change_type")) + } + + test("ChangelogTable - missing _commit_version column throws") { + checkError( + intercept[AnalysisException] { + ChangelogTable(cl("bad_cl", validChangeType, validTimestamp), stubInfo()) + }, + condition = "INVALID_CHANGELOG_SCHEMA.MISSING_COLUMN", + parameters = missing("_commit_version")) + } + + test("ChangelogTable - missing _commit_timestamp column throws") { + checkError( + intercept[AnalysisException] { + ChangelogTable(cl("bad_cl", validChangeType, validVersion), stubInfo()) + }, + condition = "INVALID_CHANGELOG_SCHEMA.MISSING_COLUMN", + parameters = missing("_commit_timestamp")) + } + + test("ChangelogTable - wrong _change_type data type throws") { + checkError( + intercept[AnalysisException] { + ChangelogTable( + cl("bad_cl", "_change_type" -> IntegerType, validVersion, validTimestamp), + stubInfo()) + }, + condition = "INVALID_CHANGELOG_SCHEMA.INVALID_COLUMN_TYPE", + parameters = wrongType("_change_type", "STRING", "INT")) + } + + test("ChangelogTable - wrong _commit_timestamp data type throws") { + checkError( + intercept[AnalysisException] { + ChangelogTable( + cl("bad_cl", validChangeType, validVersion, "_commit_timestamp" -> LongType), + stubInfo()) + }, + condition = "INVALID_CHANGELOG_SCHEMA.INVALID_COLUMN_TYPE", + parameters = wrongType("_commit_timestamp", "TIMESTAMP", "BIGINT")) + } + + test("ChangelogTable - _commit_version type is connector-defined (any type accepted)") { + Seq(IntegerType, LongType, StringType).foreach { versionType => + ChangelogTable( + cl("any_cl", validChangeType, "_commit_version" -> versionType, validTimestamp), + stubInfo()) + } + } + + test("ChangelogTable - valid schema with data columns passes") { + ChangelogTable( + cl("good_cl", "id" -> LongType, "name" -> StringType, + validChangeType, validVersion, validTimestamp), + stubInfo()) + } + + test("ChangelogTable - nullable rowVersion column fails") { + val cl = new TestChangelog( + "bad_cl", + Array( + Column.create("id", LongType, false), + Column.create("_change_type", StringType), + Column.create("_commit_version", LongType), + Column.create("_commit_timestamp", TimestampType), + Column.create("row_commit_version", LongType)), + carryoverRows = true, + rowIdRefs = Array(FieldReference.column("id")), + rowVersionRef = Some(FieldReference.column("row_commit_version"))) + checkError( + intercept[AnalysisException] { ChangelogTable(cl, stubInfo()) }, + condition = "INVALID_CHANGELOG_SCHEMA.NULLABLE_ROW_VERSION", + parameters = Map( + "changelogName" -> "bad_cl", + "columnName" -> "row_commit_version")) + } + + test("ChangelogTable - non-nullable rowVersion column passes") { + val cl = new TestChangelog( + "good_cl", + Array( + Column.create("id", LongType, false), + Column.create("_change_type", StringType), + Column.create("_commit_version", LongType), + Column.create("_commit_timestamp", TimestampType), + Column.create("row_commit_version", LongType, false)), + carryoverRows = true, + rowIdRefs = Array(FieldReference.column("id")), + rowVersionRef = Some(FieldReference.column("row_commit_version"))) + ChangelogTable(cl, stubInfo()) + } + +} + +/** + * Test-only [[Changelog]] implementation that returns a hand-crafted schema. Used to + * exercise [[ChangelogTable]]'s schema validation without going through a real catalog. + * + * Defaults match a minimal connector with no post-processing capabilities. Tests opt + * into capability flags or `rowVersion()` overrides via constructor params. + */ +private class TestChangelog( + nameArg: String, + cols: Array[Column], + carryoverRows: Boolean = false, + rowIdRefs: Array[NamedReference] = Array.empty, + rowVersionRef: Option[NamedReference] = None) extends Changelog { + override def name(): String = nameArg + override def columns(): Array[Column] = cols + override def containsCarryoverRows(): Boolean = carryoverRows + override def containsIntermediateChanges(): Boolean = false + override def representsUpdateAsDeleteAndInsert(): Boolean = false + override def rowId(): Array[NamedReference] = rowIdRefs + override def rowVersion(): NamedReference = + rowVersionRef.getOrElse(super.rowVersion()) + override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = { + throw new UnsupportedOperationException("not needed for schema validation tests") + } } From b5c012f135bd85184a8edb615a56d47540dbc65c Mon Sep 17 00:00:00 2001 From: Sandro Speh Date: Fri, 24 Apr 2026 08:21:17 +0000 Subject: [PATCH 2/5] Re-enabled nested columns support --- .../resources/error/error-conditions.json | 10 ------ .../sql/errors/QueryCompilationErrors.scala | 18 ---------- .../datasources/v2/ChangelogTable.scala | 17 ---------- .../connector/ChangelogResolutionSuite.scala | 33 ++++--------------- 4 files changed, 7 insertions(+), 71 deletions(-) diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json index 741143f370f1f..27ce68f11cbac 100644 --- a/common/utils/src/main/resources/error/error-conditions.json +++ b/common/utils/src/main/resources/error/error-conditions.json @@ -3334,16 +3334,6 @@ "message" : [ "Connector advertises `containsCarryoverRows` or `representsUpdateAsDeleteAndInsert` is `true`, but `Changelog.rowVersion()` is not implemented. Override `rowVersion()` to return a `NamedReference` pointing to a non-nullable column in `Changelog.columns()`." ] - }, - "NESTED_ROW_VERSION" : { - "message" : [ - "Row version `` references a nested column. Spark requires `Changelog.rowVersion()` to point to a top-level column of `Changelog.columns()`." - ] - }, - "NULLABLE_ROW_VERSION" : { - "message" : [ - "The row version column `` is nullable. Spark requires a non-nullable row version column when `containsCarryoverRows` or `representsUpdateAsDeleteAndInsert` is `true`, to safely distinguish copy-on-write carry-over pairs from real updates. Mark the column non-nullable in `Changelog.columns()`." - ] } }, "sqlState" : "42K03" diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala index c730eaac785cd..3dbacbe67b454 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala @@ -3904,15 +3904,6 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat "actualType" -> actualType)) } - def changelogNullableRowVersionError( - changelogName: String, columnName: String): AnalysisException = { - new AnalysisException( - errorClass = "INVALID_CHANGELOG_SCHEMA.NULLABLE_ROW_VERSION", - messageParameters = Map( - "changelogName" -> changelogName, - "columnName" -> columnName)) - } - def changelogMissingRowIdError(changelogName: String): AnalysisException = { new AnalysisException( errorClass = "INVALID_CHANGELOG_SCHEMA.MISSING_ROW_ID", @@ -3925,15 +3916,6 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat messageParameters = Map("changelogName" -> changelogName)) } - def changelogNestedRowVersionError( - changelogName: String, reference: String): AnalysisException = { - new AnalysisException( - errorClass = "INVALID_CHANGELOG_SCHEMA.NESTED_ROW_VERSION", - messageParameters = Map( - "changelogName" -> changelogName, - "reference" -> reference)) - } - def invalidCdcOptionConflictingRangeTypes(): Throwable = { new AnalysisException( errorClass = "INVALID_CDC_OPTION.CONFLICTING_RANGE_TYPES", diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ChangelogTable.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ChangelogTable.scala index bf49523655e73..d756da476c770 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ChangelogTable.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ChangelogTable.scala @@ -97,22 +97,5 @@ object ChangelogTable { if (needsRowVersion && rowVersionRef.isEmpty) { throw QueryCompilationErrors.changelogMissingRowVersionError(cl.name) } - - // Schema constraints on rowVersion: must be a top-level non-nullable column. - // Nullable rowVersions break carry-over detection (NULL = NULL is unknown, so a - // delete+insert pair would be misclassified as a real update). - rowVersionRef.foreach { ref => - val fieldNames = ref.fieldNames() - if (fieldNames.length != 1) { - throw QueryCompilationErrors.changelogNestedRowVersionError( - cl.name, fieldNames.mkString(".")) - } - val columnName = fieldNames(0) - val col = byName.getOrElse(columnName, - throw QueryCompilationErrors.changelogMissingColumnError(cl.name, columnName)) - if (col.nullable) { - throw QueryCompilationErrors.changelogNullableRowVersionError(cl.name, columnName) - } - } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/ChangelogResolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/ChangelogResolutionSuite.scala index 46d9c72f80e81..f79784c04bd68 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/ChangelogResolutionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/ChangelogResolutionSuite.scala @@ -365,38 +365,19 @@ class ChangelogResolutionSuite extends SharedSparkSession { stubInfo()) } - test("ChangelogTable - nullable rowVersion column fails") { + test("ChangelogTable - nested rowId and rowVersion references pass (Delta-style _metadata)") { + val metadataRowId = FieldReference(Seq("_metadata", "row_id")) + val metadataRowVersion = FieldReference(Seq("_metadata", "row_commit_version")) val cl = new TestChangelog( - "bad_cl", + "delta_cl", Array( Column.create("id", LongType, false), Column.create("_change_type", StringType), Column.create("_commit_version", LongType), - Column.create("_commit_timestamp", TimestampType), - Column.create("row_commit_version", LongType)), + Column.create("_commit_timestamp", TimestampType)), carryoverRows = true, - rowIdRefs = Array(FieldReference.column("id")), - rowVersionRef = Some(FieldReference.column("row_commit_version"))) - checkError( - intercept[AnalysisException] { ChangelogTable(cl, stubInfo()) }, - condition = "INVALID_CHANGELOG_SCHEMA.NULLABLE_ROW_VERSION", - parameters = Map( - "changelogName" -> "bad_cl", - "columnName" -> "row_commit_version")) - } - - test("ChangelogTable - non-nullable rowVersion column passes") { - val cl = new TestChangelog( - "good_cl", - Array( - Column.create("id", LongType, false), - Column.create("_change_type", StringType), - Column.create("_commit_version", LongType), - Column.create("_commit_timestamp", TimestampType), - Column.create("row_commit_version", LongType, false)), - carryoverRows = true, - rowIdRefs = Array(FieldReference.column("id")), - rowVersionRef = Some(FieldReference.column("row_commit_version"))) + rowIdRefs = Array(metadataRowId), + rowVersionRef = Some(metadataRowVersion)) ChangelogTable(cl, stubInfo()) } From b961af544e080dd9c39b22490482c2f85639dd1f Mon Sep 17 00:00:00 2001 From: Sandro Speh Date: Fri, 24 Apr 2026 08:40:48 +0000 Subject: [PATCH 3/5] PR feedback --- .../resources/error/error-conditions.json | 7 +- .../sql/errors/QueryCompilationErrors.scala | 6 -- .../datasources/v2/ChangelogTable.scala | 34 ++++------ .../connector/ChangelogResolutionSuite.scala | 67 ++++++++++++++++++- 4 files changed, 77 insertions(+), 37 deletions(-) diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json index 27ce68f11cbac..87e645ef2b0f0 100644 --- a/common/utils/src/main/resources/error/error-conditions.json +++ b/common/utils/src/main/resources/error/error-conditions.json @@ -3327,12 +3327,7 @@ }, "MISSING_ROW_ID" : { "message" : [ - "Connector advertises one or more post-processing properties (`containsCarryoverRows`, `representsUpdateAsDeleteAndInsert`, `containsIntermediateChanges`) that require row identity, but `Changelog.rowId()` returned an empty array. Either set all three to `false`, or return at least one row-id `NamedReference`." - ] - }, - "MISSING_ROW_VERSION" : { - "message" : [ - "Connector advertises `containsCarryoverRows` or `representsUpdateAsDeleteAndInsert` is `true`, but `Changelog.rowVersion()` is not implemented. Override `rowVersion()` to return a `NamedReference` pointing to a non-nullable column in `Changelog.columns()`." + "Connector advertises one or more post-processing properties (`containsCarryoverRows`, `representsUpdateAsDeleteAndInsert`, `containsIntermediateChanges`) that require row identity, but `Changelog.rowId()` returned an empty array." ] } }, diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala index 3dbacbe67b454..f369317b4b0a5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala @@ -3910,12 +3910,6 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat messageParameters = Map("changelogName" -> changelogName)) } - def changelogMissingRowVersionError(changelogName: String): AnalysisException = { - new AnalysisException( - errorClass = "INVALID_CHANGELOG_SCHEMA.MISSING_ROW_VERSION", - messageParameters = Map("changelogName" -> changelogName)) - } - def invalidCdcOptionConflictingRangeTypes(): Throwable = { new AnalysisException( errorClass = "INVALID_CDC_OPTION.CONFLICTING_RANGE_TYPES", diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ChangelogTable.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ChangelogTable.scala index d756da476c770..56d79c7c0da37 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ChangelogTable.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ChangelogTable.scala @@ -21,7 +21,6 @@ import java.util.{EnumSet => JEnumSet, Set => JSet} import org.apache.spark.sql.connector.catalog.{Changelog, ChangelogInfo, Column, SupportsRead, Table, TableCapability} import org.apache.spark.sql.connector.catalog.TableCapability.{BATCH_READ, MICRO_BATCH_READ} -import org.apache.spark.sql.connector.expressions.NamedReference import org.apache.spark.sql.connector.read.ScanBuilder import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.types.{DataType, StringType, TimestampType} @@ -39,7 +38,7 @@ case class ChangelogTable( changelogInfo: ChangelogInfo, resolved: Boolean = false) extends Table with SupportsRead { - // Validate the connector returned a schema with the required CDC metadata columns + // Validate that the connector returned a schema with the required CDC metadata columns // and correct types. `_commit_version` is connector-defined per the Changelog contract, // so its type is not checked. ChangelogTable.validateSchema(changelog) @@ -57,7 +56,7 @@ case class ChangelogTable( object ChangelogTable { - def validateSchema(cl: Changelog): Unit = { + private[v2] def validateSchema(cl: Changelog): Unit = { val byName = cl.columns.map(c => c.name -> c).toMap def check(name: String, expected: DataType*): Unit = { val col = byName.getOrElse(name, @@ -71,31 +70,22 @@ object ChangelogTable { check("_commit_version") // connector-defined, any type accepted check("_commit_timestamp", TimestampType) - // `rowId()` / `rowVersion()` default to throwing UnsupportedOperationException for - // connectors that haven't opted in. Translate that into "not declared" so we can - // reason about it as Option/empty-array below. - val rowIds: Array[NamedReference] = try cl.rowId() catch { - case _: UnsupportedOperationException => Array.empty - } - val rowVersionRef: Option[NamedReference] = try Some(cl.rowVersion()) catch { - case _: UnsupportedOperationException => None - } - - // Capability-driven presence checks: a connector that advertises a capability which - // requires row identity or row versioning must actually expose those references. - // Otherwise post-processing would crash with an UnsupportedOperationException at - // runtime instead of producing a clean AnalysisException here. + // Only call `rowId()` / `rowVersion()` when a capability requires them; a connector + // that advertises a capability without overriding the method surfaces the default + // UnsupportedOperationException directly. val needsRowId = cl.containsCarryoverRows() || cl.representsUpdateAsDeleteAndInsert() || cl.containsIntermediateChanges() - if (needsRowId && (rowIds == null || rowIds.isEmpty)) { - throw QueryCompilationErrors.changelogMissingRowIdError(cl.name) + if (needsRowId) { + val rowIds = cl.rowId() + if (rowIds == null || rowIds.isEmpty) { + throw QueryCompilationErrors.changelogMissingRowIdError(cl.name) + } } - val needsRowVersion = cl.containsCarryoverRows() || cl.representsUpdateAsDeleteAndInsert() - if (needsRowVersion && rowVersionRef.isEmpty) { - throw QueryCompilationErrors.changelogMissingRowVersionError(cl.name) + if (needsRowVersion) { + cl.rowVersion() } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/ChangelogResolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/ChangelogResolutionSuite.scala index f79784c04bd68..8f29df44538b6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/ChangelogResolutionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/ChangelogResolutionSuite.scala @@ -381,6 +381,63 @@ class ChangelogResolutionSuite extends SharedSparkSession { ChangelogTable(cl, stubInfo()) } + test("ChangelogTable - representsUpdateAsDeleteAndInsert=true requires non-empty rowId") { + val cl = new TestChangelog( + "bad_cl", + Array( + Column.create("_change_type", StringType), + Column.create("_commit_version", LongType), + Column.create("_commit_timestamp", TimestampType)), + updateAsDeleteInsert = true, + rowIdRefs = Array.empty, + rowVersionRef = Some(FieldReference.column("_commit_version"))) + checkError( + intercept[AnalysisException] { ChangelogTable(cl, stubInfo()) }, + condition = "INVALID_CHANGELOG_SCHEMA.MISSING_ROW_ID", + parameters = Map("changelogName" -> "bad_cl")) + } + + test("ChangelogTable - containsIntermediateChanges=true requires non-empty rowId") { + val cl = new TestChangelog( + "bad_cl", + Array( + Column.create("_change_type", StringType), + Column.create("_commit_version", LongType), + Column.create("_commit_timestamp", TimestampType)), + intermediateChanges = true, + rowIdRefs = Array.empty) + checkError( + intercept[AnalysisException] { ChangelogTable(cl, stubInfo()) }, + condition = "INVALID_CHANGELOG_SCHEMA.MISSING_ROW_ID", + parameters = Map("changelogName" -> "bad_cl")) + } + + test("ChangelogTable - UnsupportedOperationException surfaces when rowId() not implemented") { + val cl = new TestChangelog( + "bad_cl", + Array( + Column.create("_change_type", StringType), + Column.create("_commit_version", LongType), + Column.create("_commit_timestamp", TimestampType)), + carryoverRows = true, + rowIdSupported = false, + rowVersionRef = Some(FieldReference.column("_commit_version"))) + intercept[UnsupportedOperationException] { ChangelogTable(cl, stubInfo()) } + } + + test("ChangelogTable - UnsupportedOperationException surfaces when rowVersion() missing") { + val cl = new TestChangelog( + "bad_cl", + Array( + Column.create("_change_type", StringType), + Column.create("_commit_version", LongType), + Column.create("_commit_timestamp", TimestampType)), + carryoverRows = true, + rowIdRefs = Array(FieldReference.column("id")), + rowVersionRef = None) + intercept[UnsupportedOperationException] { ChangelogTable(cl, stubInfo()) } + } + } /** @@ -394,14 +451,18 @@ private class TestChangelog( nameArg: String, cols: Array[Column], carryoverRows: Boolean = false, + updateAsDeleteInsert: Boolean = false, + intermediateChanges: Boolean = false, rowIdRefs: Array[NamedReference] = Array.empty, + rowIdSupported: Boolean = true, rowVersionRef: Option[NamedReference] = None) extends Changelog { override def name(): String = nameArg override def columns(): Array[Column] = cols override def containsCarryoverRows(): Boolean = carryoverRows - override def containsIntermediateChanges(): Boolean = false - override def representsUpdateAsDeleteAndInsert(): Boolean = false - override def rowId(): Array[NamedReference] = rowIdRefs + override def containsIntermediateChanges(): Boolean = intermediateChanges + override def representsUpdateAsDeleteAndInsert(): Boolean = updateAsDeleteInsert + override def rowId(): Array[NamedReference] = + if (rowIdSupported) rowIdRefs else super.rowId() override def rowVersion(): NamedReference = rowVersionRef.getOrElse(super.rowVersion()) override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = { From fe53ca0b35ddc79414d71a1cb266c606e8029dfa Mon Sep 17 00:00:00 2001 From: Sandro Speh Date: Mon, 27 Apr 2026 09:16:44 +0000 Subject: [PATCH 4/5] Added Changetype constants from #55508 PR feedback --- .../connector/ChangelogEndToEndSuite.scala | 137 +++++++++--------- 1 file changed, 69 insertions(+), 68 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/ChangelogEndToEndSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/ChangelogEndToEndSuite.scala index 9622d23122318..c56f0e14417e0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/ChangelogEndToEndSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/ChangelogEndToEndSuite.scala @@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.NamedStreamingRelation import org.apache.spark.sql.catalyst.streaming.UserProvided import org.apache.spark.sql.connector.catalog._ +import org.apache.spark.sql.connector.catalog.Changelog.{CHANGE_TYPE_DELETE, CHANGE_TYPE_INSERT} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types.{LongType, StringType} @@ -93,12 +94,12 @@ class ChangelogEndToEndSuite extends SharedSparkSession { test("changes() returns change data") { catalog.addChangeRows(ident, Seq( - makeChangeRow(1L, "a", "insert", 1L, 1000000L), - makeChangeRow(2L, "b", "delete", 2L, 2000000L))) + makeChangeRow(1L, "a", CHANGE_TYPE_INSERT, 1L, 1000000L), + makeChangeRow(2L, "b", CHANGE_TYPE_DELETE, 2L, 2000000L))) val expected = Seq( - Row(1L, "a", "insert", 1L, new Timestamp(1000L)), - Row(2L, "b", "delete", 2L, new Timestamp(2000L))) + Row(1L, "a", CHANGE_TYPE_INSERT, 1L, new Timestamp(1000L)), + Row(2L, "b", CHANGE_TYPE_DELETE, 2L, new Timestamp(2000L))) // DataFrame API checkAnswer( @@ -116,13 +117,13 @@ class ChangelogEndToEndSuite extends SharedSparkSession { test("changes() with open-ended version range") { catalog.addChangeRows(ident, Seq( - makeChangeRow(1L, "a", "insert", 1L, 1000000L), - makeChangeRow(2L, "b", "insert", 2L, 2000000L), - makeChangeRow(3L, "c", "insert", 3L, 3000000L))) + makeChangeRow(1L, "a", CHANGE_TYPE_INSERT, 1L, 1000000L), + makeChangeRow(2L, "b", CHANGE_TYPE_INSERT, 2L, 2000000L), + makeChangeRow(3L, "c", CHANGE_TYPE_INSERT, 3L, 3000000L))) val expected = Seq( - Row(2L, "b", "insert", 2L, new Timestamp(2000L)), - Row(3L, "c", "insert", 3L, new Timestamp(3000L))) + Row(2L, "b", CHANGE_TYPE_INSERT, 2L, new Timestamp(2000L)), + Row(3L, "c", CHANGE_TYPE_INSERT, 3L, new Timestamp(3000L))) // DataFrame API checkAnswer( @@ -157,12 +158,12 @@ class ChangelogEndToEndSuite extends SharedSparkSession { test("changes() select CDC metadata columns") { catalog.addChangeRows(ident, Seq( - makeChangeRow(1L, "a", "insert", 1L, 1000000L), - makeChangeRow(2L, "b", "delete", 2L, 2000000L))) + makeChangeRow(1L, "a", CHANGE_TYPE_INSERT, 1L, 1000000L), + makeChangeRow(2L, "b", CHANGE_TYPE_DELETE, 2L, 2000000L))) val expected = Seq( - Row(1L, "insert", 1L), - Row(2L, "delete", 2L)) + Row(1L, CHANGE_TYPE_INSERT, 1L), + Row(2L, CHANGE_TYPE_DELETE, 2L)) // DataFrame API checkAnswer( @@ -179,9 +180,9 @@ class ChangelogEndToEndSuite extends SharedSparkSession { test("changes() with projection and filter") { catalog.addChangeRows(ident, Seq( - makeChangeRow(1L, "a", "insert", 1L, 1000000L), - makeChangeRow(2L, "b", "insert", 1L, 1000000L), - makeChangeRow(1L, "a2", "insert", 2L, 2000000L))) + makeChangeRow(1L, "a", CHANGE_TYPE_INSERT, 1L, 1000000L), + makeChangeRow(2L, "b", CHANGE_TYPE_INSERT, 1L, 1000000L), + makeChangeRow(1L, "a2", CHANGE_TYPE_INSERT, 2L, 2000000L))) val expected = Seq(Row(1L, "a2")) @@ -200,13 +201,13 @@ class ChangelogEndToEndSuite extends SharedSparkSession { test("changes() with aggregation on change types") { catalog.addChangeRows(ident, Seq( - makeChangeRow(1L, "a", "insert", 1L, 1000000L), - makeChangeRow(2L, "b", "insert", 1L, 1000000L), - makeChangeRow(1L, "a", "delete", 2L, 2000000L))) + makeChangeRow(1L, "a", CHANGE_TYPE_INSERT, 1L, 1000000L), + makeChangeRow(2L, "b", CHANGE_TYPE_INSERT, 1L, 1000000L), + makeChangeRow(1L, "a", CHANGE_TYPE_DELETE, 2L, 2000000L))) val expected = Seq( - Row("insert", 2L), - Row("delete", 1L)) + Row(CHANGE_TYPE_INSERT, 2L), + Row(CHANGE_TYPE_DELETE, 1L)) // DataFrame API checkAnswer( @@ -223,7 +224,7 @@ class ChangelogEndToEndSuite extends SharedSparkSession { test("schema includes CDC metadata columns") { catalog.addChangeRows(ident, Seq( - makeChangeRow(1L, "a", "insert", 1L, 1000000L))) + makeChangeRow(1L, "a", CHANGE_TYPE_INSERT, 1L, 1000000L))) // DataFrame API val dfApi = spark.read.option("startingVersion", "1").changes(fullTableName) @@ -242,14 +243,14 @@ class ChangelogEndToEndSuite extends SharedSparkSession { test("changes() version range filters correctly") { catalog.addChangeRows(ident, Seq( - makeChangeRow(1L, "a", "insert", 1L, 1000000L), - makeChangeRow(2L, "b", "insert", 2L, 2000000L), - makeChangeRow(3L, "c", "insert", 3L, 3000000L), - makeChangeRow(4L, "d", "insert", 4L, 4000000L))) + makeChangeRow(1L, "a", CHANGE_TYPE_INSERT, 1L, 1000000L), + makeChangeRow(2L, "b", CHANGE_TYPE_INSERT, 2L, 2000000L), + makeChangeRow(3L, "c", CHANGE_TYPE_INSERT, 3L, 3000000L), + makeChangeRow(4L, "d", CHANGE_TYPE_INSERT, 4L, 4000000L))) val expected = Seq( - Row(2L, "b", "insert", 2L, new Timestamp(2000L)), - Row(3L, "c", "insert", 3L, new Timestamp(3000L))) + Row(2L, "b", CHANGE_TYPE_INSERT, 2L, new Timestamp(2000L)), + Row(3L, "c", CHANGE_TYPE_INSERT, 3L, new Timestamp(3000L))) // DataFrame API checkAnswer( @@ -269,14 +270,14 @@ class ChangelogEndToEndSuite extends SharedSparkSession { test("changes() default bounds are inclusive") { catalog.addChangeRows(ident, Seq( - makeChangeRow(1L, "a", "insert", 1L, 1000000L), - makeChangeRow(2L, "b", "insert", 2L, 2000000L), - makeChangeRow(3L, "c", "insert", 3L, 3000000L))) + makeChangeRow(1L, "a", CHANGE_TYPE_INSERT, 1L, 1000000L), + makeChangeRow(2L, "b", CHANGE_TYPE_INSERT, 2L, 2000000L), + makeChangeRow(3L, "c", CHANGE_TYPE_INSERT, 3L, 3000000L))) val expected = Seq( - Row(1L, "a", "insert", 1L, new Timestamp(1000L)), - Row(2L, "b", "insert", 2L, new Timestamp(2000L)), - Row(3L, "c", "insert", 3L, new Timestamp(3000L))) + Row(1L, "a", CHANGE_TYPE_INSERT, 1L, new Timestamp(1000L)), + Row(2L, "b", CHANGE_TYPE_INSERT, 2L, new Timestamp(2000L)), + Row(3L, "c", CHANGE_TYPE_INSERT, 3L, new Timestamp(3000L))) // DataFrame API - default (both inclusive) checkAnswer( @@ -300,14 +301,14 @@ class ChangelogEndToEndSuite extends SharedSparkSession { test("changes() with startingBoundInclusive=false") { catalog.addChangeRows(ident, Seq( - makeChangeRow(1L, "a", "insert", 1L, 1000000L), - makeChangeRow(2L, "b", "insert", 2L, 2000000L), - makeChangeRow(3L, "c", "insert", 3L, 3000000L))) + makeChangeRow(1L, "a", CHANGE_TYPE_INSERT, 1L, 1000000L), + makeChangeRow(2L, "b", CHANGE_TYPE_INSERT, 2L, 2000000L), + makeChangeRow(3L, "c", CHANGE_TYPE_INSERT, 3L, 3000000L))) // Exclusive start: version 1 excluded, versions 2 and 3 included val expected = Seq( - Row(2L, "b", "insert", 2L, new Timestamp(2000L)), - Row(3L, "c", "insert", 3L, new Timestamp(3000L))) + Row(2L, "b", CHANGE_TYPE_INSERT, 2L, new Timestamp(2000L)), + Row(3L, "c", CHANGE_TYPE_INSERT, 3L, new Timestamp(3000L))) // DataFrame API checkAnswer( @@ -327,14 +328,14 @@ class ChangelogEndToEndSuite extends SharedSparkSession { test("changes() with endingBoundInclusive=false") { catalog.addChangeRows(ident, Seq( - makeChangeRow(1L, "a", "insert", 1L, 1000000L), - makeChangeRow(2L, "b", "insert", 2L, 2000000L), - makeChangeRow(3L, "c", "insert", 3L, 3000000L))) + makeChangeRow(1L, "a", CHANGE_TYPE_INSERT, 1L, 1000000L), + makeChangeRow(2L, "b", CHANGE_TYPE_INSERT, 2L, 2000000L), + makeChangeRow(3L, "c", CHANGE_TYPE_INSERT, 3L, 3000000L))) // Exclusive end: versions 1 and 2 included, version 3 excluded val expected = Seq( - Row(1L, "a", "insert", 1L, new Timestamp(1000L)), - Row(2L, "b", "insert", 2L, new Timestamp(2000L))) + Row(1L, "a", CHANGE_TYPE_INSERT, 1L, new Timestamp(1000L)), + Row(2L, "b", CHANGE_TYPE_INSERT, 2L, new Timestamp(2000L))) // DataFrame API checkAnswer( @@ -354,13 +355,13 @@ class ChangelogEndToEndSuite extends SharedSparkSession { test("changes() with both bounds exclusive") { catalog.addChangeRows(ident, Seq( - makeChangeRow(1L, "a", "insert", 1L, 1000000L), - makeChangeRow(2L, "b", "insert", 2L, 2000000L), - makeChangeRow(3L, "c", "insert", 3L, 3000000L))) + makeChangeRow(1L, "a", CHANGE_TYPE_INSERT, 1L, 1000000L), + makeChangeRow(2L, "b", CHANGE_TYPE_INSERT, 2L, 2000000L), + makeChangeRow(3L, "c", CHANGE_TYPE_INSERT, 3L, 3000000L))) // Both exclusive: only version 2 included val expected = Seq( - Row(2L, "b", "insert", 2L, new Timestamp(2000L))) + Row(2L, "b", CHANGE_TYPE_INSERT, 2L, new Timestamp(2000L))) // DataFrame API checkAnswer( @@ -383,7 +384,7 @@ class ChangelogEndToEndSuite extends SharedSparkSession { test("changes() default deduplication mode is dropCarryovers") { catalog.addChangeRows(ident, Seq( - makeChangeRow(1L, "a", "insert", 1L, 1000000L))) + makeChangeRow(1L, "a", CHANGE_TYPE_INSERT, 1L, 1000000L))) // DataFrame API spark.read.option("startingVersion", "1").changes(fullTableName).collect() @@ -400,7 +401,7 @@ class ChangelogEndToEndSuite extends SharedSparkSession { test("changes() with deduplicationMode none") { catalog.addChangeRows(ident, Seq( - makeChangeRow(1L, "a", "insert", 1L, 1000000L))) + makeChangeRow(1L, "a", CHANGE_TYPE_INSERT, 1L, 1000000L))) // DataFrame API spark.read @@ -420,7 +421,7 @@ class ChangelogEndToEndSuite extends SharedSparkSession { test("changes() passes computeUpdates to catalog") { catalog.addChangeRows(ident, Seq( - makeChangeRow(1L, "a", "insert", 1L, 1000000L))) + makeChangeRow(1L, "a", CHANGE_TYPE_INSERT, 1L, 1000000L))) // DataFrame API spark.read @@ -440,7 +441,7 @@ class ChangelogEndToEndSuite extends SharedSparkSession { test("changes() with timestamp range") { catalog.addChangeRows(ident, Seq( - makeChangeRow(1L, "a", "insert", 1L, 1000000L))) + makeChangeRow(1L, "a", CHANGE_TYPE_INSERT, 1L, 1000000L))) // DataFrame API spark.read @@ -475,14 +476,14 @@ class ChangelogEndToEndSuite extends SharedSparkSession { test("streaming changes() returns change data") { catalog.addChangeRows(ident, Seq( - makeChangeRow(1L, "a", "insert", 1L, 1000000L), - makeChangeRow(2L, "b", "insert", 1L, 1000000L), - makeChangeRow(1L, "a", "delete", 2L, 2000000L))) + makeChangeRow(1L, "a", CHANGE_TYPE_INSERT, 1L, 1000000L), + makeChangeRow(2L, "b", CHANGE_TYPE_INSERT, 1L, 1000000L), + makeChangeRow(1L, "a", CHANGE_TYPE_DELETE, 2L, 2000000L))) val expected = Seq( - Row(1L, "a", "insert", 1L, new Timestamp(1000L)), - Row(2L, "b", "insert", 1L, new Timestamp(1000L)), - Row(1L, "a", "delete", 2L, new Timestamp(2000L))) + Row(1L, "a", CHANGE_TYPE_INSERT, 1L, new Timestamp(1000L)), + Row(2L, "b", CHANGE_TYPE_INSERT, 1L, new Timestamp(1000L)), + Row(1L, "a", CHANGE_TYPE_DELETE, 2L, new Timestamp(2000L))) // DataFrame API val dfApiStream = spark.readStream @@ -512,12 +513,12 @@ class ChangelogEndToEndSuite extends SharedSparkSession { test("streaming changes() with startingVersion filters data") { catalog.addChangeRows(ident, Seq( - makeChangeRow(1L, "a", "insert", 1L, 1000000L), - makeChangeRow(2L, "b", "insert", 1L, 1000000L), - makeChangeRow(1L, "a", "delete", 2L, 2000000L))) + makeChangeRow(1L, "a", CHANGE_TYPE_INSERT, 1L, 1000000L), + makeChangeRow(2L, "b", CHANGE_TYPE_INSERT, 1L, 1000000L), + makeChangeRow(1L, "a", CHANGE_TYPE_DELETE, 2L, 2000000L))) val expected = Seq( - Row(1L, "a", "delete", 2L, new Timestamp(2000L))) + Row(1L, "a", CHANGE_TYPE_DELETE, 2L, new Timestamp(2000L))) // DataFrame API val dfApiStream = spark.readStream @@ -547,9 +548,9 @@ class ChangelogEndToEndSuite extends SharedSparkSession { test("streaming changes() with projection and filter") { catalog.addChangeRows(ident, Seq( - makeChangeRow(1L, "a", "insert", 1L, 1000000L), - makeChangeRow(2L, "b", "insert", 1L, 1000000L), - makeChangeRow(3L, "c", "insert", 2L, 2000000L))) + makeChangeRow(1L, "a", CHANGE_TYPE_INSERT, 1L, 1000000L), + makeChangeRow(2L, "b", CHANGE_TYPE_INSERT, 1L, 1000000L), + makeChangeRow(3L, "c", CHANGE_TYPE_INSERT, 2L, 2000000L))) val expected = Seq(Row(1L, "a"), Row(2L, "b")) @@ -586,7 +587,7 @@ class ChangelogEndToEndSuite extends SharedSparkSession { test("streaming changes() passes computeUpdates to catalog") { catalog.addChangeRows(ident, Seq( - makeChangeRow(1L, "a", "insert", 1L, 1000000L))) + makeChangeRow(1L, "a", CHANGE_TYPE_INSERT, 1L, 1000000L))) // DataFrame API val dfApiStream = spark.readStream @@ -620,10 +621,10 @@ class ChangelogEndToEndSuite extends SharedSparkSession { test("streaming changes() supports .name() API with source evolution enabled") { catalog.addChangeRows(ident, Seq( - makeChangeRow(1L, "a", "insert", 1L, 1000000L))) + makeChangeRow(1L, "a", CHANGE_TYPE_INSERT, 1L, 1000000L))) val expected = Seq( - Row(1L, "a", "insert", 1L, new Timestamp(1000L))) + Row(1L, "a", CHANGE_TYPE_INSERT, 1L, new Timestamp(1000L))) withSQLConf(SQLConf.ENABLE_STREAMING_SOURCE_EVOLUTION.key -> "true") { val stream = spark.readStream From a33c9317d6ade0857399c06280c28f969ab5b9bc Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Mon, 27 Apr 2026 21:17:50 -0700 Subject: [PATCH 5/5] fix test failure --- .../explain-results/streaming_changes_API_with_options.explain | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/connect/common/src/test/resources/query-tests/explain-results/streaming_changes_API_with_options.explain b/sql/connect/common/src/test/resources/query-tests/explain-results/streaming_changes_API_with_options.explain index 6f12567607ac0..ed2e7e7190fb1 100644 --- a/sql/connect/common/src/test/resources/query-tests/explain-results/streaming_changes_API_with_options.explain +++ b/sql/connect/common/src/test/resources/query-tests/explain-results/streaming_changes_API_with_options.explain @@ -1,2 +1,2 @@ ~SubqueryAlias primary.tempdb.myStreamingTable -+- ~StreamingRelationV2 primary.tempdb.myStreamingTable_changelog, ChangelogTable(org.apache.spark.sql.connector.catalog.InMemoryChangelog,ChangelogInfo{range=VersionRange[startingVersion=1, endingVersion=Optional.empty, startingBoundInclusive=true, endingBoundInclusive=true], deduplicationMode=DROP_CARRYOVERS, computeUpdates=false}), [startingVersion=1, deduplicationMode=dropCarryovers], [id#0L, _change_type#0, _commit_version#0L, _commit_timestamp#0], org.apache.spark.sql.connector.catalog.InMemoryChangelogCatalog, tempdb.myStreamingTable, name= ++- ~StreamingRelationV2 primary.tempdb.myStreamingTable_changelog, ChangelogTable(org.apache.spark.sql.connector.catalog.InMemoryChangelog,ChangelogInfo{range=VersionRange[startingVersion=1, endingVersion=Optional.empty, startingBoundInclusive=true, endingBoundInclusive=true], deduplicationMode=DROP_CARRYOVERS, computeUpdates=false},false), [startingVersion=1, deduplicationMode=dropCarryovers], [id#0L, _change_type#0, _commit_version#0L, _commit_timestamp#0], org.apache.spark.sql.connector.catalog.InMemoryChangelogCatalog, tempdb.myStreamingTable, name=