diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json index 9c9a657bc6e9f..9d84d638e6567 100644 --- a/common/utils/src/main/resources/error/error-conditions.json +++ b/common/utils/src/main/resources/error/error-conditions.json @@ -256,6 +256,12 @@ ], "sqlState" : "0A000" }, + "AUTOCDC_TARGET_DOES_NOT_SUPPORT_MERGE" : { + "message" : [ + "Cannot start AutoCDC flow: the target table <tableName> (format: <format>) does not support row-level operations. AutoCDC requires a target backed by a connector that supports MERGE." + ], + "sqlState" : "0A000" + }, "AVRO_CANNOT_WRITE_NULL_FIELD" : { "message" : [ "Cannot write null value for field defined as non-null Avro data type .", diff --git a/python/pyspark/pipelines/api.py b/python/pyspark/pipelines/api.py index 578b28ec3793d..084547f4c2b19 100644 --- a/python/pyspark/pipelines/api.py +++ b/python/pyspark/pipelines/api.py @@ -556,6 +556,11 @@ def create_auto_cdc_flow( Note that for keys, sequence_by, column_list, and except_column_list the arguments have to be column identifiers without qualifiers, e.g. they cannot be col("sourceTable.keyId"). + The set and types of `keys` are part of the Auto CDC flow's persisted state. Changing keys + across incremental runs (renaming, swapping, growing, shrinking, or changing the type of a + key column) is not supported and will produce undefined behavior. To change the key set, + fully refresh the target table. + :param target: The name of the target table that receives the Auto CDC flow. :param source: The name of the CDC source to stream from. 
:param keys: The column or combination of columns that uniquely identify a row in the source \ diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/AutoCdcReservedNames.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/AutoCdcReservedNames.scala new file mode 100644 index 0000000000000..2b0f8e293e76b --- /dev/null +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/AutoCdcReservedNames.scala @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.pipelines.autocdc + +/** + * Names that AutoCDC reserves for its own use, both for internal columns it inserts during + * reconciliation (e.g. `${prefix}metadata`, `${prefix}winning_row`) and for internal tables it + * manages alongside user-defined targets (e.g. the per-target auxiliary state table). + * + * A single recognizable prefix gives a single auditable answer to "what does AutoCDC own", and + * lets user-defined columns and tables be unambiguously distinguished from AutoCDC-managed ones. + */ +private[pipelines] object AutoCdcReservedNames { + + /** Common reserved-name prefix shared by AutoCDC internal columns and internal tables. 
*/ + val prefix: String = "__spark_autocdc_" +} diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgs.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgs.scala index b975e06807f57..c475377ba5060 100644 --- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgs.scala +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgs.scala @@ -129,13 +129,23 @@ private[pipelines] object CaseSensitivityLabels { } /** The SCD (Slowly Changing Dimension) strategy for a CDC flow. */ -sealed trait ScdType +sealed trait ScdType { + /** + * Short, stable label for this SCD type. Persisted as table property on AutoCDC flow auxiliary + * tables. + */ + def label: String +} object ScdType { /** Representation for the standard SCD1 strategy. */ - case object Type1 extends ScdType + case object Type1 extends ScdType { + override val label: String = "SCD1" + } /** Representation for the standard SCD2 strategy. */ - case object Type2 extends ScdType + case object Type2 extends ScdType { + override val label: String = "SCD2" + } } /** diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala index aaea3b63e9ef3..0035f442fb00a 100644 --- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala @@ -367,19 +367,29 @@ case class Scd1BatchProcessor( val incomingWinsDelete = microbatchDeleteVersionField.isNotNull && microbatchDeleteVersionField > destinationUpsertVersionField - // When the incoming upsert wins against an existing record, the entire row (all columns) - // will be overwritten, including the CDC metadata column. 
We only exclude keys because - // most merge implementations require that join columns are not being mutated, even if - // the mutation is a no-op. val resolver = microbatchDf.sparkSession.sessionState.conf.resolver val keyNames = changeArgs.keys.map(_.name) + + def constructTargetColumnAssignmentsFromMicrobatch(columnName: String): (String, Column) = { + // Map a column in the target table to its direct equivalent in the microbatch. Note that + // because of target-table schema evolution during SDP dataset materialization, the + // microbatch's columns are always a subset of (or equal to) the target's columns. + val quotedCol = QuotingUtils.quoteIdentifier(columnName) + s"$destinationTableStr.$quotedCol" -> F.col(s"microbatch.$quotedCol") + } + + // Most merge implementations require that join columns are not mutated, even when the + // mutation would be a no-op. The remaining microbatch columns (including the CDC metadata + // column) are overwritten outright when the incoming upsert wins. val columnsToUpdateWhenIncomingWinsUpsert: Map[String, Column] = microbatchDf.columns .filterNot(c => keyNames.exists(resolver(_, c))) - .map { c => - val quotedCol = QuotingUtils.quoteIdentifier(c) - s"$destinationTableStr.$quotedCol" -> F.col(s"microbatch.$quotedCol") - } + .map(constructTargetColumnAssignmentsFromMicrobatch) + .toMap + + val columnsToInsertOnNewKey: Map[String, Column] = + microbatchDf.columns + .map(constructTargetColumnAssignmentsFromMicrobatch) .toMap microbatchDf @@ -391,7 +401,12 @@ case class Scd1BatchProcessor( // New key: only insert upserts; deletes for absent keys are no-ops for the target table // merge, and instead would have been inserted as tombstones into the auxiliary table. .whenNotMatched(microbatchDeleteVersionField.isNull) - .insertAll() + // When inserting a brand new row for a new key, construct column mappings from microbatch. + // The microbatch's columns may be a strict subset of the target's columns -- e.g. 
the user + // narrowed `column_list` between runs, or the source DF dropped a column. The target's + // columns can never be a strict subset of the microbatch's, however, because SDP's schema + // evolution always unions old and new schemas onto the target. + .insert(columnsToInsertOnNewKey) .merge() } @@ -417,17 +432,15 @@ case class Scd1BatchProcessor( object Scd1BatchProcessor { /** - * Reserved column-name prefix for internal SDP AutoCDC processing. Source change-data-feed - * dataframes must not contain any columns starting with this prefix; the invariant is + * Internal columns inserted by AutoCDC reconciliation. Source change-data-feed dataframes must + * not contain any columns starting with [[AutoCdcReservedNames.prefix]]; the invariant is * enforced at [[org.apache.spark.sql.pipelines.graph.AutoCdcMergeFlow]] construction. */ - private[pipelines] val reservedColumnNamePrefix: String = "__spark_autocdc_" - - private[autocdc] val winningRowColName: String = s"${reservedColumnNamePrefix}winning_row" - private[pipelines] val cdcMetadataColName: String = s"${reservedColumnNamePrefix}metadata" + private[autocdc] val winningRowColName: String = s"${AutoCdcReservedNames.prefix}winning_row" + private[pipelines] val cdcMetadataColName: String = s"${AutoCdcReservedNames.prefix}metadata" - private[autocdc] val cdcDeleteSequenceFieldName: String = "deleteSequence" - private[autocdc] val cdcUpsertSequenceFieldName: String = "upsertSequence" + private[pipelines] val cdcDeleteSequenceFieldName: String = "deleteSequence" + private[pipelines] val cdcUpsertSequenceFieldName: String = "upsertSequence" /** Project the delete sequence out of the CDC metadata column. 
*/ private[autocdc] def deleteSequenceOf(cdcMetadataCol: Column): Column = diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/DatasetManager.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/DatasetManager.scala index 4affbe4637dba..456edca8d1e22 100644 --- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/DatasetManager.scala +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/DatasetManager.scala @@ -303,6 +303,20 @@ object DatasetManager extends Logging { context.spark.sql(s"TRUNCATE TABLE ${table.identifier.quotedString}") } + if (isFullRefresh) { + // On full refresh, drop the AutoCDC auxiliary state associated with this table (if any) so + // that stale delete-tracking data and table properties are not carried forward into the new + // table generation. We unconditionally issue the DROP for every fully-refreshed target. + + // Intentionally DROP and not TRUNCATE: the auxiliary table is an internal state store + // that is not part of the dataflow graph, so it does not participate in regular schema + // evolution like user tables do. On a full refresh we want a clean recreation against + // the new target schema rather than carrying forward the previous generation's layout. 
+ + val auxiliaryTableId = AutoCdcAuxiliaryTable.identifier(table.identifier) + context.spark.sql(s"DROP TABLE IF EXISTS ${auxiliaryTableId.quotedString}") + } + // Alter the table if we need to existingTableOpt.foreach { existingTable => val existingSchema = v2ColumnsToStructType(existingTable.columns()) diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/Flow.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/Flow.scala index 04ef8d3186c5d..9f357ef026b0f 100644 --- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/Flow.scala +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/Flow.scala @@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier} import org.apache.spark.sql.classic.DataFrame import org.apache.spark.sql.pipelines.AnalysisWarning import org.apache.spark.sql.pipelines.autocdc.{ + AutoCdcReservedNames, CaseSensitivityLabels, ChangeArgs, ColumnSelection, @@ -271,7 +272,7 @@ class AutoCdcMergeFlow( } /** The DataType of the sequencing expression, derived once from the source change feed. 
*/ - private val sequencingType: DataType = + private[graph] val sequencingType: DataType = df.select(changeArgs.sequencing).schema.head.dataType /** @@ -335,7 +336,7 @@ class AutoCdcMergeFlow( */ private def requireReservedPrefixAbsentInSourceColumns(): Unit = { val resolver = spark.sessionState.conf.resolver - val reservedPrefix = Scd1BatchProcessor.reservedColumnNamePrefix + val reservedPrefix = AutoCdcReservedNames.prefix def nameContainsReservedPrefix(name: String): Boolean = { name.length >= reservedPrefix.length && resolver( diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowExecution.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowExecution.scala index 13a5621947d57..ea151830f5441 100644 --- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowExecution.scala +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowExecution.scala @@ -23,12 +23,24 @@ import java.util.concurrent.atomic.AtomicBoolean import scala.concurrent.{ExecutionContext, Future} import scala.util.control.NonFatal +import org.apache.spark.SparkException import org.apache.spark.internal.{Logging, LogKeys} +import org.apache.spark.sql.{AnalysisException, Dataset, Row} import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.classic.ClassicConversions._ import org.apache.spark.sql.classic.SparkSession +import org.apache.spark.sql.connector.catalog.{Identifier, SupportsRowLevelOperations, TableCatalog} +import org.apache.spark.sql.errors.QueryCompilationErrors +import org.apache.spark.sql.pipelines.autocdc.{ + AutoCdcReservedNames, + ChangeArgs, + Scd1BatchProcessor, + Scd1ForeachBatchHandler +} import org.apache.spark.sql.pipelines.graph.QueryOrigin.ExceptionHelpers import org.apache.spark.sql.pipelines.util.SparkSessionUtils import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery, Trigger} +import org.apache.spark.sql.types.{StructField, StructType} import 
org.apache.spark.util.ThreadUtils /** @@ -301,3 +313,199 @@ class SinkWrite( .start() } } + +object AutoCdcAuxiliaryTable { + /** + * Helper for deriving the auxiliary AutoCDC catalog table identifier from a target table. If a + * table exists with a name matching the name derived here, it is assumed to be an AutoCDC + * auxiliary table that should be managed by the pipeline. + */ + def identifier(destination: TableIdentifier): TableIdentifier = TableIdentifier( + table = s"${AutoCdcReservedNames.prefix}aux_state_${destination.table}", + database = destination.database, + catalog = destination.catalog + ) + + /** + * Reserved table property key set on the auxiliary table to record which SCD strategy it + * serves. + */ + val scdTypePropertyKey: String = s"${PipelinesTableProperties.pipelinesPrefix}autocdc.scd_type" +} + +/** + * Base trait for AutoCDC merge-based write flows. + */ +trait AutoCdcMergeWriteBase { + /** The spark session the AutoCDC flow is going to be planned in. */ + protected def spark: SparkSession + + /** The destination (target) table entity the AutoCDC flow will be writing to. */ + protected def destination: Table + + /** The AutoCDC flow's [[ChangeArgs]] (keys, sequencing, columnSelection, ...). */ + protected def changeArgs: ChangeArgs + + /** Full schema of the auxiliary table for this SCD type. */ + protected def auxiliaryTableSchema: StructType + + /** + * Idempotently create the auxiliary table for [[destination]] if it does not already exist + * and return its [[TableIdentifier]]. + * + * Note that this is `CREATE TABLE IF NOT EXISTS`: when the aux table already exists, its + * schema is left untouched and `auxiliaryTableSchema` is ignored. For SCD1, the keys must be + * invariant across executions and the CDC metadata will always be present, so this is correct. 
+ */ + protected def createAuxiliaryTableIfNotExists(spark: SparkSession): TableIdentifier = { + val auxIdent = AutoCdcAuxiliaryTable.identifier(destination.identifier) + // The auxiliary table inherits the target's format so MERGE semantics line up. When the + // target's format is unspecified (None), omit the USING clause and fall back to the + // session's default source provider. + val usingClause = destination.format.map(fmt => s"USING $fmt").getOrElse("") + val tblPropertiesClause = + s"TBLPROPERTIES ('${AutoCdcAuxiliaryTable.scdTypePropertyKey}' = " + + s"'${changeArgs.storedAsScdType.label}')" + spark.sql( + s"""CREATE TABLE IF NOT EXISTS + |${auxIdent.quotedString} + |(${auxiliaryTableSchema.toDDL}) $usingClause $tblPropertiesClause""".stripMargin + ) + auxIdent + } + + /** + * Validate that the target table's underlying connector implements + * [[SupportsRowLevelOperations]], which is the V2 connector contract for MERGE/UPDATE/DELETE + * with rewrite - all operations that the AutoCDC transformation executes. 
+ */ + protected def requireDestinationSupportsRowLevelOps(): Unit = { + val (catalog, v2Identifier) = resolveTableCatalog(spark, destination.identifier) + val destinationTable = catalog.loadTable(v2Identifier) + + if (!destinationTable.isInstanceOf[SupportsRowLevelOperations]) { + throw new AnalysisException( + errorClass = "AUTOCDC_TARGET_DOES_NOT_SUPPORT_MERGE", + messageParameters = Map( + "tableName" -> destination.identifier.quotedString, + "format" -> destination.format.orElse( + Option( + destinationTable.properties.get(TableCatalog.PROP_PROVIDER) + ) + ) + .getOrElse("") + ) + ) + } + } + + private def resolveTableCatalog( + spark: SparkSession, + ident: TableIdentifier): (TableCatalog, Identifier) = { + val catalogManager = spark.sessionState.catalogManager + val catalogPlugin = ident.catalog + .map(catalogManager.catalog) + .getOrElse(catalogManager.currentCatalog) + val catalog = catalogPlugin match { + case t: TableCatalog => t + case _ => throw QueryCompilationErrors.missingCatalogTablesAbilityError(catalogPlugin) + } + val namespace = ident.database.getOrElse( + throw SparkException.internalError( + s"Cannot resolve table identifier ${ident.quotedString}: namespace is unspecified." + ) + ) + (catalog, Identifier.of(Array(namespace), ident.table)) + } +} + +/** + * A [[StreamingFlowExecution]] that applies a CDC event stream to a target [[Table]] via + * SCD Type 1 MERGE semantics. 
+ */ +class Scd1MergeStreamingWrite( + val identifier: TableIdentifier, + val flow: AutoCdcMergeFlow, + val graph: DataflowGraph, + val updateContext: PipelineUpdateContext, + val checkpointPath: String, + val trigger: Trigger, + val destination: Table, + val sqlConf: Map[String, String] +) extends StreamingFlowExecution with AutoCdcMergeWriteBase { + + requireDestinationSupportsRowLevelOps() + + override def getOrigin: QueryOrigin = flow.origin + + override protected def changeArgs: ChangeArgs = flow.changeArgs + + override def startStream(): StreamingQuery = { + val sourceChangeDataFeed = graph.reanalyzeFlow(flow).df + + // The auxiliary table is created here (at flow execution) rather than during flow resolution + // or dataset materialization for two reasons: + // 1. It is an internal state store: we deliberately keep it out of the graph registration + // context's table set so that it is invisible to other flows and the [[DatasetManager]] + // will never materialize it. + // 2. Its format must match the target table's, which only exists after the target is + // materialized. Flow resolution must also stay side-effect free (e.g. for dry runs). + val auxiliaryTableIdentifier = createAuxiliaryTableIfNotExists(spark = updateContext.spark) + + val foreachBatchHandler = Scd1ForeachBatchHandler( + batchProcessor = Scd1BatchProcessor( + changeArgs = flow.changeArgs, + resolvedSequencingType = flow.sequencingType + ), + auxiliaryTableIdentifier = auxiliaryTableIdentifier, + targetTableIdentifier = destination.identifier + ) + + sourceChangeDataFeed.writeStream + .queryName(displayName) + .option("checkpointLocation", checkpointPath) + .trigger(trigger) + .foreachBatch((batch: Dataset[Row], batchId: Long) => { + foreachBatchHandler.execute(batch, batchId) + }) + .start() + } + + override protected lazy val auxiliaryTableSchema: StructType = + // SCD1's auxiliary table is just keys + the CDC metadata struct; no user data columns. 
Keys + // come first, in `changeArgs.keys` declaration order, to anchor the per-key sequence + // watermark used to gate out-of-order events. + StructType(autoCdcKeyFields :+ cdcMetadataField) + + /** + * AutoCDC key columns resolved out of the flow's augmented schema, in + * `changeArgs.keys` declaration order. Keys are guaranteed to be present in the schema + * because [[AutoCdcMergeFlow.schema]] validates that. + */ + private lazy val autoCdcKeyFields: Seq[StructField] = { + val resolver = updateContext.spark.sessionState.conf.resolver + val targetTableSchema = flow.schema + flow.changeArgs.keys.map { key => + targetTableSchema.fields + .find(field => resolver(field.name, key.name)) + .getOrElse( + throw SparkException.internalError( + s"Key column '${key.name}' was not found in the AutoCDC flow's selected schema." + ) + ) + } + } + + /** CDC metadata field resolved out of the flow's augmented schema. */ + private lazy val cdcMetadataField: StructField = { + val resolver = updateContext.spark.sessionState.conf.resolver + flow.schema.fields + .find(field => resolver(field.name, Scd1BatchProcessor.cdcMetadataColName)) + .getOrElse( + throw SparkException.internalError( + s"CDC metadata column '${Scd1BatchProcessor.cdcMetadataColName}' was not found in the " + + s"AutoCDC flow's target table schema." 
+ ) + ) + } +} diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowPlanner.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowPlanner.scala index 29e2da4a5e13f..8251780524a2d 100644 --- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowPlanner.scala +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowPlanner.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.pipelines.graph +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.pipelines.autocdc.ScdType import org.apache.spark.sql.streaming.Trigger /** @@ -73,10 +75,30 @@ class FlowPlanner( trigger = triggerFor(sf), checkpointPath = flowMetadata.latestCheckpointLocation ) - case _ => - throw new UnsupportedOperationException( - s"Unsupported destination type: ${output.getClass.getSimpleName} for " + - s"streaming flow ${sf.identifier} (${flow.destinationIdentifier})" + case _ => unsupportedDestinationType(sf, output) + } + case acmf: AutoCdcMergeFlow => + acmf.changeArgs.storedAsScdType match { + case ScdType.Type1 => + val flowMetadata = FlowSystemMetadata(updateContext, acmf, graph) + output match { + case o: Table => + new Scd1MergeStreamingWrite( + identifier = acmf.identifier, + flow = acmf, + graph = graph, + updateContext = updateContext, + checkpointPath = flowMetadata.latestCheckpointLocation, + trigger = triggerFor(acmf), + destination = o, + sqlConf = acmf.sqlConf + ) + case _ => unsupportedDestinationType(acmf, output) + } + case ScdType.Type2 => + throw new AnalysisException( + errorClass = "AUTOCDC_SCD2_NOT_SUPPORTED", + messageParameters = Map.empty ) } case _ => @@ -85,4 +107,11 @@ class FlowPlanner( ) } } + + private def unsupportedDestinationType(flow: ResolvedFlow, output: Output): Nothing = { + throw new UnsupportedOperationException( + s"Unsupported destination type: ${output.getClass.getSimpleName} for " + + s"flow ${flow.identifier} writing to 
${flow.destinationIdentifier}" + ) + } } diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/AutoCdcFlowSuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/AutoCdcFlowSuite.scala index 8d365906559bc..65eafd6c7dcc2 100644 --- a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/AutoCdcFlowSuite.scala +++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/AutoCdcFlowSuite.scala @@ -409,7 +409,7 @@ class AutoCdcFlowSuite extends QueryTest with SharedSparkSession { "Contract: a source df column with the reserved AutoCDC prefix is rejected at flow " + "construction" ) { - val conflictingName = s"${Scd1BatchProcessor.reservedColumnNamePrefix}foo" + val conflictingName = s"${AutoCdcReservedNames.prefix}foo" val sourceDf = sourceDfWithExtraColumns(conflictingName -> StringType) checkError( @@ -422,7 +422,7 @@ class AutoCdcFlowSuite extends QueryTest with SharedSparkSession { "caseSensitivity" -> CaseSensitivityLabels.CaseInsensitive, "columnName" -> conflictingName, "schemaName" -> "changeDataFeed", - "reservedColumnNamePrefix" -> Scd1BatchProcessor.reservedColumnNamePrefix + "reservedColumnNamePrefix" -> AutoCdcReservedNames.prefix ) ) } @@ -435,7 +435,7 @@ class AutoCdcFlowSuite extends QueryTest with SharedSparkSession { // from any ChangeArgs path still fails at construction with a different error. The // reservation is on the name itself, not on its presence in the source feed. 
val cleanSourceDf = threeColumnSourceDf() - val reservedName = s"${Scd1BatchProcessor.reservedColumnNamePrefix}foo" + val reservedName = s"${AutoCdcReservedNames.prefix}foo" val keysEx = intercept[AnalysisException] { newAutoCdcMergeFlow( @@ -487,7 +487,7 @@ class AutoCdcFlowSuite extends QueryTest with SharedSparkSession { "caseSensitivity" -> CaseSensitivityLabels.CaseInsensitive, "columnName" -> Scd1BatchProcessor.cdcMetadataColName, "schemaName" -> "changeDataFeed", - "reservedColumnNamePrefix" -> Scd1BatchProcessor.reservedColumnNamePrefix + "reservedColumnNamePrefix" -> AutoCdcReservedNames.prefix ) ) } @@ -497,7 +497,7 @@ class AutoCdcFlowSuite extends QueryTest with SharedSparkSession { ) { withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") { val conflictingName = - s"${Scd1BatchProcessor.reservedColumnNamePrefix}foo".toUpperCase(Locale.ROOT) + s"${AutoCdcReservedNames.prefix}foo".toUpperCase(Locale.ROOT) val sourceDf = sourceDfWithExtraColumns(conflictingName -> StringType) checkError( @@ -510,7 +510,7 @@ class AutoCdcFlowSuite extends QueryTest with SharedSparkSession { "caseSensitivity" -> CaseSensitivityLabels.CaseInsensitive, "columnName" -> conflictingName, "schemaName" -> "changeDataFeed", - "reservedColumnNamePrefix" -> Scd1BatchProcessor.reservedColumnNamePrefix + "reservedColumnNamePrefix" -> AutoCdcReservedNames.prefix ) ) } @@ -524,7 +524,7 @@ class AutoCdcFlowSuite extends QueryTest with SharedSparkSession { // `spark.sql.caseSensitive`, consistent with the schema-augmentation logic in this class. withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") { val nonConflictingName = - s"${Scd1BatchProcessor.reservedColumnNamePrefix}foo".toUpperCase(Locale.ROOT) + s"${AutoCdcReservedNames.prefix}foo".toUpperCase(Locale.ROOT) val sourceDf = sourceDfWithExtraColumns(nonConflictingName -> StringType) // No exception expected: construction succeeds. 
diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcGraphExecutionTestMixin.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcGraphExecutionTestMixin.scala new file mode 100644 index 0000000000000..5e2286a4fd56d --- /dev/null +++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcGraphExecutionTestMixin.scala @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.pipelines.graph + +import org.scalatest.{BeforeAndAfterEach, Suite} + +import org.apache.spark.SparkThrowable +import org.apache.spark.sql.{Column, Row} +import org.apache.spark.sql.connector.catalog.SharedTablesInMemoryRowLevelOperationTableCatalog +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.pipelines.autocdc.{ + ChangeArgs, + ColumnSelection, + Scd1BatchProcessor, + ScdType, + UnqualifiedColumnName +} +import org.apache.spark.sql.pipelines.common.RunState +import org.apache.spark.sql.pipelines.logging.RunProgress +import org.apache.spark.sql.pipelines.utils.{ExecutionTest, TestGraphRegistrationContext} +import org.apache.spark.sql.test.SharedSparkSession + +/** + * Shared helpers for AutoCDC end-to-end graph-execution test suites. + */ +trait AutoCdcGraphExecutionTestMixin extends BeforeAndAfterEach { + self: Suite with ExecutionTest with SharedSparkSession => + + /** v2 catalog name registered for AutoCDC E2E tests. Tests qualify tables as `cat.ns1.t`. */ + protected val catalog: String = "cat" + + /** Namespace under [[catalog]] used by AutoCDC E2E tests. */ + protected val namespace: String = "ns1" + + override protected def beforeEach(): Unit = { + super.beforeEach() + spark.conf.set( + s"spark.sql.catalog.$catalog", + classOf[SharedTablesInMemoryRowLevelOperationTableCatalog].getName + ) + // Disable per-flow retries so failure-path tests (e.g. INCOMPATIBLE_DATA) surface the + // AnalysisException after the first attempt instead of going through the default 2 retries, + // which would otherwise emit duplicate FAILED events and inflate test runtime without + // changing the asserted outcome. 
+ spark.conf.set(SQLConf.PIPELINES_MAX_FLOW_RETRY_ATTEMPTS.key, "0") + spark.sql(s"CREATE NAMESPACE IF NOT EXISTS $catalog.$namespace") + } + + override protected def afterEach(): Unit = { + SharedTablesInMemoryRowLevelOperationTableCatalog.reset() + spark.sessionState.catalogManager.reset() + spark.sessionState.conf.unsetConf(s"spark.sql.catalog.$catalog") + spark.sessionState.conf.unsetConf(SQLConf.PIPELINES_MAX_FLOW_RETRY_ATTEMPTS.key) + super.afterEach() + } + + /** + * Run a pipeline to completion. If any flow emitted a [[RunProgress]] event with state + * [[RunState.FAILED]], collect every error from the event buffer and throw a single + * exception listing them, so that test failures surface meaningful stack traces instead of + * generic "test exited normally but flow failed" errors. + */ + protected def runPipeline(ctx: TestGraphRegistrationContext): Unit = { + val updateCtx = TestPipelineUpdateContext(spark, ctx.toDataflowGraph, storageRoot) + updateCtx.pipelineExecution.runPipeline() + updateCtx.pipelineExecution.awaitCompletion() + + if (updateCtx.eventBuffer.getEvents.exists(_.details == RunProgress(RunState.FAILED))) { + val errors = updateCtx.eventBuffer.getEvents.flatMap(_.error) + val ex = new RuntimeException( + s"Pipeline run failed with ${errors.size} error(s):\n" + + errors.map { e => + val stackSnippet = e.getStackTrace + .map(f => s" at $f") + .mkString("\n") + s" ${e.getClass.getSimpleName}: ${e.getMessage}\n$stackSnippet" + }.mkString("\n") + ) + errors.foreach(ex.addSuppressed) + throw ex + } + } + + /** + * Walk every [[Throwable]] reachable from `failure` via [[Throwable#getSuppressed]] and + * [[Throwable#getCause]], searching for the first [[SparkThrowable]] whose + * [[SparkThrowable#getCondition]] equals `condition`, then run [[checkError]] against that + * exception with all of its other arguments propagated through. 
+ */ + protected def checkErrorInPipelineFailure( + failure: Throwable, + condition: String, + sqlState: Option[String] = None, + parameters: Map[String, String] = Map.empty, + matchPVals: Boolean = false, + queryContext: Array[ExpectedContext] = Array.empty): Unit = { + + def causeChain(t: Throwable): Iterator[Throwable] = + Iterator.iterate[Throwable](t)(_.getCause).takeWhile(_ != null) + + def reachable: Iterator[Throwable] = + (Iterator(failure) ++ failure.getSuppressed.iterator).flatMap(causeChain) + + val matched = reachable.collectFirst { + case t: SparkThrowable if t.getCondition == condition => t + } + assert( + matched.isDefined, + s"Expected a SparkThrowable with condition '$condition' reachable from the runPipeline " + + s"failure chain, got top-level: ${failure.getMessage}; chain:\n" + + reachable + .map(t => s" ${t.getClass.getSimpleName}: ${t.getMessage}") + .mkString("\n") + ) + checkError( + exception = matched.get, + condition = condition, + sqlState = sqlState, + parameters = parameters, + matchPVals = matchPVals, + queryContext = queryContext + ) + } + + /** + * DDL fragment for the AutoCDC metadata column appended to every SCD1 target table. Use + * inside a `CREATE TABLE` statement, for example: + * `CREATE TABLE t (id INT NOT NULL, version BIGINT NOT NULL, $cdcMetadataDdl)` + * + * Assumes sequence type is BIGINT (Long). + */ + protected val cdcMetadataDdl: String = { + val col = Scd1BatchProcessor.cdcMetadataColName + val del = Scd1BatchProcessor.cdcDeleteSequenceFieldName + val ups = Scd1BatchProcessor.cdcUpsertSequenceFieldName + s"$col STRUCT<$del:BIGINT,$ups:BIGINT> NOT NULL" + } + + /** + * Insert a pre-existing row into a target table, populating the CDC metadata struct so the + * row looks as if a previous AutoCDC run upserted it at sequencing version [[sequence]]. + * + * @param table Fully-qualified table name (catalog.schema.table). 
+ * @param colValues Comma-separated SQL literals for the user-defined columns, in declared + * order, excluding the trailing CDC metadata column. + * @param sequence Value to seed `_cdc_metadata.upsertSequence` with. The + * `deleteSequence` field is left NULL. + */ + protected def insertPreloadedRow(table: String, colValues: String, sequence: Long): Unit = { + val del = Scd1BatchProcessor.cdcDeleteSequenceFieldName + val ups = Scd1BatchProcessor.cdcUpsertSequenceFieldName + spark.sql( + s"INSERT INTO $table SELECT $colValues, " + + s"named_struct('$del', CAST(NULL AS BIGINT), '$ups', CAST($sequence AS BIGINT))" + ) + } + + /** Catalog identifier of the AutoCDC auxiliary table for `targetTableName`. */ + protected def auxTableNameFor(targetTableName: String): String = { + val targetIdent = fullyQualifiedIdentifier(targetTableName, Some(catalog), Some(namespace)) + AutoCdcAuxiliaryTable.identifier(targetIdent).unquotedString + } + + /** + * Construct an [[AutoCdcFlow]] targeting `catalog.namespace.${target}` from the given + * query and CDC knobs. + */ + protected def autoCdcFlow( + name: String, + target: String, + query: FlowFunction, + keys: Seq[String], + sequencing: Column, + columnSelection: Option[ColumnSelection] = None, + deleteCondition: Option[Column] = None, + scdType: ScdType = ScdType.Type1 + ): AutoCdcFlow = AutoCdcFlow( + identifier = fullyQualifiedIdentifier(name, Some(catalog), Some(namespace)), + destinationIdentifier = fullyQualifiedIdentifier(target, Some(catalog), Some(namespace)), + func = query, + queryContext = QueryContext( + currentCatalog = Some(catalog), + currentDatabase = Some(namespace) + ), + origin = QueryOrigin.empty, + changeArgs = ChangeArgs( + keys = keys.map(UnqualifiedColumnName(_)), + sequencing = sequencing, + columnSelection = columnSelection, + deleteCondition = deleteCondition, + storedAsScdType = scdType + ) + ) + + /** Build a target row's `_cdc_metadata` struct value.
*/ + protected def cdcMeta(deleteSeq: Option[Long], upsertSeq: Option[Long]): Row = + Row(deleteSeq.orNull, upsertSeq.orNull) +} diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1AuxiliaryTableDurabilitySuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1AuxiliaryTableDurabilitySuite.scala new file mode 100644 index 0000000000000..50ff60556a73c --- /dev/null +++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1AuxiliaryTableDurabilitySuite.scala @@ -0,0 +1,241 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.pipelines.graph + +import org.apache.spark.sql.Row +import org.apache.spark.sql.execution.streaming.runtime.MemoryStream +import org.apache.spark.sql.functions +import org.apache.spark.sql.pipelines.autocdc.{ + ColumnSelection, + Scd1BatchProcessor, + UnqualifiedColumnName +} +import org.apache.spark.sql.pipelines.utils.{ExecutionTest, TestGraphRegistrationContext} +import org.apache.spark.sql.test.SharedSparkSession + +/** + * Tests covering the durability of AutoCDC's auxiliary table across pipeline runs: + * the per-key sequence watermarks recorded in the auxiliary table must persist between + * incremental runs, and the auxiliary table must be transparently recreated if it is + * deleted out-of-band. + */ +class AutoCdcScd1AuxiliaryTableDurabilitySuite + extends ExecutionTest + with SharedSparkSession + with AutoCdcGraphExecutionTestMixin { + + test("a higher-sequence event in a later pipeline run correctly upserts the row") { + val session = spark + import session.implicits._ + + spark.sql( + s"CREATE TABLE $catalog.$namespace.target " + + s"(id INT NOT NULL, name STRING, version BIGINT NOT NULL, $cdcMetadataDdl)" + ) + + // Single MemoryStream reused across both pipeline runs so the streaming checkpoint can + // resume cleanly. + val changeDataFeedStream = MemoryStream[(Int, String, Long)] + def buildGraphRegistrationContext(): TestGraphRegistrationContext = + new TestGraphRegistrationContext(spark) { + registerTable("target", catalog = Some(catalog), database = Some(namespace)) + registerFlow(autoCdcFlow( + name = "auto_cdc_flow", + target = "target", + query = dfFlowFunc( + changeDataFeedStream.toDF().toDF("id", "name", "version") + ), + keys = Seq("id"), + sequencing = functions.col("version") + )) + } + + // Run #1: insert id=1 at seq=1. 
+ changeDataFeedStream.addData((1, "alice", 1L)) + runPipeline(buildGraphRegistrationContext()) + checkAnswer( + spark.table(s"$catalog.$namespace.target"), + Seq(Row(1, "alice", 1L, cdcMeta(None, Some(1L)))) + ) + + // Run #2: upsert id=1 at seq=2 (must replace) and insert id=2 at seq=1 (new key). + // The auxiliary table from run #1 persists and continues to gate seq comparisons. + changeDataFeedStream.addData((1, "alice2", 2L), (2, "bob", 1L)) + runPipeline(buildGraphRegistrationContext()) + checkAnswer( + spark.table(s"$catalog.$namespace.target"), + Seq( + Row(1, "alice2", 2L, cdcMeta(None, Some(2L))), + Row(2, "bob", 1L, cdcMeta(None, Some(1L))) + ) + ) + } + + test("an event with a sequence lower than what was applied in a prior pipeline run " + + "is suppressed") { + val session = spark + import session.implicits._ + + spark.sql( + s"CREATE TABLE $catalog.$namespace.target " + + s"(id INT NOT NULL, name STRING, version BIGINT NOT NULL, $cdcMetadataDdl)" + ) + + // Single MemoryStream reused across both runs so the streaming checkpoint can resume. + val stream = MemoryStream[(Int, String, Long, Boolean)] + def buildCtx(): TestGraphRegistrationContext = new TestGraphRegistrationContext(spark) { + registerTable("target", catalog = Some(catalog), database = Some(namespace)) + registerFlow(autoCdcFlow( + name = "auto_cdc_flow", + target = "target", + query = dfFlowFunc(stream.toDF().toDF("id", "name", "version", "is_delete")), + keys = Seq("id"), + sequencing = functions.col("version"), + deleteCondition = Some(functions.col("is_delete") === true), + columnSelection = Some(ColumnSelection.ExcludeColumns( + Seq(UnqualifiedColumnName("is_delete")) + )) + )) + } + + // Run #1: delete id=1 at seq=10. Auxiliary table records seq=10 as the watermark. + stream.addData((1, "alice", 10L, true)) + runPipeline(buildCtx()) + checkAnswer(spark.table(s"$catalog.$namespace.target"), Seq.empty) + + // Run #2: late upsert at seq=5 (< the persisted seq=10 watermark). 
Must be rejected. + stream.addData((1, "stale", 5L, false)) + runPipeline(buildCtx()) + + // Auxiliary table watermark from run #1 (seq=10) should keep rejecting the seq=5 event. + checkAnswer(spark.table(s"$catalog.$namespace.target"), Seq.empty) + } + + test("the auxiliary table places the AutoCDC key column first, ahead of any non-key " + + "source columns") { + val session = spark + import session.implicits._ + + // Source DF column order is (name, id, version): the AutoCDC key column `id` does NOT + // appear first in the source DF. The auxiliary table must still write `id` as its + // leading column. + spark.sql( + s"CREATE TABLE $catalog.$namespace.target " + + s"(name STRING, id INT NOT NULL, version BIGINT NOT NULL, $cdcMetadataDdl)" + ) + + val stream = MemoryStream[(String, Int, Long)] + stream.addData(("alice", 1, 1L)) + val ctx = new TestGraphRegistrationContext(spark) { + registerTable("target", catalog = Some(catalog), database = Some(namespace)) + registerFlow(autoCdcFlow( + name = "auto_cdc_flow", + target = "target", + query = dfFlowFunc(stream.toDF().toDF("name", "id", "version")), + keys = Seq("id"), + sequencing = functions.col("version") + )) + } + runPipeline(ctx) + + val auxSchema = spark.table(auxTableNameFor("target")).schema + + // The auxiliary table only contains keys and the metadata column, hence "name" should not be + // included. + assert(auxSchema.fieldNames.toSeq == Seq("id", Scd1BatchProcessor.cdcMetadataColName)) + } + + test("the auxiliary table preserves the user's declared key order, independent of the " + + "source DataFrame and target table column orders") { + val session = spark + import session.implicits._ + + // Source DF: (value, id, region, version). Target table: (value, id, region, version, + // _cdc_metadata) -- same ordering as the source. The user, however, declares + // `keys = Seq("region", "id")` -- the OPPOSITE order from how those columns appear in + // both the source DF and the target. 
The auxiliary table should honor the user's + // declared key order, not either physical column ordering, so subsequent runs + // positionally compare keys against the same recorded layout. + spark.sql( + s"CREATE TABLE $catalog.$namespace.target " + + s"(value STRING, id INT NOT NULL, region STRING NOT NULL, " + + s"version BIGINT NOT NULL, $cdcMetadataDdl)" + ) + + val stream = MemoryStream[(String, Int, String, Long)] + stream.addData(("v", 1, "us", 1L)) + val ctx = new TestGraphRegistrationContext(spark) { + registerTable("target", catalog = Some(catalog), database = Some(namespace)) + registerFlow(autoCdcFlow( + name = "auto_cdc_flow", + target = "target", + query = dfFlowFunc(stream.toDF().toDF("value", "id", "region", "version")), + keys = Seq("region", "id"), + sequencing = functions.col("version") + )) + } + runPipeline(ctx) + + val auxSchema = spark.table(auxTableNameFor("target")).schema + assert(auxSchema.fieldNames.toSeq == + Seq("region", "id", Scd1BatchProcessor.cdcMetadataColName)) + } + + test("if the AutoCDC auxiliary table is dropped between runs, it is transparently " + + "recreated") { + val session = spark + import session.implicits._ + + spark.sql( + s"CREATE TABLE $catalog.$namespace.target " + + s"(id INT NOT NULL, version BIGINT NOT NULL, $cdcMetadataDdl)" + ) + + // Single MemoryStream reused across both runs so the streaming checkpoint can resume. + val stream = MemoryStream[(Int, Long)] + def buildCtx(): TestGraphRegistrationContext = new TestGraphRegistrationContext(spark) { + registerTable("target", catalog = Some(catalog), database = Some(namespace)) + registerFlow(autoCdcFlow( + name = "auto_cdc_flow", + target = "target", + query = dfFlowFunc(stream.toDF().toDF("id", "version")), + keys = Seq("id"), + sequencing = functions.col("version") + )) + } + + stream.addData((1, 1L)) + runPipeline(buildCtx()) + assert(spark.catalog.tableExists(auxTableNameFor("target"))) + + // Manually drop the auxiliary table. 
+ spark.sql(s"DROP TABLE ${auxTableNameFor("target")}") + assert(!spark.catalog.tableExists(auxTableNameFor("target"))) + + stream.addData((1, 2L)) + runPipeline(buildCtx()) + + // The dropped auxiliary table must be transparently recreated. + assert(spark.catalog.tableExists(auxTableNameFor("target"))) + checkAnswer( + spark.table(s"$catalog.$namespace.target"), + Seq(Row(1, 2L, cdcMeta(None, Some(2L)))) + ) + } + +} diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1FullRefreshSuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1FullRefreshSuite.scala new file mode 100644 index 0000000000000..94ba7e20aed1f --- /dev/null +++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1FullRefreshSuite.scala @@ -0,0 +1,245 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.pipelines.graph + +import org.apache.spark.sql.Row +import org.apache.spark.sql.execution.streaming.runtime.MemoryStream +import org.apache.spark.sql.functions +import org.apache.spark.sql.pipelines.autocdc.{ + ColumnSelection, + UnqualifiedColumnName +} +import org.apache.spark.sql.pipelines.utils.{ExecutionTest, TestGraphRegistrationContext} +import org.apache.spark.sql.test.SharedSparkSession + +/** + * Tests covering AutoCDC's full-refresh semantics: full refresh must wipe both the + * target rows and the AutoCDC auxiliary table for the refreshed targets, and must leave + * non-refreshed targets untouched in selective-refresh mode. + */ +class AutoCdcScd1FullRefreshSuite + extends ExecutionTest + with SharedSparkSession + with AutoCdcGraphExecutionTestMixin { + + test("full refresh wipes target rows and the auxiliary table for the refreshed flow") { + val session = spark + import session.implicits._ + + spark.sql( + s"CREATE TABLE $catalog.$namespace.target " + + s"(id INT NOT NULL, name STRING, version BIGINT NOT NULL, $cdcMetadataDdl)" + ) + + // Run #1: populate target + auxiliary table. + val stream1 = MemoryStream[(Int, String, Long)] + stream1.addData((1, "alice", 5L)) + val ctx1 = new TestGraphRegistrationContext(spark) { + registerTable("target", catalog = Some(catalog), database = Some(namespace)) + registerFlow(autoCdcFlow( + name = "auto_cdc_flow", + target = "target", + query = dfFlowFunc(stream1.toDF().toDF("id", "name", "version")), + keys = Seq("id"), + sequencing = functions.col("version") + )) + } + runPipeline(ctx1) + assert( + spark.catalog.tableExists(auxTableNameFor("target")), + "Auxiliary table should exist after first run" + ) + + // Run #2 (full refresh): auxiliary table should be dropped by DatasetManager, target + // truncated. The new run brings only id=2 at seq=1. 
+ val stream2 = MemoryStream[(Int, String, Long)] + stream2.addData((2, "bob", 1L)) + val ctx2 = new TestGraphRegistrationContext(spark) { + registerTable("target", catalog = Some(catalog), database = Some(namespace)) + registerFlow(autoCdcFlow( + name = "auto_cdc_flow", + target = "target", + query = dfFlowFunc(stream2.toDF().toDF("id", "name", "version")), + keys = Seq("id"), + sequencing = functions.col("version") + )) + } + val updateCtx = TestPipelineUpdateContext( + spark, + ctx2.toDataflowGraph, + storageRoot, + fullRefreshTables = AllTables + ) + updateCtx.pipelineExecution.runPipeline() + updateCtx.pipelineExecution.awaitCompletion() + + checkAnswer( + spark.table(s"$catalog.$namespace.target"), + Seq(Row(2, "bob", 1L, cdcMeta(None, Some(1L)))) + ) + } + + test("after a full refresh, an event with a sequence below the previous run's " + + "watermark now lands") { + val session = spark + import session.implicits._ + + spark.sql( + s"CREATE TABLE $catalog.$namespace.target " + + s"(id INT NOT NULL, name STRING, version BIGINT NOT NULL, $cdcMetadataDdl)" + ) + + // Run #1: delete at seq=10 sets a high watermark in the auxiliary table. + val stream1 = MemoryStream[(Int, String, Long, Boolean)] + stream1.addData((1, "alice", 10L, true)) + val ctx1 = new TestGraphRegistrationContext(spark) { + registerTable("target", catalog = Some(catalog), database = Some(namespace)) + registerFlow(autoCdcFlow( + name = "auto_cdc_flow", + target = "target", + query = dfFlowFunc(stream1.toDF().toDF("id", "name", "version", "is_delete")), + keys = Seq("id"), + sequencing = functions.col("version"), + deleteCondition = Some(functions.col("is_delete") === true), + columnSelection = Some(ColumnSelection.ExcludeColumns( + Seq(UnqualifiedColumnName("is_delete")) + )) + )) + } + runPipeline(ctx1) + + // Run #2 (full refresh): auxiliary table is dropped, watermark reset. seq=5 should + // now land. 
+ val stream2 = MemoryStream[(Int, String, Long, Boolean)] + stream2.addData((1, "fresh", 5L, false)) + val ctx2 = new TestGraphRegistrationContext(spark) { + registerTable("target", catalog = Some(catalog), database = Some(namespace)) + registerFlow(autoCdcFlow( + name = "auto_cdc_flow", + target = "target", + query = dfFlowFunc(stream2.toDF().toDF("id", "name", "version", "is_delete")), + keys = Seq("id"), + sequencing = functions.col("version"), + deleteCondition = Some(functions.col("is_delete") === true), + columnSelection = Some(ColumnSelection.ExcludeColumns( + Seq(UnqualifiedColumnName("is_delete")) + )) + )) + } + val updateCtx = TestPipelineUpdateContext( + spark, + ctx2.toDataflowGraph, + storageRoot, + fullRefreshTables = AllTables + ) + updateCtx.pipelineExecution.runPipeline() + updateCtx.pipelineExecution.awaitCompletion() + + checkAnswer( + spark.table(s"$catalog.$namespace.target"), + Seq(Row(1, "fresh", 5L, cdcMeta(None, Some(5L)))) + ) + } + + test("selective full refresh wipes only the requested target's auxiliary state") { + val session = spark + import session.implicits._ + + spark.sql( + s"CREATE TABLE $catalog.$namespace.t_a " + + s"(id INT NOT NULL, version BIGINT NOT NULL, $cdcMetadataDdl)" + ) + spark.sql( + s"CREATE TABLE $catalog.$namespace.t_b " + + s"(id INT NOT NULL, version BIGINT NOT NULL, $cdcMetadataDdl)" + ) + + // streamA is replaced across runs because t_a is full-refreshed in run #2 (its streaming + // checkpoint is reset by full-refresh, so a fresh source is fine and matches the user-visible + // semantics). streamB is reused across runs because t_b is NOT full-refreshed -- its + // streaming checkpoint must resume against the same MemoryStream instance, otherwise the + // seq=5 assertion below could pass for the wrong reason (the source never produced seq=5 + // in run #2 instead of the aux watermark suppressing it). 
+ val streamA1 = MemoryStream[(Int, Long)] + val streamB = MemoryStream[(Int, Long)] + streamA1.addData((1, 10L)) + streamB.addData((1, 10L)) + val ctx1 = new TestGraphRegistrationContext(spark) { + registerTable("t_a", catalog = Some(catalog), database = Some(namespace)) + registerTable("t_b", catalog = Some(catalog), database = Some(namespace)) + registerFlow(autoCdcFlow( + name = "flow_a", + target = "t_a", + query = dfFlowFunc(streamA1.toDF().toDF("id", "version")), + keys = Seq("id"), + sequencing = functions.col("version") + )) + registerFlow(autoCdcFlow( + name = "flow_b", + target = "t_b", + query = dfFlowFunc(streamB.toDF().toDF("id", "version")), + keys = Seq("id"), + sequencing = functions.col("version") + )) + } + runPipeline(ctx1) + + // Run #2: full refresh ONLY on t_a; t_b's auxiliary state must persist. + val streamA2 = MemoryStream[(Int, Long)] + streamA2.addData((1, 5L)) // would have been suppressed pre-refresh; now wins + streamB.addData((1, 5L)) // must be suppressed (auxiliary table retains seq=10) + val ctx2 = new TestGraphRegistrationContext(spark) { + registerTable("t_a", catalog = Some(catalog), database = Some(namespace)) + registerTable("t_b", catalog = Some(catalog), database = Some(namespace)) + registerFlow(autoCdcFlow( + name = "flow_a", + target = "t_a", + query = dfFlowFunc(streamA2.toDF().toDF("id", "version")), + keys = Seq("id"), + sequencing = functions.col("version") + )) + registerFlow(autoCdcFlow( + name = "flow_b", + target = "t_b", + query = dfFlowFunc(streamB.toDF().toDF("id", "version")), + keys = Seq("id"), + sequencing = functions.col("version") + )) + } + val updateCtx = TestPipelineUpdateContext( + spark, + ctx2.toDataflowGraph, + storageRoot, + fullRefreshTables = SomeTables(Set( + fullyQualifiedIdentifier("t_a", Some(catalog), Some(namespace)) + )) + ) + updateCtx.pipelineExecution.runPipeline() + updateCtx.pipelineExecution.awaitCompletion() + + checkAnswer( + spark.table(s"$catalog.$namespace.t_a"), + Seq(Row(1, 
5L, cdcMeta(None, Some(5L)))) + ) + // t_b: pre-existing seq=10 row still wins; the seq=5 event is dropped. + checkAnswer( + spark.table(s"$catalog.$namespace.t_b"), + Seq(Row(1, 10L, cdcMeta(None, Some(10L)))) + ) + } +} diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1MultiPipelineSuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1MultiPipelineSuite.scala new file mode 100644 index 0000000000000..32f34923c480e --- /dev/null +++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1MultiPipelineSuite.scala @@ -0,0 +1,296 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.pipelines.graph + +import org.apache.spark.sql.Row +import org.apache.spark.sql.execution.streaming.runtime.MemoryStream +import org.apache.spark.sql.functions +import org.apache.spark.sql.pipelines.utils.{ExecutionTest, TestGraphRegistrationContext} +import org.apache.spark.sql.test.SharedSparkSession + +/** + * End-to-end tests that exercise interactions between separate AutoCDC pipelines (i.e. + * distinct [[DataflowGraph]] / [[TestPipelineUpdateContext]] invocations) sharing the same + * v2 catalog. 
These complement the single-pipeline AutoCDC suites by validating the + * boundary semantics between independently-deployed pipelines. + * + * Each test constructs two graphs and runs them sequentially. In real deployments these + * could be two different pipeline definitions writing into the same metastore; the tests + * here verify that AutoCDC's per-target catalog state (target table, auxiliary table, + * schema invariants) behaves correctly across these pipeline boundaries. + */ +class AutoCdcScd1MultiPipelineSuite + extends ExecutionTest + with SharedSparkSession + with AutoCdcGraphExecutionTestMixin { + + test("two AutoCDC pipelines targeting separate tables maintain independent target and " + + "auxiliary tables") { + val session = spark + import session.implicits._ + + // Two distinct target tables created up-front. + spark.sql( + s"CREATE TABLE $catalog.$namespace.t_a " + + s"(id INT NOT NULL, name STRING, version BIGINT NOT NULL, $cdcMetadataDdl)" + ) + spark.sql( + s"CREATE TABLE $catalog.$namespace.t_b " + + s"(id INT NOT NULL, name STRING, version BIGINT NOT NULL, $cdcMetadataDdl)" + ) + + // Pipeline #1 only knows about `t_a`. Its auxiliary table + // cat.ns1.__spark_autocdc_aux_state_t_a must not affect pipeline #2's `t_b`. + val streamA = MemoryStream[(Int, String, Long)] + streamA.addData((1, "alice", 100L)) + val ctxA = new TestGraphRegistrationContext(spark) { + registerTable("t_a", catalog = Some(catalog), database = Some(namespace)) + registerFlow(autoCdcFlow( + name = "flow_a", + target = "t_a", + query = dfFlowFunc(streamA.toDF().toDF("id", "name", "version")), + keys = Seq("id"), + sequencing = functions.col("version") + )) + } + runPipeline(ctxA) + + // Pipeline #2 only knows about `t_b`. Uses a deliberately *lower* sequence to verify + // the watermark from pipeline #1's auxiliary table (seq=100) does not leak into + // pipeline #2. 
+ val streamB = MemoryStream[(Int, String, Long)] + streamB.addData((9, "bob", 1L)) + val ctxB = new TestGraphRegistrationContext(spark) { + registerTable("t_b", catalog = Some(catalog), database = Some(namespace)) + registerFlow(autoCdcFlow( + name = "flow_b", + target = "t_b", + query = dfFlowFunc(streamB.toDF().toDF("id", "name", "version")), + keys = Seq("id"), + sequencing = functions.col("version") + )) + } + runPipeline(ctxB) + + checkAnswer( + spark.table(s"$catalog.$namespace.t_a"), + Seq(Row(1, "alice", 100L, cdcMeta(None, Some(100L)))) + ) + checkAnswer( + spark.table(s"$catalog.$namespace.t_b"), + Seq(Row(9, "bob", 1L, cdcMeta(None, Some(1L)))) + ) + + // Each target has its own auxiliary table; no cross-contamination. + assert(spark.catalog.tableExists(auxTableNameFor("t_a"))) + assert(spark.catalog.tableExists(auxTableNameFor("t_b"))) + } + + test("a downstream pipeline can read an AutoCDC target written by a different pipeline " + + "without observing the CDC metadata column") { + val session = spark + import session.implicits._ + + // Pipeline #1 writes into target `src` via AutoCDC. + spark.sql( + s"CREATE TABLE $catalog.$namespace.src " + + s"(id INT NOT NULL, name STRING, version BIGINT NOT NULL, $cdcMetadataDdl)" + ) + val stream = MemoryStream[(Int, String, Long)] + stream.addData((1, "alice", 1L), (2, "bob", 1L)) + val ctxWriter = new TestGraphRegistrationContext(spark) { + registerTable("src", catalog = Some(catalog), database = Some(namespace)) + registerFlow(autoCdcFlow( + name = "writer", + target = "src", + query = dfFlowFunc(stream.toDF().toDF("id", "name", "version")), + keys = Seq("id"), + sequencing = functions.col("version") + )) + } + runPipeline(ctxWriter) + + // Pipeline #2 is a regular materialized view that selects the user-data columns from + // `src` (a different graph entirely). It must observe the merged AutoCDC rows and be + // able to ignore the metadata column without it polluting downstream consumers. 
+ val ctxReader = new TestGraphRegistrationContext(spark) { + registerMaterializedView( + "downstream_mv", + query = dfFlowFunc( + spark.read.table(s"$catalog.$namespace.src").select("id", "name", "version") + ) + ) + } + runPipeline(ctxReader) + + checkAnswer( + spark.table(fullyQualifiedIdentifier("downstream_mv").toString), + Seq(Row(1, "alice", 1L), Row(2, "bob", 1L)) + ) + } + + test("two AutoCDC pipelines targeting the same table with identical key and data " + + "schemas merge into a shared target table") { + val session = spark + import session.implicits._ + + // Target table is created once up-front; both pipelines target it with the same + // AutoCDC `keys` and the same source-DF data schema. The two pipelines have distinct + // flow names ("flow_v1" / "flow_v2") so they own independent streaming checkpoints, + // but share the target table and its auxiliary table. + spark.sql( + s"CREATE TABLE $catalog.$namespace.shared_target " + + s"(id INT NOT NULL, name STRING, version BIGINT NOT NULL, $cdcMetadataDdl)" + ) + + // Pipeline #1: inserts rows with id=1 and id=2 at version=1. + val stream1 = MemoryStream[(Int, String, Long)] + stream1.addData((1, "alice", 1L), (2, "bob", 1L)) + val ctx1 = new TestGraphRegistrationContext(spark) { + registerTable("shared_target", catalog = Some(catalog), database = Some(namespace)) + registerFlow(autoCdcFlow( + name = "flow_v1", + target = "shared_target", + query = dfFlowFunc(stream1.toDF().toDF("id", "name", "version")), + keys = Seq("id"), + sequencing = functions.col("version") + )) + } + runPipeline(ctx1) + + // Sanity-check pipeline #1's effect before pipeline #2 runs. + checkAnswer( + spark.table(s"$catalog.$namespace.shared_target"), + Seq( + Row(1, "alice", 1L, cdcMeta(deleteSeq = None, upsertSeq = Some(1L))), + Row(2, "bob", 1L, cdcMeta(deleteSeq = None, upsertSeq = Some(1L))) + ) + ) + + // Pipeline #2: updates id=2 (existing key) to a higher sequence and inserts id=3 + // (new key). 
id=1 is untouched and must survive into the final target unchanged. + val stream2 = MemoryStream[(Int, String, Long)] + stream2.addData((2, "bob-v2", 2L), (3, "carol", 1L)) + val ctx2 = new TestGraphRegistrationContext(spark) { + registerTable("shared_target", catalog = Some(catalog), database = Some(namespace)) + registerFlow(autoCdcFlow( + name = "flow_v2", + target = "shared_target", + query = dfFlowFunc(stream2.toDF().toDF("id", "name", "version")), + keys = Seq("id"), + sequencing = functions.col("version") + )) + } + runPipeline(ctx2) + + // Final target: id=1 untouched (pipeline #1's state), id=2 updated by pipeline #2, + // id=3 freshly inserted by pipeline #2. + checkAnswer( + spark.table(s"$catalog.$namespace.shared_target"), + Seq( + Row(1, "alice", 1L, cdcMeta(deleteSeq = None, upsertSeq = Some(1L))), + Row(2, "bob-v2", 2L, cdcMeta(deleteSeq = None, upsertSeq = Some(2L))), + Row(3, "carol", 1L, cdcMeta(deleteSeq = None, upsertSeq = Some(1L))) + ) + ) + + // The auxiliary table for the shared target is itself shared across both pipelines. + assert(spark.catalog.tableExists(auxTableNameFor("shared_target"))) + } + + test("two AutoCDC pipelines targeting the same table with the same key but different " + + "data columns evolve the shared target schema") { + val session = spark + import session.implicits._ + + // Target is created up-front with pipeline #1's schema only; pipeline #2 brings a new + // top-level nullable `age` column that the dataset materialization layer is expected + // to schema-merge into the target. + spark.sql( + s"CREATE TABLE $catalog.$namespace.shared_target " + + s"(id INT NOT NULL, name STRING, version BIGINT NOT NULL, $cdcMetadataDdl)" + ) + + // Pipeline #1: source DF schema is (id, name, version); inserts id=1 and id=2. 
+ val stream1 = MemoryStream[(Int, String, Long)] + stream1.addData((1, "alice", 1L), (2, "bob", 1L)) + val ctx1 = new TestGraphRegistrationContext(spark) { + registerTable("shared_target", catalog = Some(catalog), database = Some(namespace)) + registerFlow(autoCdcFlow( + name = "flow_v1", + target = "shared_target", + query = dfFlowFunc(stream1.toDF().toDF("id", "name", "version")), + keys = Seq("id"), + sequencing = functions.col("version") + )) + } + runPipeline(ctx1) + + // Sanity-check pipeline #1's state before schema evolution kicks in. + checkAnswer( + spark.table(s"$catalog.$namespace.shared_target"), + Seq( + Row(1, "alice", 1L, cdcMeta(deleteSeq = None, upsertSeq = Some(1L))), + Row(2, "bob", 1L, cdcMeta(deleteSeq = None, upsertSeq = Some(1L))) + ) + ) + + // Pipeline #2: source DF schema is (id, name, age, version). The new nullable `age` column + // should be added to the target by dataset materialization; pipeline #1's untouched id=1 row + // is backfilled to NULL. + val stream2 = MemoryStream[(Int, String, Option[Int], Long)] + stream2.addData((2, "bob-v2", Some(25), 2L), (3, "carol", Some(30), 1L)) + val ctx2 = new TestGraphRegistrationContext(spark) { + registerTable("shared_target", catalog = Some(catalog), database = Some(namespace)) + registerFlow(autoCdcFlow( + name = "flow_v2", + target = "shared_target", + query = dfFlowFunc(stream2.toDF().toDF("id", "name", "age", "version")), + keys = Seq("id"), + sequencing = functions.col("version") + )) + } + runPipeline(ctx2) + + checkAnswer( + spark.table(s"$catalog.$namespace.shared_target"), + Seq( + Row(1, "alice", 1L, cdcMeta(deleteSeq = None, upsertSeq = Some(1L)), null), + Row(2, "bob-v2", 2L, cdcMeta(deleteSeq = None, upsertSeq = Some(2L)), 25), + Row(3, "carol", 1L, cdcMeta(deleteSeq = None, upsertSeq = Some(1L)), 30) + ) + ) + + // Pipeline #1 runs again with its original (id, name, version) schema. 
The evolved + // target schema with `age` must persist: id=1's update leaves age untouched, id=4 is + // inserted with age=NULL, and pipeline #2's id=2/id=3 rows are unchanged. + stream1.addData((1, "alice-v2", 2L), (4, "dave", 1L)) + runPipeline(ctx1) + + checkAnswer( + spark.table(s"$catalog.$namespace.shared_target"), + Seq( + Row(1, "alice-v2", 2L, cdcMeta(deleteSeq = None, upsertSeq = Some(2L)), null), + Row(2, "bob-v2", 2L, cdcMeta(deleteSeq = None, upsertSeq = Some(2L)), 25), + Row(3, "carol", 1L, cdcMeta(deleteSeq = None, upsertSeq = Some(1L)), 30), + Row(4, "dave", 1L, cdcMeta(deleteSeq = None, upsertSeq = Some(1L)), null) + ) + ) + } + +} diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1SchemaEvolutionSuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1SchemaEvolutionSuite.scala new file mode 100644 index 0000000000000..4c20b21ad57a5 --- /dev/null +++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1SchemaEvolutionSuite.scala @@ -0,0 +1,733 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.spark.sql.pipelines.graph
+
+import java.sql.Timestamp
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.execution.streaming.runtime.MemoryStream
+import org.apache.spark.sql.functions
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.pipelines.autocdc.{
+  ColumnSelection,
+  UnqualifiedColumnName
+}
+import org.apache.spark.sql.pipelines.utils.{ExecutionTest, TestGraphRegistrationContext}
+import org.apache.spark.sql.test.SharedSparkSession
+
+/**
+ * Tests covering AutoCDC's interaction with schema evolution across pipeline runs. The
+ * suite documents the supported additive cases (new top-level columns, new nested fields
+ * in array-of-struct, broadening / narrowing column selection) and the cases that fail
+ * loudly today (subtractive nested evolution, type-incompatible changes, case-only
+ * renames).
+ *
+ * These behaviors are largely inherited from the lower layers (`SchemaMergingUtils` for
+ * schema merge, the v2 writer's column-resolution layer for nested-field handling) rather
+ * than implemented in AutoCDC itself; the tests here serve as the contract for AutoCDC's
+ * observable behavior on top of those layers.
+ */
+class AutoCdcScd1SchemaEvolutionSuite
+    extends ExecutionTest
+    with SharedSparkSession
+    with AutoCdcGraphExecutionTestMixin {
+
+  test("a nullable non-key column merges correctly with mixed NULL and non-NULL values") {
+    val session = spark
+    import session.implicits._
+
+    // Single MemoryStream with `email` as nullable from the start. Run #1 emits a row with
+    // a NULL email; run #2 emits an upsert with a non-NULL email.
+    spark.sql(
+      s"CREATE TABLE $catalog.$namespace.target " +
+        s"(id INT NOT NULL, name STRING, email STRING, version BIGINT NOT NULL, $cdcMetadataDdl)"
+    )
+
+    val stream = MemoryStream[(Int, String, Option[String], Long)]
+    def buildCtx(): TestGraphRegistrationContext = new TestGraphRegistrationContext(spark) {
+      registerTable("target", catalog = Some(catalog), database = Some(namespace))
+      registerFlow(autoCdcFlow(
+        name = "auto_cdc_flow",
+        target = "target",
+        query = dfFlowFunc(stream.toDF().toDF("id", "name", "email", "version")),
+        keys = Seq("id"),
+        sequencing = functions.col("version")
+      ))
+    }
+
+    // Run #1: insert with NULL email.
+    stream.addData((1, "alice", None, 1L))
+    runPipeline(buildCtx())
+    checkAnswer(
+      spark.table(s"$catalog.$namespace.target"),
+      Seq(Row(1, "alice", null, 1L, cdcMeta(None, Some(1L))))
+    )
+
+    // Run #2: upsert with non-NULL email at higher seq replaces the row.
+    stream.addData((1, "alice2", Some("a@x.com"), 2L))
+    runPipeline(buildCtx())
+    checkAnswer(
+      spark.table(s"$catalog.$namespace.target"),
+      Seq(Row(1, "alice2", "a@x.com", 2L, cdcMeta(None, Some(2L))))
+    )
+  }
+
+  test("widening a non-key column's type between runs fails with " +
+    "CANNOT_MERGE_INCOMPATIBLE_DATA_TYPE") {
+    val session = spark
+    import session.implicits._
+
+    // Changing a non-key column's type between pipeline runs is rejected by
+    // `SchemaMergingUtils` with CANNOT_MERGE_INCOMPATIBLE_DATA_TYPE even when the new type
+    // is strictly wider. Users must full-refresh the target to change column types.
+    spark.sql(
+      s"CREATE TABLE $catalog.$namespace.target " +
+        s"(id INT NOT NULL, age INT, version BIGINT NOT NULL, $cdcMetadataDdl)"
+    )
+
+    val stream1 = MemoryStream[(Int, Int, Long)]
+    stream1.addData((1, 30, 1L))
+    val ctx1 = new TestGraphRegistrationContext(spark) {
+      registerTable("target", catalog = Some(catalog), database = Some(namespace))
+      registerFlow(autoCdcFlow(
+        name = "auto_cdc_flow",
+        target = "target",
+        query = dfFlowFunc(stream1.toDF().toDF("id", "age", "version")),
+        keys = Seq("id"),
+        sequencing = functions.col("version")
+      ))
+    }
+    runPipeline(ctx1)
+
+    // Run #2: widen `age` from Int to Long.
+    val stream2 = MemoryStream[(Int, Long, Long)]
+    stream2.addData((1, 31L, 2L))
+    val ctx2 = new TestGraphRegistrationContext(spark) {
+      registerTable("target", catalog = Some(catalog), database = Some(namespace))
+      registerFlow(autoCdcFlow(
+        name = "auto_cdc_flow",
+        target = "target",
+        query = dfFlowFunc(stream2.toDF().toDF("id", "age", "version")),
+        keys = Seq("id"),
+        sequencing = functions.col("version")
+      ))
+    }
+    val ex = intercept[RuntimeException] { runPipeline(ctx2) }
+    checkErrorInPipelineFailure(
+      failure = ex,
+      condition = "CANNOT_MERGE_INCOMPATIBLE_DATA_TYPE",
+      sqlState = Some("42825"),
+      // `left` is the persisted (run #1) INT type; `right` is run #2's widened BIGINT.
+      parameters = Map(
+        "left" -> "\"INT\"",
+        "right" -> "\"BIGINT\""
+      )
+    )
+  }
+
+  test("narrowing a non-key column's type between runs fails with " +
+    "CANNOT_MERGE_INCOMPATIBLE_DATA_TYPE") {
+    val session = spark
+    import session.implicits._
+
+    // Mirror image of the widening test above: changing a non-key column's type between
+    // pipeline runs is rejected by SchemaMergingUtils with CANNOT_MERGE_INCOMPATIBLE_DATA_TYPE
+    // even when the new type is strictly narrower.
+    spark.sql(
+      s"CREATE TABLE $catalog.$namespace.target " +
+        s"(id INT NOT NULL, payload BIGINT, version BIGINT NOT NULL, $cdcMetadataDdl)"
+    )
+
+    val stream1 = MemoryStream[(Int, Long, Long)]
+    stream1.addData((1, 100L, 1L))
+    val ctx1 = new TestGraphRegistrationContext(spark) {
+      registerTable("target", catalog = Some(catalog), database = Some(namespace))
+      registerFlow(autoCdcFlow(
+        name = "auto_cdc_flow",
+        target = "target",
+        query = dfFlowFunc(stream1.toDF().toDF("id", "payload", "version")),
+        keys = Seq("id"),
+        sequencing = functions.col("version")
+      ))
+    }
+    runPipeline(ctx1)
+
+    // Run #2: narrow `payload` from Long (BIGINT) to Int (INT).
+    val stream2 = MemoryStream[(Int, Int, Long)]
+    stream2.addData((1, 5, 2L))
+    val ctx2 = new TestGraphRegistrationContext(spark) {
+      registerTable("target", catalog = Some(catalog), database = Some(namespace))
+      registerFlow(autoCdcFlow(
+        name = "auto_cdc_flow",
+        target = "target",
+        query = dfFlowFunc(stream2.toDF().toDF("id", "payload", "version")),
+        keys = Seq("id"),
+        sequencing = functions.col("version")
+      ))
+    }
+
+    val ex = intercept[RuntimeException] { runPipeline(ctx2) }
+    checkErrorInPipelineFailure(
+      failure = ex,
+      condition = "CANNOT_MERGE_INCOMPATIBLE_DATA_TYPE",
+      sqlState = Some("42825"),
+      // `left` is the persisted (run #1) BIGINT type; `right` is run #2's narrowed INT.
+      parameters = Map(
+        "left" -> "\"BIGINT\"",
+        "right" -> "\"INT\""
+      )
+    )
+  }
+
+  test("a new top-level nullable column appearing in the source DF between runs is " +
+    "added to the target") {
+    val session = spark
+    import session.implicits._
+
+    spark.sql(
+      s"CREATE TABLE $catalog.$namespace.target " +
+        s"(id INT NOT NULL, name STRING, version BIGINT NOT NULL, $cdcMetadataDdl)"
+    )
+
+    // Single MemoryStream of (id, name, email, version) shared across runs so the streaming
+    // checkpoint can resume cleanly. Run #1's flow drops `email` so the source's resolved DF
+    // schema is 3 columns; run #2 keeps all 4. The MemoryStream's underlying tuple schema is
+    // unchanged (only the downstream projection differs), so the source identity that the
+    // OffsetSeqLog records is stable across runs.
+    val stream = MemoryStream[(Int, String, Option[String], Long)]
+    def buildCtx(includeEmail: Boolean): TestGraphRegistrationContext =
+      new TestGraphRegistrationContext(spark) {
+        registerTable("target", catalog = Some(catalog), database = Some(namespace))
+        val sourceDf = stream.toDF().toDF("id", "name", "email", "version")
+        val projectedDf = if (includeEmail) sourceDf else sourceDf.drop("email")
+        registerFlow(autoCdcFlow(
+          name = "auto_cdc_flow",
+          target = "target",
+          query = dfFlowFunc(projectedDf),
+          keys = Seq("id"),
+          sequencing = functions.col("version")
+        ))
+      }
+
+    // Run #1: source projects (id, name, version). Target schema is unchanged.
+    stream.addData((1, "alice", None, 1L))
+    runPipeline(buildCtx(includeEmail = false))
+    checkAnswer(
+      spark.table(s"$catalog.$namespace.target"),
+      Seq(Row(1, "alice", 1L, cdcMeta(None, Some(1L))))
+    )
+
+    // Run #2: source projects (id, name, email, version). mergeSchemas appends `email` to
+    // the target (StructType.merge keeps the left schema's order and appends right-only
+    // fields); existing rows get NULL for the new column.
+    stream.addData((2, "bob", Some("b@x.com"), 2L))
+    runPipeline(buildCtx(includeEmail = true))
+    checkAnswer(
+      spark.table(s"$catalog.$namespace.target"),
+      Seq(
+        Row(1, "alice", 1L, cdcMeta(None, Some(1L)), null),
+        Row(2, "bob", 2L, cdcMeta(None, Some(2L)), "b@x.com")
+      )
+    )
+  }
+
+  test("broadening the column selection between runs adds the newly-included column to " +
+    "the target") {
+    val session = spark
+    import session.implicits._
+
+    // Source DF schema is fixed at (id, name, email, version) across both runs. Only the
+    // `columnSelection` knob differs: run #1 includes (id, name, version); run #2 selects
+    // None (= all source columns). mergeSchemas adds `email` to the target via the same
+    // generic SDP path as the new-source-column case, but driven by the
+    // [[ColumnSelection]] knob rather than the source DF's own schema.
+    spark.sql(
+      s"CREATE TABLE $catalog.$namespace.target " +
+        s"(id INT NOT NULL, name STRING, version BIGINT NOT NULL, $cdcMetadataDdl)"
+    )
+
+    val stream = MemoryStream[(Int, String, String, Long)]
+    def buildCtx(selection: Option[ColumnSelection]): TestGraphRegistrationContext =
+      new TestGraphRegistrationContext(spark) {
+        registerTable("target", catalog = Some(catalog), database = Some(namespace))
+        registerFlow(autoCdcFlow(
+          name = "auto_cdc_flow",
+          target = "target",
+          query = dfFlowFunc(stream.toDF().toDF("id", "name", "email", "version")),
+          keys = Seq("id"),
+          sequencing = functions.col("version"),
+          columnSelection = selection
+        ))
+      }
+
+    // Run #1: only (id, name, version) selected; `email` is dropped before the MERGE.
+    stream.addData((1, "alice", "ignored", 1L))
+    runPipeline(buildCtx(selection = Some(ColumnSelection.IncludeColumns(
+      Seq("id", "name", "version").map(UnqualifiedColumnName(_))
+    ))))
+    checkAnswer(
+      spark.table(s"$catalog.$namespace.target"),
+      Seq(Row(1, "alice", 1L, cdcMeta(None, Some(1L))))
+    )
+
+    // Run #2: broaden to no selection. mergeSchemas adds `email`; existing rows get NULL,
+    // new rows get the actual value.
+    stream.addData((2, "bob", "b@x.com", 2L))
+    runPipeline(buildCtx(selection = None))
+    checkAnswer(
+      spark.table(s"$catalog.$namespace.target"),
+      Seq(
+        Row(1, "alice", 1L, cdcMeta(None, Some(1L)), null),
+        Row(2, "bob", 2L, cdcMeta(None, Some(2L)), "b@x.com")
+      )
+    )
+  }
+
+  test("narrowing the column selection between runs preserves the dropped column on " +
+    "existing rows and leaves it NULL on new rows") {
+    val session = spark
+    import session.implicits._
+
+    // Validates the additive-only column-selection contract on the narrowing side:
+    // tightening `columnSelection` between runs leaves the dropped column in place at the
+    // schema level (SDP's `SchemaMergingUtils.mergeSchemas` is a union, never a subtraction).
+    spark.sql(
+      s"CREATE TABLE $catalog.$namespace.target " +
+        s"(id INT NOT NULL, name STRING, email STRING, version BIGINT NOT NULL, $cdcMetadataDdl)"
+    )
+
+    val stream = MemoryStream[(Int, String, String, Long)]
+    def buildCtx(selection: Option[ColumnSelection]): TestGraphRegistrationContext =
+      new TestGraphRegistrationContext(spark) {
+        registerTable("target", catalog = Some(catalog), database = Some(namespace))
+        registerFlow(autoCdcFlow(
+          name = "auto_cdc_flow",
+          target = "target",
+          query = dfFlowFunc(stream.toDF().toDF("id", "name", "email", "version")),
+          keys = Seq("id"),
+          sequencing = functions.col("version"),
+          columnSelection = selection
+        ))
+      }
+
+    // Run #1: include all columns; populate `email` for key=1.
+    stream.addData((1, "alice", "a@x.com", 1L))
+    runPipeline(buildCtx(selection = None))
+    checkAnswer(
+      spark.table(s"$catalog.$namespace.target"),
+      Seq(Row(1, "alice", "a@x.com", 1L, cdcMeta(None, Some(1L))))
+    )
+
+    // Run #2: narrow the selection to drop `email`. The merge omits `email` from both
+    // INSERT and UPDATE assignment maps; key=1's `email` is preserved at "a@x.com" while
+    // key=2 is inserted with `email = NULL`.
+    stream.addData((2, "bob", "ignored", 2L))
+    runPipeline(buildCtx(selection = Some(ColumnSelection.IncludeColumns(
+      Seq("id", "name", "version").map(UnqualifiedColumnName(_))
+    ))))
+    checkAnswer(
+      spark.table(s"$catalog.$namespace.target"),
+      Seq(
+        Row(1, "alice", "a@x.com", 1L, cdcMeta(None, Some(1L))),
+        Row(2, "bob", null, 2L, cdcMeta(None, Some(2L)))
+      )
+    )
+  }
+
+  test("a top-level column dropped from the source DF between runs is preserved on " +
+    "existing rows and left NULL on new rows") {
+    val session = spark
+    import session.implicits._
+
+    // Symmetric to the new-source-column case (which exercises the source DF *gaining* a
+    // column). Validates that the additive-only column-selection contract holds when the
+    // narrowing is driven by the source DF's own schema shrinking, rather than by a
+    // tightening [[ChangeArgs.columnSelection]].
+    spark.sql(
+      s"CREATE TABLE $catalog.$namespace.target " +
+        s"(id INT NOT NULL, name STRING, version BIGINT NOT NULL, $cdcMetadataDdl)"
+    )
+
+    // Same `MemoryStream[(Int, String, Option[String], Long)]` shape across runs; runs
+    // differ in whether `email` is kept in the projected source DF.
+    val stream = MemoryStream[(Int, String, Option[String], Long)]
+    def buildCtx(includeEmail: Boolean): TestGraphRegistrationContext =
+      new TestGraphRegistrationContext(spark) {
+        registerTable("target", catalog = Some(catalog), database = Some(namespace))
+        val sourceDf = stream.toDF().toDF("id", "name", "email", "version")
+        val projectedDf = if (includeEmail) sourceDf else sourceDf.drop("email")
+        registerFlow(autoCdcFlow(
+          name = "auto_cdc_flow",
+          target = "target",
+          query = dfFlowFunc(projectedDf),
+          keys = Seq("id"),
+          sequencing = functions.col("version")
+        ))
+      }
+
+    // Run #1: wide source DF (id, name, email, version). mergeSchemas appends `email` to
+    // the target.
+    stream.addData((1, "alice", Some("a@x.com"), 1L))
+    runPipeline(buildCtx(includeEmail = true))
+    checkAnswer(
+      spark.table(s"$catalog.$namespace.target"),
+      Seq(Row(1, "alice", 1L, cdcMeta(None, Some(1L)), "a@x.com"))
+    )
+
+    // Run #2: source DF drops `email` upstream of the flow. Target still has `email`
+    // (`StructType.merge` is additive-only); the merge omits `email` from both INSERT and
+    // UPDATE assignment maps. Key=1's `email` is preserved at "a@x.com"; key=2 is inserted
+    // with `email = NULL`.
+    stream.addData((2, "bob", None, 2L))
+    runPipeline(buildCtx(includeEmail = false))
+    checkAnswer(
+      spark.table(s"$catalog.$namespace.target"),
+      Seq(
+        Row(1, "alice", 1L, cdcMeta(None, Some(1L)), "a@x.com"),
+        Row(2, "bob", 2L, cdcMeta(None, Some(2L)), null)
+      )
+    )
+  }
+
+  test("dropping a nested struct field between runs fails with INCOMPATIBLE_DATA_FOR_TABLE") {
+    val session = spark
+    import session.implicits._
+
+    // The v2 writer's column-resolution layer requires every nested target field to be
+    // present in the microbatch DF. When run #2's source projection drops `b.c`, the merge
+    // fails with INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA. Users who want to drop a
+    // nested field between runs must full-refresh the target.
+    spark.sql(
+      s"CREATE TABLE $catalog.$namespace.target " +
+        s"(key INT NOT NULL, version BIGINT NOT NULL, " +
+        s"value STRUCT<a: INT, b: STRUCT<c: INT, d: INT>>, $cdcMetadataDdl)"
+    )
+
+    // Stream is (key, version, a, b_c, b_d). Each run reshapes into different `value`
+    // shapes; the underlying tuple shape is unchanged so the streaming source's identity
+    // is stable across runs.
+    val stream = MemoryStream[(Int, Long, Int, Int, Int)]
+    def buildCtx(includeC: Boolean): TestGraphRegistrationContext =
+      new TestGraphRegistrationContext(spark) {
+        registerTable("target", catalog = Some(catalog), database = Some(namespace))
+        val src = stream.toDF().toDF("key", "version", "a", "b_c", "b_d")
+        val inner = if (includeC) {
+          functions.struct(functions.col("b_c").as("c"), functions.col("b_d").as("d"))
+        } else {
+          functions.struct(functions.col("b_d").as("d"))
+        }
+        val projected = src.select(
+          functions.col("key"),
+          functions.col("version"),
+          functions.struct(functions.col("a"), inner.as("b")).as("value")
+        )
+        registerFlow(autoCdcFlow(
+          name = "auto_cdc_flow",
+          target = "target",
+          query = dfFlowFunc(projected),
+          keys = Seq("key"),
+          sequencing = functions.col("version")
+        ))
+      }
+
+    stream.addData((1, 1L, 1, 1, 1), (2, 1L, 2, 2, 2))
+    runPipeline(buildCtx(includeC = true))
+
+    // Run #2 drops b.c. The v2 writer rejects the merge because it cannot find data for
+    // the target's `value.b.c` column.
+    stream.addData((1, 2L, 10, 99, 10), (3, 1L, 3, 99, 3))
+    val ex = intercept[RuntimeException] { runPipeline(buildCtx(includeC = false)) }
+    // The V2 writer's `TableOutputResolver` produces this error during plan analysis with
+    // an empty `tableName` because the merge plan it analyzes does not carry the target's
+    // catalog identifier through to the resolver call site.
+    checkErrorInPipelineFailure(
+      failure = ex,
+      condition = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA",
+      parameters = Map(
+        "tableName" -> "``",
+        "colName" -> "`value`.`b`.`c`"
+      )
+    )
+  }
+
+  test("a new field added inside an array element between runs is added to the " +
+    "target") {
+    val session = spark
+    import session.implicits._
+
+    spark.sql(
+      s"CREATE TABLE $catalog.$namespace.target " +
+        s"(key INT NOT NULL, version BIGINT NOT NULL, " +
+        s"vals ARRAY<STRUCT<a: INT, b: STRUCT<c: INT>>>, $cdcMetadataDdl)"
+    )
+
+    val stream = MemoryStream[(Int, Long, Int, Int, Int)]
+    def buildCtx(includeD: Boolean): TestGraphRegistrationContext =
+      new TestGraphRegistrationContext(spark) {
+        registerTable("target", catalog = Some(catalog), database = Some(namespace))
+        val src = stream.toDF().toDF("key", "version", "a", "b_c", "b_d")
+        val inner = if (includeD) {
+          functions.struct(functions.col("b_c").as("c"), functions.col("b_d").as("d"))
+        } else {
+          functions.struct(functions.col("b_c").as("c"))
+        }
+        val projected = src.select(
+          functions.col("key"),
+          functions.col("version"),
+          functions.array(
+            functions.struct(functions.col("a"), inner.as("b"))
+          ).as("vals")
+        )
+        registerFlow(autoCdcFlow(
+          name = "auto_cdc_flow",
+          target = "target",
+          query = dfFlowFunc(projected),
+          keys = Seq("key"),
+          sequencing = functions.col("version")
+        ))
+      }
+
+    stream.addData((1, 1L, 1, 1, 99))
+    runPipeline(buildCtx(includeD = false))
+
+    // Run #2 widens to include b.d. Existing key=1 row's vals[0].b.d is NULL until the
+    // upsert at version=2 writes the new value.
+    stream.addData((1, 2L, 1, 1, 2), (3, 1L, 3, 3, 3))
+    runPipeline(buildCtx(includeD = true))
+
+    // Inline-explode flattens the array for assertion.
+    checkAnswer(
+      spark.table(s"$catalog.$namespace.target")
+        .selectExpr("key", "inline(vals) as (a, b)")
+        .select("key", "a", "b.c", "b.d"),
+      Seq(
+        Row(1, 1, 1, 2),
+        Row(3, 3, 3, 3)
+      )
+    )
+  }
+
+  test("dropping a field inside an array element between runs fails with " +
+    "INCOMPATIBLE_DATA_FOR_TABLE") {
+    val session = spark
+    import session.implicits._
+
+    // Symmetric to the nested-struct case, but for `array<struct>`. The v2 writer rejects
+    // the merge because it cannot find data for the target's `vals.element.b.d` column
+    // when run #2's projection drops `d` from the element struct. Users must full-refresh
+    // the target to drop a nested array-element field.
+    spark.sql(
+      s"CREATE TABLE $catalog.$namespace.target " +
+        s"(key INT NOT NULL, version BIGINT NOT NULL, " +
+        s"vals ARRAY<STRUCT<a: INT, b: STRUCT<c: INT, d: INT>>>, $cdcMetadataDdl)"
+    )
+
+    val stream = MemoryStream[(Int, Long, Int, Int, Int)]
+    def buildCtx(includeD: Boolean): TestGraphRegistrationContext =
+      new TestGraphRegistrationContext(spark) {
+        registerTable("target", catalog = Some(catalog), database = Some(namespace))
+        val src = stream.toDF().toDF("key", "version", "a", "b_c", "b_d")
+        val inner = if (includeD) {
+          functions.struct(functions.col("b_c").as("c"), functions.col("b_d").as("d"))
+        } else {
+          functions.struct(functions.col("b_c").as("c"))
+        }
+        val projected = src.select(
+          functions.col("key"),
+          functions.col("version"),
+          functions.array(
+            functions.struct(functions.col("a"), inner.as("b"))
+          ).as("vals")
+        )
+        registerFlow(autoCdcFlow(
+          name = "auto_cdc_flow",
+          target = "target",
+          query = dfFlowFunc(projected),
+          keys = Seq("key"),
+          sequencing = functions.col("version")
+        ))
+      }
+
+    stream.addData((1, 1L, 1, 1, 1), (2, 1L, 2, 2, 2))
+    runPipeline(buildCtx(includeD = true))
+
+    stream.addData((1, 2L, 10, 10, 99), (3, 1L, 3, 3, 99))
+    val ex = intercept[RuntimeException] { runPipeline(buildCtx(includeD = false)) }
+    // See the nested-struct test above for why `tableName` is empty here.
+    checkErrorInPipelineFailure(
+      failure = ex,
+      condition = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA",
+      parameters = Map(
+        "tableName" -> "``",
+        "colName" -> "`vals`.`element`.`b`.`d`"
+      )
+    )
+  }
+
+  test("a source DF column whose name differs from the target only by case fails with " +
+    "AMBIGUOUS_REFERENCE under case-insensitive resolution") {
+    val session = spark
+    import session.implicits._
+
+    // `DatasetManager`'s schema-merge compares the existing target schema and the flow's
+    // output schema *case-sensitively*: `SchemaMergingUtils.mergeSchemas` calls
+    // `StructType.merge` without forwarding the session-level case-sensitivity. When the
+    // target has `value` and the source DF emits `Value`, the merged schema ends up with
+    // both as separate columns. Reference resolution downstream is case-insensitive
+    // (Spark's default), so the MERGE plan trips on the duplicate and reports
+    // AMBIGUOUS_REFERENCE.
+    withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
+      spark.sql(
+        s"CREATE TABLE $catalog.$namespace.target " +
+          s"(key INT NOT NULL, version BIGINT NOT NULL, value STRING, $cdcMetadataDdl)"
+      )
+
+      val stream = MemoryStream[(Int, Long, String)]
+      stream.addData((1, 1L, "alice"))
+      val ctx = new TestGraphRegistrationContext(spark) {
+        registerTable("target", catalog = Some(catalog), database = Some(namespace))
+        // Source DF emits `Value` (capital), differing only in case from the target's
+        // `value` column.
+        val df = stream.toDF().toDF("key", "version", "Value")
+        registerFlow(autoCdcFlow(
+          name = "auto_cdc_flow",
+          target = "target",
+          query = dfFlowFunc(df),
+          keys = Seq("key"),
+          sequencing = functions.col("version")
+        ))
+      }
+
+      val ex = intercept[RuntimeException] { runPipeline(ctx) }
+      // The exact `name` and `referenceNames` parameters depend on internal merge-plan
+      // synthesis; the condition match is the meaningful invariant for this test.
+      checkErrorInPipelineFailure(
+        failure = ex,
+        condition = "AMBIGUOUS_REFERENCE",
+        parameters = Map(
+          "name" -> ".*",
+          "referenceNames" -> ".*"
+        ),
+        matchPVals = true,
+        queryContext = Array(
+          ExpectedContext(
+            fragment = s"`$catalog`.`$namespace`.`target`.`Value`",
+            start = 0,
+            stop = 27
+          )
+        )
+      )
+    }
+  }
+
+  test("extra columns on the target that the AutoCDC flow does not emit are preserved " +
+    "across the merge") {
+    val session = spark
+    import session.implicits._
+
+    // The target is wider than the AutoCDC flow's source DF: column `extra` is present on
+    // the target but never produced by the flow. AutoCDC must tolerate the extra target
+    // column -- pre-existing rows keep their `extra` value, and newly-inserted rows
+    // resolve `extra` to NULL.
+    spark.sql(
+      s"CREATE TABLE $catalog.$namespace.target " +
+        s"(id INT NOT NULL, name STRING, version BIGINT NOT NULL, extra INT, $cdcMetadataDdl)"
+    )
+    insertPreloadedRow(
+      s"$catalog.$namespace.target",
+      colValues = "1, 'preloaded', 0, 42",
+      sequence = 0L
+    )
+
+    val stream = MemoryStream[(Int, String, Long)]
+    stream.addData((1, "alice", 1L), (2, "bob", 1L))
+    val ctx = new TestGraphRegistrationContext(spark) {
+      registerTable("target", catalog = Some(catalog), database = Some(namespace))
+      registerFlow(autoCdcFlow(
+        name = "auto_cdc_flow",
+        target = "target",
+        query = dfFlowFunc(stream.toDF().toDF("id", "name", "version")),
+        keys = Seq("id"),
+        sequencing = functions.col("version")
+      ))
+    }
+    runPipeline(ctx)
+
+    checkAnswer(
+      spark.table(s"$catalog.$namespace.target").select("id", "name", "version", "extra"),
+      Seq(
+        Row(1, "alice", 1L, 42), // extra preserved on the upsert
+        Row(2, "bob", 1L, null) // extra is NULL for inserts
+      )
+    )
+  }
+
+  test("changing a non-key column type from TIMESTAMP to STRING between runs fails with " +
+    "CANNOT_MERGE_INCOMPATIBLE_DATA_TYPE") {
+    val session = spark
+    import session.implicits._
+
+    // `mergeSchemas` rejects an incompatible type change between TIMESTAMP and STRING.
+    // Captured alongside the type-widening / type-narrowing tests; users must full-refresh
+    // the target to change a column's type.
+    spark.sql(
+      s"CREATE TABLE $catalog.$namespace.target " +
+        s"(key INT NOT NULL, version BIGINT NOT NULL, value TIMESTAMP, $cdcMetadataDdl)"
+    )
+
+    val stream1 = MemoryStream[(Int, Long, Timestamp)]
+    stream1.addData((1, 1L, Timestamp.valueOf("2024-01-01 10:00:00")))
+    val ctx1 = new TestGraphRegistrationContext(spark) {
+      registerTable("target", catalog = Some(catalog), database = Some(namespace))
+      registerFlow(autoCdcFlow(
+        name = "auto_cdc_flow",
+        target = "target",
+        query = dfFlowFunc(stream1.toDF().toDF("key", "version", "value")),
+        keys = Seq("key"),
+        sequencing = functions.col("version")
+      ))
+    }
+    runPipeline(ctx1)
+
+    // Run #2 emits `value` as STRING. mergeSchemas rejects the type change.
+    val stream2 = MemoryStream[(Int, Long, String)]
+    stream2.addData((1, 2L, "2024-01-02 11:00:00"))
+    val ctx2 = new TestGraphRegistrationContext(spark) {
+      registerTable("target", catalog = Some(catalog), database = Some(namespace))
+      registerFlow(autoCdcFlow(
+        name = "auto_cdc_flow",
+        target = "target",
+        query = dfFlowFunc(stream2.toDF().toDF("key", "version", "value")),
+        keys = Seq("key"),
+        sequencing = functions.col("version")
+      ))
+    }
+
+    val ex = intercept[RuntimeException] { runPipeline(ctx2) }
+    checkErrorInPipelineFailure(
+      failure = ex,
+      condition = "CANNOT_MERGE_INCOMPATIBLE_DATA_TYPE",
+      sqlState = Some("42825"),
+      // `left` is the persisted (run #1) TIMESTAMP type; `right` is run #2's STRING.
+      parameters = Map(
+        "left" -> "\"TIMESTAMP\"",
+        "right" -> "\"STRING\""
+      )
+    )
+  }
+}
diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1SinglePipelineSuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1SinglePipelineSuite.scala
new file mode 100644
index 0000000000000..f06b8c4615339
--- /dev/null
+++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1SinglePipelineSuite.scala
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.pipelines.graph
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.execution.streaming.runtime.MemoryStream
+import org.apache.spark.sql.functions
+import org.apache.spark.sql.pipelines.autocdc.{
+  ChangeArgs,
+  ColumnSelection,
+  ScdType,
+  UnqualifiedColumnName
+}
+import org.apache.spark.sql.pipelines.utils.{ExecutionTest, TestGraphRegistrationContext}
+import org.apache.spark.sql.test.SharedSparkSession
+
+/**
+ * End-to-end smoke coverage for SCD type 1 AutoCDC flows executed by a single pipeline.
+ * Each test registers one or more AutoCDC flows in one graph, runs that graph once, and
+ * then asserts on what landed in the target table(s). Scenarios in which several pipelines
+ * share one target are covered separately by [[AutoCdcScd1MultiPipelineSuite]].
+ */
+class AutoCdcScd1SinglePipelineSuite
+    extends ExecutionTest
+    with SharedSparkSession
+    with AutoCdcGraphExecutionTestMixin {
+
+  test("an upsert event lands a new row in an empty target table") {
+    val sparkSession = spark
+    import sparkSession.implicits._
+
+    spark.sql(
+      s"CREATE TABLE $catalog.$namespace.target " +
+        s"(id INT NOT NULL, name STRING, version BIGINT NOT NULL, $cdcMetadataDdl)"
+    )
+
+    val sourceStream = MemoryStream[(Int, String, Long)]
+    sourceStream.addData((1, "alice", 1L))
+
+    val graphCtx = new TestGraphRegistrationContext(spark) {
+      registerTable("target", database = Some(namespace), catalog = Some(catalog))
+      registerFlow(autoCdcFlow(
+        target = "target",
+        name = "auto_cdc_flow",
+        keys = Seq("id"),
+        sequencing = functions.col("version"),
+        query = dfFlowFunc(sourceStream.toDF().toDF("id", "name", "version"))
+      ))
+    }
+
+    runPipeline(graphCtx)
+
+    checkAnswer(
+      spark.table(s"$catalog.$namespace.target"),
+      Row(1, "alice", 1L, cdcMeta(deleteSeq = None, upsertSeq = Some(1L))) :: Nil
+    )
+  }
+
+  test("consecutive upsert, delete, and re-upsert events for the same key in one run " +
+    "converge to the latest event") {
+    val sparkSession = spark
+    import sparkSession.implicits._
+
+    // `is_delete` is a source-only control column: it feeds the deleteCondition and is
+    // excluded from the target projection, so the target DDL below does not declare it.
+    spark.sql(
+      s"CREATE TABLE $catalog.$namespace.target " +
+        s"(id INT NOT NULL, name STRING, version BIGINT NOT NULL, $cdcMetadataDdl)"
+    )
+
+    val sourceStream = MemoryStream[(Int, String, Long, Boolean)]
+    sourceStream.addData(
+      (1, "alice", 1L, false), // seq 1: first upsert
+      (1, "alice", 2L, true), // seq 2: delete
+      (1, "alice2", 3L, false) // seq 3: upsert again
+    )
+
+    val graphCtx = new TestGraphRegistrationContext(spark) {
+      registerTable("target", database = Some(namespace), catalog = Some(catalog))
+      registerFlow(autoCdcFlow(
+        target = "target",
+        name = "auto_cdc_flow",
+        keys = Seq("id"),
+        sequencing = functions.col("version"),
+        query = dfFlowFunc(sourceStream.toDF().toDF("id", "name", "version", "is_delete")),
+        deleteCondition = Some(functions.col("is_delete") === true),
+        columnSelection = Some(ColumnSelection.ExcludeColumns(
+          Seq("is_delete").map(UnqualifiedColumnName(_))
+        ))
+      ))
+    }
+
+    runPipeline(graphCtx)
+
+    // The seq=3 upsert outranks both the seq=1 upsert and the seq=2 delete, so exactly
+    // one row ("alice2") is expected for key 1.
+    checkAnswer(
+      spark.table(s"$catalog.$namespace.target"),
+      Row(1, "alice2", 3L, cdcMeta(deleteSeq = None, upsertSeq = Some(3L))) :: Nil
+    )
+  }
+
+  test("two AutoCDC flows targeting separate tables in one pipeline produce independent " +
+    "results") {
+    val sparkSession = spark
+    import sparkSession.implicits._
+
+    spark.sql(
+      s"CREATE TABLE $catalog.$namespace.t_a " +
+        s"(id INT NOT NULL, version BIGINT NOT NULL, $cdcMetadataDdl)"
+    )
+    spark.sql(
+      s"CREATE TABLE $catalog.$namespace.t_b " +
+        s"(id INT NOT NULL, version BIGINT NOT NULL, $cdcMetadataDdl)"
+    )
+
+    val sourceA = MemoryStream[(Int, Long)]
+    val sourceB = MemoryStream[(Int, Long)]
+    sourceA.addData((1, 1L), (2, 1L))
+    sourceB.addData((10, 1L))
+
+    val graphCtx = new TestGraphRegistrationContext(spark) {
+      registerTable("t_a", database = Some(namespace), catalog = Some(catalog))
+      registerTable("t_b", database = Some(namespace), catalog = Some(catalog))
+      registerFlow(autoCdcFlow(
+        target = "t_a",
+        name = "flow_a",
+        keys = Seq("id"),
+        sequencing = functions.col("version"),
+        query = dfFlowFunc(sourceA.toDF().toDF("id", "version"))
+      ))
+      registerFlow(autoCdcFlow(
+        target = "t_b",
+        name = "flow_b",
+        keys = Seq("id"),
+        sequencing = functions.col("version"),
+        query = dfFlowFunc(sourceB.toDF().toDF("id", "version"))
+      ))
+    }
+    runPipeline(graphCtx)
+
+    checkAnswer(
+      spark.table(s"$catalog.$namespace.t_a"),
+      Seq(1, 2).map(id => Row(id, 1L, cdcMeta(deleteSeq = None, upsertSeq = Some(1L))))
+    )
+    checkAnswer(
+      spark.table(s"$catalog.$namespace.t_b"),
+      Row(10, 1L, cdcMeta(deleteSeq = None, upsertSeq = Some(1L))) :: Nil
+    )
+    assert(spark.catalog.tableExists(auxTableNameFor("t_a")))
+    assert(spark.catalog.tableExists(auxTableNameFor("t_b")))
+  }
+
+  test("an AutoCDC flow targeting a table whose format does not support row-level " +
+    "operations fails with AUTOCDC_TARGET_DOES_NOT_SUPPORT_MERGE") {
+    val sparkSession = spark
+    import sparkSession.implicits._
+
+    // Deliberately target the default catalog/database: tables there default to parquet.
+    val catalog = TestGraphRegistrationContext.DEFAULT_CATALOG
+    val database = TestGraphRegistrationContext.DEFAULT_DATABASE
+
+    spark.sql(
+      s"CREATE TABLE $catalog.$database.target_no_merge " +
+        s"(id INT NOT NULL, version BIGINT NOT NULL, $cdcMetadataDdl)"
+    )
+
+    val sourceStream = MemoryStream[(Int, Long)]
+    sourceStream.addData((1, 1L))
+
+    val graphCtx = new TestGraphRegistrationContext(spark) {
+      registerTable("target_no_merge")
+      registerFlow(AutoCdcFlow(
+        identifier = fullyQualifiedIdentifier("auto_cdc_flow"),
+        destinationIdentifier = fullyQualifiedIdentifier("target_no_merge"),
+        func = dfFlowFunc(sourceStream.toDF().toDF("id", "version")),
+        queryContext = QueryContext(
+          currentCatalog = Some(catalog),
+          currentDatabase = Some(database)
+        ),
+        origin = QueryOrigin.empty,
+        changeArgs = ChangeArgs(
+          keys = Seq("id").map(UnqualifiedColumnName(_)),
+          sequencing = functions.col("version"),
+          storedAsScdType = ScdType.Type1
+        )
+      ))
+    }
+
+    val pipelineFailure = intercept[RuntimeException] { runPipeline(graphCtx) }
+    checkErrorInPipelineFailure(
+      failure = pipelineFailure,
+      condition = "AUTOCDC_TARGET_DOES_NOT_SUPPORT_MERGE",
+      sqlState = Some("0A000"),
+      parameters = Map(
+        "format" -> "parquet",
+        "tableName" -> s"`$catalog`.`$database`.`target_no_merge`"
+      )
+    )
+  }
+}
diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1TargetTableDurabilitySuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1TargetTableDurabilitySuite.scala
new file mode 100644
index 0000000000000..46f8ee47db02f
--- /dev/null
+++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1TargetTableDurabilitySuite.scala
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.pipelines.graph + +import org.apache.spark.sql.Row +import org.apache.spark.sql.execution.streaming.runtime.MemoryStream +import org.apache.spark.sql.functions +import org.apache.spark.sql.pipelines.autocdc.Scd1BatchProcessor +import org.apache.spark.sql.pipelines.utils.{ExecutionTest, TestGraphRegistrationContext} +import org.apache.spark.sql.test.SharedSparkSession + +/** + * Tests covering AutoCDC's behavior when the target table is pre-populated by something + * other than a prior AutoCDC run: pre-loaded rows and a target created without the CDC + * metadata column (NULL CDC metadata rows are not exercised below). These cases verify that + * AutoCDC interoperates gracefully with users who hand-populate the target table.
+ */ +class AutoCdcScd1TargetTableDurabilitySuite + extends ExecutionTest + with SharedSparkSession + with AutoCdcGraphExecutionTestMixin { + + test("pre-loaded rows: an event with a lower sequence is suppressed and a higher one " + + "wins") { + val session = spark + import session.implicits._ + + spark.sql( + s"CREATE TABLE $catalog.$namespace.target " + + s"(id INT NOT NULL, name STRING, version BIGINT NOT NULL, $cdcMetadataDdl)" + ) + insertPreloadedRow(s"$catalog.$namespace.target", "1, 'alice', 5", 5L) + insertPreloadedRow(s"$catalog.$namespace.target", "2, 'bob', 5", 5L) + + val stream = MemoryStream[(Int, String, Long)] + stream.addData( + (1, "stale", 2L), // < pre-existing seq=5 -> ignored + (2, "bob2", 10L) // > pre-existing seq=5 -> upserts + ) + val ctx = new TestGraphRegistrationContext(spark) { + registerTable("target", catalog = Some(catalog), database = Some(namespace)) + registerFlow(autoCdcFlow( + name = "auto_cdc_flow", + target = "target", + query = dfFlowFunc(stream.toDF().toDF("id", "name", "version")), + keys = Seq("id"), + sequencing = functions.col("version") + )) + } + runPipeline(ctx) + + checkAnswer( + spark.table(s"$catalog.$namespace.target"), + Seq( + Row(1, "alice", 5L, cdcMeta(None, Some(5L))), + Row(2, "bob2", 10L, cdcMeta(None, Some(10L))) + ) + ) + } + + test("pre-loaded target rows merge correctly on the first AutoCDC run, and the " + + "auxiliary table is created lazily") { + val session = spark + import session.implicits._ + + // Target was populated by some external process; this is the first AutoCDC run. 
+ spark.sql( + s"CREATE TABLE $catalog.$namespace.target " + + s"(id INT NOT NULL, name STRING, version BIGINT NOT NULL, $cdcMetadataDdl)" + ) + insertPreloadedRow(s"$catalog.$namespace.target", "1, 'alice', 1", 1L) + + assert( + !spark.catalog.tableExists(auxTableNameFor("target")), + "Auxiliary table should not exist before the first AutoCDC run" + ) + + val stream = MemoryStream[(Int, String, Long)] + stream.addData((1, "bob", 2L)) + + val ctx = new TestGraphRegistrationContext(spark) { + registerTable("target", catalog = Some(catalog), database = Some(namespace)) + registerFlow(autoCdcFlow( + name = "auto_cdc_flow", + target = "target", + query = dfFlowFunc(stream.toDF().toDF("id", "name", "version")), + keys = Seq("id"), + sequencing = functions.col("version") + )) + } + runPipeline(ctx) + + // seq=2 > pre-existing seq=1, so "bob" replaces "alice" via the upsert sequence column. + checkAnswer( + spark.table(s"$catalog.$namespace.target"), + Seq(Row(1, "bob", 2L, cdcMeta(None, Some(2L)))) + ) + assert( + spark.catalog.tableExists(auxTableNameFor("target")), + "Auxiliary table should be created lazily on the first AutoCDC run" + ) + } + + test("a target table created without the CDC metadata column gets the column " + + "auto-added on the first AutoCDC run") { + val session = spark + import session.implicits._ + + // User creates the target without the AutoCDC metadata column. DatasetManager evolves + // the existing table schema by merging it with the AutoCdcMergeFlow's output schema, + // which includes the metadata column. The first run therefore proceeds normally, and + // subsequent reads see the metadata struct alongside the user's data columns. 
+ spark.sql( + s"CREATE TABLE $catalog.$namespace.target " + + s"(id INT NOT NULL, name STRING, version BIGINT NOT NULL)" + ) + + val stream = MemoryStream[(Int, String, Long)] + stream.addData((1, "alice", 1L)) + + val ctx = new TestGraphRegistrationContext(spark) { + registerTable("target", catalog = Some(catalog), database = Some(namespace)) + registerFlow(autoCdcFlow( + name = "auto_cdc_flow", + target = "target", + query = dfFlowFunc(stream.toDF().toDF("id", "name", "version")), + keys = Seq("id"), + sequencing = functions.col("version") + )) + } + runPipeline(ctx) + + val schema = spark.table(s"$catalog.$namespace.target").schema + assert( + schema.fieldNames.contains(Scd1BatchProcessor.cdcMetadataColName), + s"Target must have ${Scd1BatchProcessor.cdcMetadataColName} after first AutoCDC run; " + + s"got ${schema.fieldNames.toSeq}" + ) + checkAnswer( + spark.table(s"$catalog.$namespace.target"), + Seq(Row(1, "alice", 1L, cdcMeta(None, Some(1L)))) + ) + } +} diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/ConnectInvalidPipelineSuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/ConnectInvalidPipelineSuite.scala index f19fed4e57806..8dad5019c0fe0 100644 --- a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/ConnectInvalidPipelineSuite.scala +++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/ConnectInvalidPipelineSuite.scala @@ -660,7 +660,7 @@ class ConnectInvalidPipelineSuite extends PipelineTest with SharedSparkSession { ) { // Temporary views in SDP normally accept either streaming or batch producing flows, but // AutoCDC flows are an explicit exception: SCD reconciliation only runs at the - // streaming-table sink (`Scd1ForeachBatchExec`), so pointing an AutoCDC flow at a view + // streaming-table sink (`Scd1ForeachBatchHandler`), so pointing an AutoCDC flow at a view // would silently drop reconciliation and expose just the projected CDF to consumers. 
// `validateFlowStreamingness` rejects this case with a dedicated sub-condition under // INVALID_FLOW_QUERY_TYPE.