diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json index ff34214e2ad95..87e645ef2b0f0 100644 --- a/common/utils/src/main/resources/error/error-conditions.json +++ b/common/utils/src/main/resources/error/error-conditions.json @@ -3310,6 +3310,29 @@ }, "sqlState" : "42K03" }, + "INVALID_CHANGELOG_SCHEMA" : { + "message" : [ + "The Change Data Capture (CDC) schema returned by connector <changelogName> is invalid." + ], + "subClass" : { + "INVALID_COLUMN_TYPE" : { + "message" : [ + "Column `<columnName>` has type <actualType>, expected <expectedType>." + ] + }, + "MISSING_COLUMN" : { + "message" : [ + "Required column `<columnName>` is missing." + ] + }, + "MISSING_ROW_ID" : { + "message" : [ + "Connector advertises one or more post-processing properties (`containsCarryoverRows`, `representsUpdateAsDeleteAndInsert`, `containsIntermediateChanges`) that require row identity, but `Changelog.rowId()` returned an empty array." + ] + } + }, + "sqlState" : "42K03" + }, "INVALID_CLONE_SESSION_REQUEST" : { "message" : [ "Invalid session clone request." 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala index 02e9f188e0fa4..f369317b4b0a5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala @@ -3881,6 +3881,35 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat messageParameters = Map("changelogName" -> changelogName)) } + def changelogMissingColumnError( + changelogName: String, columnName: String): AnalysisException = { + new AnalysisException( + errorClass = "INVALID_CHANGELOG_SCHEMA.MISSING_COLUMN", + messageParameters = Map( + "changelogName" -> changelogName, + "columnName" -> columnName)) + } + + def changelogInvalidColumnTypeError( + changelogName: String, + columnName: String, + expectedType: String, + actualType: String): AnalysisException = { + new AnalysisException( + errorClass = "INVALID_CHANGELOG_SCHEMA.INVALID_COLUMN_TYPE", + messageParameters = Map( + "changelogName" -> changelogName, + "columnName" -> columnName, + "expectedType" -> expectedType, + "actualType" -> actualType)) + } + + def changelogMissingRowIdError(changelogName: String): AnalysisException = { + new AnalysisException( + errorClass = "INVALID_CHANGELOG_SCHEMA.MISSING_ROW_ID", + messageParameters = Map("changelogName" -> changelogName)) + } + def invalidCdcOptionConflictingRangeTypes(): Throwable = { new AnalysisException( errorClass = "INVALID_CDC_OPTION.CONFLICTING_RANGE_TYPES", diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ChangelogTable.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ChangelogTable.scala index bb5a03f64990d..56d79c7c0da37 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ChangelogTable.scala +++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ChangelogTable.scala @@ -22,6 +22,8 @@ import java.util.{EnumSet => JEnumSet, Set => JSet} import org.apache.spark.sql.connector.catalog.{Changelog, ChangelogInfo, Column, SupportsRead, Table, TableCapability} import org.apache.spark.sql.connector.catalog.TableCapability.{BATCH_READ, MICRO_BATCH_READ} import org.apache.spark.sql.connector.read.ScanBuilder +import org.apache.spark.sql.errors.QueryCompilationErrors +import org.apache.spark.sql.types.{DataType, StringType, TimestampType} import org.apache.spark.sql.util.CaseInsensitiveStringMap /** @@ -36,6 +38,11 @@ case class ChangelogTable( changelogInfo: ChangelogInfo, resolved: Boolean = false) extends Table with SupportsRead { + // Validate that the connector returned a schema with the required CDC metadata columns + // and correct types. `_commit_version` is connector-defined per the Changelog contract, + // so its type is not checked. + ChangelogTable.validateSchema(changelog) + override def name: String = changelog.name override def columns: Array[Column] = changelog.columns @@ -46,3 +53,39 @@ case class ChangelogTable( override def capabilities: JSet[TableCapability] = JEnumSet.of(BATCH_READ, MICRO_BATCH_READ) } + +object ChangelogTable { + + private[v2] def validateSchema(cl: Changelog): Unit = { + val byName = cl.columns.map(c => c.name -> c).toMap + def check(name: String, expected: DataType*): Unit = { + val col = byName.getOrElse(name, + throw QueryCompilationErrors.changelogMissingColumnError(cl.name, name)) + if (expected.nonEmpty && col.dataType != expected.head) { + throw QueryCompilationErrors.changelogInvalidColumnTypeError( + cl.name, name, expected.head.sql, col.dataType.sql) + } + } + check("_change_type", StringType) + check("_commit_version") // connector-defined, any type accepted + check("_commit_timestamp", TimestampType) + + // Only call `rowId()` / `rowVersion()` when a capability requires them; a connector + // 
that advertises a capability without overriding the method surfaces the default + // UnsupportedOperationException directly. + val needsRowId = cl.containsCarryoverRows() || + cl.representsUpdateAsDeleteAndInsert() || + cl.containsIntermediateChanges() + if (needsRowId) { + val rowIds = cl.rowId() + if (rowIds == null || rowIds.isEmpty) { + throw QueryCompilationErrors.changelogMissingRowIdError(cl.name) + } + } + val needsRowVersion = cl.containsCarryoverRows() || + cl.representsUpdateAsDeleteAndInsert() + if (needsRowVersion) { + cl.rowVersion() + } + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/ChangelogEndToEndSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/ChangelogEndToEndSuite.scala index 9622d23122318..c56f0e14417e0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/ChangelogEndToEndSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/ChangelogEndToEndSuite.scala @@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.NamedStreamingRelation import org.apache.spark.sql.catalyst.streaming.UserProvided import org.apache.spark.sql.connector.catalog._ +import org.apache.spark.sql.connector.catalog.Changelog.{CHANGE_TYPE_DELETE, CHANGE_TYPE_INSERT} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types.{LongType, StringType} @@ -93,12 +94,12 @@ class ChangelogEndToEndSuite extends SharedSparkSession { test("changes() returns change data") { catalog.addChangeRows(ident, Seq( - makeChangeRow(1L, "a", "insert", 1L, 1000000L), - makeChangeRow(2L, "b", "delete", 2L, 2000000L))) + makeChangeRow(1L, "a", CHANGE_TYPE_INSERT, 1L, 1000000L), + makeChangeRow(2L, "b", CHANGE_TYPE_DELETE, 2L, 2000000L))) val expected = Seq( - Row(1L, "a", "insert", 1L, new Timestamp(1000L)), - Row(2L, "b", "delete", 2L, new Timestamp(2000L))) + Row(1L, "a", CHANGE_TYPE_INSERT, 
1L, new Timestamp(1000L)), + Row(2L, "b", CHANGE_TYPE_DELETE, 2L, new Timestamp(2000L))) // DataFrame API checkAnswer( @@ -116,13 +117,13 @@ class ChangelogEndToEndSuite extends SharedSparkSession { test("changes() with open-ended version range") { catalog.addChangeRows(ident, Seq( - makeChangeRow(1L, "a", "insert", 1L, 1000000L), - makeChangeRow(2L, "b", "insert", 2L, 2000000L), - makeChangeRow(3L, "c", "insert", 3L, 3000000L))) + makeChangeRow(1L, "a", CHANGE_TYPE_INSERT, 1L, 1000000L), + makeChangeRow(2L, "b", CHANGE_TYPE_INSERT, 2L, 2000000L), + makeChangeRow(3L, "c", CHANGE_TYPE_INSERT, 3L, 3000000L))) val expected = Seq( - Row(2L, "b", "insert", 2L, new Timestamp(2000L)), - Row(3L, "c", "insert", 3L, new Timestamp(3000L))) + Row(2L, "b", CHANGE_TYPE_INSERT, 2L, new Timestamp(2000L)), + Row(3L, "c", CHANGE_TYPE_INSERT, 3L, new Timestamp(3000L))) // DataFrame API checkAnswer( @@ -157,12 +158,12 @@ class ChangelogEndToEndSuite extends SharedSparkSession { test("changes() select CDC metadata columns") { catalog.addChangeRows(ident, Seq( - makeChangeRow(1L, "a", "insert", 1L, 1000000L), - makeChangeRow(2L, "b", "delete", 2L, 2000000L))) + makeChangeRow(1L, "a", CHANGE_TYPE_INSERT, 1L, 1000000L), + makeChangeRow(2L, "b", CHANGE_TYPE_DELETE, 2L, 2000000L))) val expected = Seq( - Row(1L, "insert", 1L), - Row(2L, "delete", 2L)) + Row(1L, CHANGE_TYPE_INSERT, 1L), + Row(2L, CHANGE_TYPE_DELETE, 2L)) // DataFrame API checkAnswer( @@ -179,9 +180,9 @@ class ChangelogEndToEndSuite extends SharedSparkSession { test("changes() with projection and filter") { catalog.addChangeRows(ident, Seq( - makeChangeRow(1L, "a", "insert", 1L, 1000000L), - makeChangeRow(2L, "b", "insert", 1L, 1000000L), - makeChangeRow(1L, "a2", "insert", 2L, 2000000L))) + makeChangeRow(1L, "a", CHANGE_TYPE_INSERT, 1L, 1000000L), + makeChangeRow(2L, "b", CHANGE_TYPE_INSERT, 1L, 1000000L), + makeChangeRow(1L, "a2", CHANGE_TYPE_INSERT, 2L, 2000000L))) val expected = Seq(Row(1L, "a2")) @@ -200,13 +201,13 @@ 
class ChangelogEndToEndSuite extends SharedSparkSession { test("changes() with aggregation on change types") { catalog.addChangeRows(ident, Seq( - makeChangeRow(1L, "a", "insert", 1L, 1000000L), - makeChangeRow(2L, "b", "insert", 1L, 1000000L), - makeChangeRow(1L, "a", "delete", 2L, 2000000L))) + makeChangeRow(1L, "a", CHANGE_TYPE_INSERT, 1L, 1000000L), + makeChangeRow(2L, "b", CHANGE_TYPE_INSERT, 1L, 1000000L), + makeChangeRow(1L, "a", CHANGE_TYPE_DELETE, 2L, 2000000L))) val expected = Seq( - Row("insert", 2L), - Row("delete", 1L)) + Row(CHANGE_TYPE_INSERT, 2L), + Row(CHANGE_TYPE_DELETE, 1L)) // DataFrame API checkAnswer( @@ -223,7 +224,7 @@ class ChangelogEndToEndSuite extends SharedSparkSession { test("schema includes CDC metadata columns") { catalog.addChangeRows(ident, Seq( - makeChangeRow(1L, "a", "insert", 1L, 1000000L))) + makeChangeRow(1L, "a", CHANGE_TYPE_INSERT, 1L, 1000000L))) // DataFrame API val dfApi = spark.read.option("startingVersion", "1").changes(fullTableName) @@ -242,14 +243,14 @@ class ChangelogEndToEndSuite extends SharedSparkSession { test("changes() version range filters correctly") { catalog.addChangeRows(ident, Seq( - makeChangeRow(1L, "a", "insert", 1L, 1000000L), - makeChangeRow(2L, "b", "insert", 2L, 2000000L), - makeChangeRow(3L, "c", "insert", 3L, 3000000L), - makeChangeRow(4L, "d", "insert", 4L, 4000000L))) + makeChangeRow(1L, "a", CHANGE_TYPE_INSERT, 1L, 1000000L), + makeChangeRow(2L, "b", CHANGE_TYPE_INSERT, 2L, 2000000L), + makeChangeRow(3L, "c", CHANGE_TYPE_INSERT, 3L, 3000000L), + makeChangeRow(4L, "d", CHANGE_TYPE_INSERT, 4L, 4000000L))) val expected = Seq( - Row(2L, "b", "insert", 2L, new Timestamp(2000L)), - Row(3L, "c", "insert", 3L, new Timestamp(3000L))) + Row(2L, "b", CHANGE_TYPE_INSERT, 2L, new Timestamp(2000L)), + Row(3L, "c", CHANGE_TYPE_INSERT, 3L, new Timestamp(3000L))) // DataFrame API checkAnswer( @@ -269,14 +270,14 @@ class ChangelogEndToEndSuite extends SharedSparkSession { test("changes() default bounds are 
inclusive") { catalog.addChangeRows(ident, Seq( - makeChangeRow(1L, "a", "insert", 1L, 1000000L), - makeChangeRow(2L, "b", "insert", 2L, 2000000L), - makeChangeRow(3L, "c", "insert", 3L, 3000000L))) + makeChangeRow(1L, "a", CHANGE_TYPE_INSERT, 1L, 1000000L), + makeChangeRow(2L, "b", CHANGE_TYPE_INSERT, 2L, 2000000L), + makeChangeRow(3L, "c", CHANGE_TYPE_INSERT, 3L, 3000000L))) val expected = Seq( - Row(1L, "a", "insert", 1L, new Timestamp(1000L)), - Row(2L, "b", "insert", 2L, new Timestamp(2000L)), - Row(3L, "c", "insert", 3L, new Timestamp(3000L))) + Row(1L, "a", CHANGE_TYPE_INSERT, 1L, new Timestamp(1000L)), + Row(2L, "b", CHANGE_TYPE_INSERT, 2L, new Timestamp(2000L)), + Row(3L, "c", CHANGE_TYPE_INSERT, 3L, new Timestamp(3000L))) // DataFrame API - default (both inclusive) checkAnswer( @@ -300,14 +301,14 @@ class ChangelogEndToEndSuite extends SharedSparkSession { test("changes() with startingBoundInclusive=false") { catalog.addChangeRows(ident, Seq( - makeChangeRow(1L, "a", "insert", 1L, 1000000L), - makeChangeRow(2L, "b", "insert", 2L, 2000000L), - makeChangeRow(3L, "c", "insert", 3L, 3000000L))) + makeChangeRow(1L, "a", CHANGE_TYPE_INSERT, 1L, 1000000L), + makeChangeRow(2L, "b", CHANGE_TYPE_INSERT, 2L, 2000000L), + makeChangeRow(3L, "c", CHANGE_TYPE_INSERT, 3L, 3000000L))) // Exclusive start: version 1 excluded, versions 2 and 3 included val expected = Seq( - Row(2L, "b", "insert", 2L, new Timestamp(2000L)), - Row(3L, "c", "insert", 3L, new Timestamp(3000L))) + Row(2L, "b", CHANGE_TYPE_INSERT, 2L, new Timestamp(2000L)), + Row(3L, "c", CHANGE_TYPE_INSERT, 3L, new Timestamp(3000L))) // DataFrame API checkAnswer( @@ -327,14 +328,14 @@ class ChangelogEndToEndSuite extends SharedSparkSession { test("changes() with endingBoundInclusive=false") { catalog.addChangeRows(ident, Seq( - makeChangeRow(1L, "a", "insert", 1L, 1000000L), - makeChangeRow(2L, "b", "insert", 2L, 2000000L), - makeChangeRow(3L, "c", "insert", 3L, 3000000L))) + makeChangeRow(1L, "a", 
CHANGE_TYPE_INSERT, 1L, 1000000L), + makeChangeRow(2L, "b", CHANGE_TYPE_INSERT, 2L, 2000000L), + makeChangeRow(3L, "c", CHANGE_TYPE_INSERT, 3L, 3000000L))) // Exclusive end: versions 1 and 2 included, version 3 excluded val expected = Seq( - Row(1L, "a", "insert", 1L, new Timestamp(1000L)), - Row(2L, "b", "insert", 2L, new Timestamp(2000L))) + Row(1L, "a", CHANGE_TYPE_INSERT, 1L, new Timestamp(1000L)), + Row(2L, "b", CHANGE_TYPE_INSERT, 2L, new Timestamp(2000L))) // DataFrame API checkAnswer( @@ -354,13 +355,13 @@ class ChangelogEndToEndSuite extends SharedSparkSession { test("changes() with both bounds exclusive") { catalog.addChangeRows(ident, Seq( - makeChangeRow(1L, "a", "insert", 1L, 1000000L), - makeChangeRow(2L, "b", "insert", 2L, 2000000L), - makeChangeRow(3L, "c", "insert", 3L, 3000000L))) + makeChangeRow(1L, "a", CHANGE_TYPE_INSERT, 1L, 1000000L), + makeChangeRow(2L, "b", CHANGE_TYPE_INSERT, 2L, 2000000L), + makeChangeRow(3L, "c", CHANGE_TYPE_INSERT, 3L, 3000000L))) // Both exclusive: only version 2 included val expected = Seq( - Row(2L, "b", "insert", 2L, new Timestamp(2000L))) + Row(2L, "b", CHANGE_TYPE_INSERT, 2L, new Timestamp(2000L))) // DataFrame API checkAnswer( @@ -383,7 +384,7 @@ class ChangelogEndToEndSuite extends SharedSparkSession { test("changes() default deduplication mode is dropCarryovers") { catalog.addChangeRows(ident, Seq( - makeChangeRow(1L, "a", "insert", 1L, 1000000L))) + makeChangeRow(1L, "a", CHANGE_TYPE_INSERT, 1L, 1000000L))) // DataFrame API spark.read.option("startingVersion", "1").changes(fullTableName).collect() @@ -400,7 +401,7 @@ class ChangelogEndToEndSuite extends SharedSparkSession { test("changes() with deduplicationMode none") { catalog.addChangeRows(ident, Seq( - makeChangeRow(1L, "a", "insert", 1L, 1000000L))) + makeChangeRow(1L, "a", CHANGE_TYPE_INSERT, 1L, 1000000L))) // DataFrame API spark.read @@ -420,7 +421,7 @@ class ChangelogEndToEndSuite extends SharedSparkSession { test("changes() passes computeUpdates to 
catalog") { catalog.addChangeRows(ident, Seq( - makeChangeRow(1L, "a", "insert", 1L, 1000000L))) + makeChangeRow(1L, "a", CHANGE_TYPE_INSERT, 1L, 1000000L))) // DataFrame API spark.read @@ -440,7 +441,7 @@ class ChangelogEndToEndSuite extends SharedSparkSession { test("changes() with timestamp range") { catalog.addChangeRows(ident, Seq( - makeChangeRow(1L, "a", "insert", 1L, 1000000L))) + makeChangeRow(1L, "a", CHANGE_TYPE_INSERT, 1L, 1000000L))) // DataFrame API spark.read @@ -475,14 +476,14 @@ class ChangelogEndToEndSuite extends SharedSparkSession { test("streaming changes() returns change data") { catalog.addChangeRows(ident, Seq( - makeChangeRow(1L, "a", "insert", 1L, 1000000L), - makeChangeRow(2L, "b", "insert", 1L, 1000000L), - makeChangeRow(1L, "a", "delete", 2L, 2000000L))) + makeChangeRow(1L, "a", CHANGE_TYPE_INSERT, 1L, 1000000L), + makeChangeRow(2L, "b", CHANGE_TYPE_INSERT, 1L, 1000000L), + makeChangeRow(1L, "a", CHANGE_TYPE_DELETE, 2L, 2000000L))) val expected = Seq( - Row(1L, "a", "insert", 1L, new Timestamp(1000L)), - Row(2L, "b", "insert", 1L, new Timestamp(1000L)), - Row(1L, "a", "delete", 2L, new Timestamp(2000L))) + Row(1L, "a", CHANGE_TYPE_INSERT, 1L, new Timestamp(1000L)), + Row(2L, "b", CHANGE_TYPE_INSERT, 1L, new Timestamp(1000L)), + Row(1L, "a", CHANGE_TYPE_DELETE, 2L, new Timestamp(2000L))) // DataFrame API val dfApiStream = spark.readStream @@ -512,12 +513,12 @@ class ChangelogEndToEndSuite extends SharedSparkSession { test("streaming changes() with startingVersion filters data") { catalog.addChangeRows(ident, Seq( - makeChangeRow(1L, "a", "insert", 1L, 1000000L), - makeChangeRow(2L, "b", "insert", 1L, 1000000L), - makeChangeRow(1L, "a", "delete", 2L, 2000000L))) + makeChangeRow(1L, "a", CHANGE_TYPE_INSERT, 1L, 1000000L), + makeChangeRow(2L, "b", CHANGE_TYPE_INSERT, 1L, 1000000L), + makeChangeRow(1L, "a", CHANGE_TYPE_DELETE, 2L, 2000000L))) val expected = Seq( - Row(1L, "a", "delete", 2L, new Timestamp(2000L))) + Row(1L, "a", 
CHANGE_TYPE_DELETE, 2L, new Timestamp(2000L))) // DataFrame API val dfApiStream = spark.readStream @@ -547,9 +548,9 @@ class ChangelogEndToEndSuite extends SharedSparkSession { test("streaming changes() with projection and filter") { catalog.addChangeRows(ident, Seq( - makeChangeRow(1L, "a", "insert", 1L, 1000000L), - makeChangeRow(2L, "b", "insert", 1L, 1000000L), - makeChangeRow(3L, "c", "insert", 2L, 2000000L))) + makeChangeRow(1L, "a", CHANGE_TYPE_INSERT, 1L, 1000000L), + makeChangeRow(2L, "b", CHANGE_TYPE_INSERT, 1L, 1000000L), + makeChangeRow(3L, "c", CHANGE_TYPE_INSERT, 2L, 2000000L))) val expected = Seq(Row(1L, "a"), Row(2L, "b")) @@ -586,7 +587,7 @@ class ChangelogEndToEndSuite extends SharedSparkSession { test("streaming changes() passes computeUpdates to catalog") { catalog.addChangeRows(ident, Seq( - makeChangeRow(1L, "a", "insert", 1L, 1000000L))) + makeChangeRow(1L, "a", CHANGE_TYPE_INSERT, 1L, 1000000L))) // DataFrame API val dfApiStream = spark.readStream @@ -620,10 +621,10 @@ class ChangelogEndToEndSuite extends SharedSparkSession { test("streaming changes() supports .name() API with source evolution enabled") { catalog.addChangeRows(ident, Seq( - makeChangeRow(1L, "a", "insert", 1L, 1000000L))) + makeChangeRow(1L, "a", CHANGE_TYPE_INSERT, 1L, 1000000L))) val expected = Seq( - Row(1L, "a", "insert", 1L, new Timestamp(1000L))) + Row(1L, "a", CHANGE_TYPE_INSERT, 1L, new Timestamp(1000L))) withSQLConf(SQLConf.ENABLE_STREAMING_SOURCE_EVOLUTION.key -> "true") { val stream = spark.readStream diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/ChangelogResolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/ChangelogResolutionSuite.scala index d403db1e62bf9..8f29df44538b6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/ChangelogResolutionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/ChangelogResolutionSuite.scala @@ -24,10 +24,12 @@ import 
org.apache.spark.sql.catalyst.streaming.StreamingRelationV2 import org.apache.spark.sql.connector.catalog._ import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ import org.apache.spark.sql.connector.catalog.ChangelogRange -import org.apache.spark.sql.connector.expressions.Transform +import org.apache.spark.sql.connector.expressions.{FieldReference, NamedReference, Transform} +import org.apache.spark.sql.connector.read.ScanBuilder import org.apache.spark.sql.execution.datasources.v2.{ChangelogTable, DataSourceV2Relation} import org.apache.spark.sql.test.SharedSparkSession -import org.apache.spark.sql.types.{LongType, StringType} +import org.apache.spark.sql.types.{IntegerType, LongType, StringType, TimestampType} +import org.apache.spark.sql.util.CaseInsensitiveStringMap /** * Tests for the CDC (Change Data Capture) analyzer resolution path: @@ -269,4 +271,201 @@ class ChangelogResolutionSuite extends SharedSparkSession { condition = "INVALID_CDC_OPTION.STREAMING_POST_PROCESSING_NOT_SUPPORTED", parameters = Map("changelogName" -> s"$cdcCatalogName.test_table_changelog")) } + + // =========================================================================== + // Generic changelog schema validation + // =========================================================================== + + private def stubInfo(): ChangelogInfo = new ChangelogInfo( + new ChangelogRange.VersionRange("1", java.util.Optional.of("2"), true, true), + ChangelogInfo.DeduplicationMode.DROP_CARRYOVERS, + false) + + private def cl(name: String, cols: (String, org.apache.spark.sql.types.DataType)*) + : TestChangelog = { + new TestChangelog(name, cols.map { case (n, t) => Column.create(n, t) }.toArray) + } + + private def missing(columnName: String): Map[String, String] = + Map("changelogName" -> "bad_cl", "columnName" -> columnName) + + private def wrongType(columnName: String, expected: String, actual: String) + : Map[String, String] = Map( + "changelogName" -> "bad_cl", + "columnName" -> 
columnName, + "expectedType" -> expected, + "actualType" -> actual) + + // Valid metadata tuples; tests swap one of these out to create broken schemas. + private val validChangeType = "_change_type" -> StringType + private val validVersion = "_commit_version" -> LongType + private val validTimestamp = "_commit_timestamp" -> TimestampType + + test("ChangelogTable - missing _change_type column throws") { + checkError( + intercept[AnalysisException] { + ChangelogTable(cl("bad_cl", validVersion, validTimestamp), stubInfo()) + }, + condition = "INVALID_CHANGELOG_SCHEMA.MISSING_COLUMN", + parameters = missing("_change_type")) + } + + test("ChangelogTable - missing _commit_version column throws") { + checkError( + intercept[AnalysisException] { + ChangelogTable(cl("bad_cl", validChangeType, validTimestamp), stubInfo()) + }, + condition = "INVALID_CHANGELOG_SCHEMA.MISSING_COLUMN", + parameters = missing("_commit_version")) + } + + test("ChangelogTable - missing _commit_timestamp column throws") { + checkError( + intercept[AnalysisException] { + ChangelogTable(cl("bad_cl", validChangeType, validVersion), stubInfo()) + }, + condition = "INVALID_CHANGELOG_SCHEMA.MISSING_COLUMN", + parameters = missing("_commit_timestamp")) + } + + test("ChangelogTable - wrong _change_type data type throws") { + checkError( + intercept[AnalysisException] { + ChangelogTable( + cl("bad_cl", "_change_type" -> IntegerType, validVersion, validTimestamp), + stubInfo()) + }, + condition = "INVALID_CHANGELOG_SCHEMA.INVALID_COLUMN_TYPE", + parameters = wrongType("_change_type", "STRING", "INT")) + } + + test("ChangelogTable - wrong _commit_timestamp data type throws") { + checkError( + intercept[AnalysisException] { + ChangelogTable( + cl("bad_cl", validChangeType, validVersion, "_commit_timestamp" -> LongType), + stubInfo()) + }, + condition = "INVALID_CHANGELOG_SCHEMA.INVALID_COLUMN_TYPE", + parameters = wrongType("_commit_timestamp", "TIMESTAMP", "BIGINT")) + } + + test("ChangelogTable - 
_commit_version type is connector-defined (any type accepted)") { + Seq(IntegerType, LongType, StringType).foreach { versionType => + ChangelogTable( + cl("any_cl", validChangeType, "_commit_version" -> versionType, validTimestamp), + stubInfo()) + } + } + + test("ChangelogTable - valid schema with data columns passes") { + ChangelogTable( + cl("good_cl", "id" -> LongType, "name" -> StringType, + validChangeType, validVersion, validTimestamp), + stubInfo()) + } + + test("ChangelogTable - nested rowId and rowVersion references pass (Delta-style _metadata)") { + val metadataRowId = FieldReference(Seq("_metadata", "row_id")) + val metadataRowVersion = FieldReference(Seq("_metadata", "row_commit_version")) + val cl = new TestChangelog( + "delta_cl", + Array( + Column.create("id", LongType, false), + Column.create("_change_type", StringType), + Column.create("_commit_version", LongType), + Column.create("_commit_timestamp", TimestampType)), + carryoverRows = true, + rowIdRefs = Array(metadataRowId), + rowVersionRef = Some(metadataRowVersion)) + ChangelogTable(cl, stubInfo()) + } + + test("ChangelogTable - representsUpdateAsDeleteAndInsert=true requires non-empty rowId") { + val cl = new TestChangelog( + "bad_cl", + Array( + Column.create("_change_type", StringType), + Column.create("_commit_version", LongType), + Column.create("_commit_timestamp", TimestampType)), + updateAsDeleteInsert = true, + rowIdRefs = Array.empty, + rowVersionRef = Some(FieldReference.column("_commit_version"))) + checkError( + intercept[AnalysisException] { ChangelogTable(cl, stubInfo()) }, + condition = "INVALID_CHANGELOG_SCHEMA.MISSING_ROW_ID", + parameters = Map("changelogName" -> "bad_cl")) + } + + test("ChangelogTable - containsIntermediateChanges=true requires non-empty rowId") { + val cl = new TestChangelog( + "bad_cl", + Array( + Column.create("_change_type", StringType), + Column.create("_commit_version", LongType), + Column.create("_commit_timestamp", TimestampType)), + 
intermediateChanges = true, + rowIdRefs = Array.empty) + checkError( + intercept[AnalysisException] { ChangelogTable(cl, stubInfo()) }, + condition = "INVALID_CHANGELOG_SCHEMA.MISSING_ROW_ID", + parameters = Map("changelogName" -> "bad_cl")) + } + + test("ChangelogTable - UnsupportedOperationException surfaces when rowId() not implemented") { + val cl = new TestChangelog( + "bad_cl", + Array( + Column.create("_change_type", StringType), + Column.create("_commit_version", LongType), + Column.create("_commit_timestamp", TimestampType)), + carryoverRows = true, + rowIdSupported = false, + rowVersionRef = Some(FieldReference.column("_commit_version"))) + intercept[UnsupportedOperationException] { ChangelogTable(cl, stubInfo()) } + } + + test("ChangelogTable - UnsupportedOperationException surfaces when rowVersion() missing") { + val cl = new TestChangelog( + "bad_cl", + Array( + Column.create("_change_type", StringType), + Column.create("_commit_version", LongType), + Column.create("_commit_timestamp", TimestampType)), + carryoverRows = true, + rowIdRefs = Array(FieldReference.column("id")), + rowVersionRef = None) + intercept[UnsupportedOperationException] { ChangelogTable(cl, stubInfo()) } + } + +} + +/** + * Test-only [[Changelog]] implementation that returns a hand-crafted schema. Used to + * exercise [[ChangelogTable]]'s schema validation without going through a real catalog. + * + * Defaults match a minimal connector with no post-processing capabilities. Tests opt + * into capability flags or `rowVersion()` overrides via constructor params. 
+ */ +private class TestChangelog( + nameArg: String, + cols: Array[Column], + carryoverRows: Boolean = false, + updateAsDeleteInsert: Boolean = false, + intermediateChanges: Boolean = false, + rowIdRefs: Array[NamedReference] = Array.empty, + rowIdSupported: Boolean = true, + rowVersionRef: Option[NamedReference] = None) extends Changelog { + override def name(): String = nameArg + override def columns(): Array[Column] = cols + override def containsCarryoverRows(): Boolean = carryoverRows + override def containsIntermediateChanges(): Boolean = intermediateChanges + override def representsUpdateAsDeleteAndInsert(): Boolean = updateAsDeleteInsert + override def rowId(): Array[NamedReference] = + if (rowIdSupported) rowIdRefs else super.rowId() + override def rowVersion(): NamedReference = + rowVersionRef.getOrElse(super.rowVersion()) + override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = { + throw new UnsupportedOperationException("not needed for schema validation tests") + } }