From 63796ff87f868a87b299cf8a5468257787756f1a Mon Sep 17 00:00:00 2001 From: Sandro Speh Date: Tue, 28 Apr 2026 08:06:53 +0000 Subject: [PATCH 1/4] First working netChanges implementation --- .../analysis/ResolveChangelogTable.scala | 130 +++++- ...ResolveChangelogTableNetChangesSuite.scala | 373 ++++++++++++++++++ 2 files changed, 494 insertions(+), 9 deletions(-) create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/connector/ResolveChangelogTableNetChangesSuite.scala diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveChangelogTable.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveChangelogTable.scala index bdf9b9fed09cc..95b9af770fa84 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveChangelogTable.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveChangelogTable.scala @@ -18,7 +18,13 @@ package org.apache.spark.sql.catalyst.analysis import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.expressions.aggregate.{Count, Max, Min} +import org.apache.spark.sql.catalyst.expressions.aggregate.{ + Count, + First, + Last, + Max, + Min +} import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2 @@ -63,6 +69,21 @@ object ResolveChangelogTable extends Rule[LogicalPlan] { val all: Set[String] = Set(DelCnt, InsCnt, MinRv, MaxRv, RvCnt) } + /** + * Reserved (`__spark_cdc_*`) column names used internally by net-change + * computation; connectors must not emit columns with these names. + */ + object NetChangesHelperColumns { + final val RowNumber = "__spark_cdc_row_number" + final val RowCount = "__spark_cdc_row_count" + final val FirstRowChangeTypeValue = + "__spark_cdc_first_row_change_type_value" + final val LastRowChangeTypeValue = "__spark_cdc_last_row_change_type_value" + + val all: Set[String] = + Set(RowNumber, RowCount, FirstRowChangeTypeValue, LastRowChangeTypeValue) + } + override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp { case rel @ DataSourceV2Relation(table: ChangelogTable, _, _, _, _, _) if !table.resolved => val changelog = table.changelog @@ -75,7 +96,8 @@ object ResolveChangelogTable extends Rule[LogicalPlan] { resolvedRel, changelog, req.requiresCarryOverRemoval, req.requiresUpdateDetection) } if (req.requiresNetChanges) { - updatedRel = injectNetChangeComputation(updatedRel, changelog) + updatedRel = injectNetChangeComputation( + updatedRel, changelog, table.changelogInfo.computeUpdates()) } updatedRel @@ -119,11 +141,6 @@ object ResolveChangelogTable extends Rule[LogicalPlan] { private def evaluateRequirements( changelog: Changelog, options: ChangelogInfo): PostProcessingRequirements = { - // Net change computation is not yet implemented. - if (options.deduplicationMode() == ChangelogInfo.DeduplicationMode.NET_CHANGES) { - throw QueryCompilationErrors.cdcNetChangesNotYetSupported(changelog.name()) - } - val requiresCarryOverRemoval = options.deduplicationMode() != ChangelogInfo.DeduplicationMode.NONE && changelog.containsCarryoverRows() @@ -282,8 +299,103 @@ object ResolveChangelogTable extends Rule[LogicalPlan] { */ private def injectNetChangeComputation( plan: LogicalPlan, - cl: Changelog): LogicalPlan = { - plan + cl: Changelog, + computeUpdates: Boolean): LogicalPlan = { + val changeTypeAttr = getAttribute(plan, "_change_type") + val rowIdExprs = V2ExpressionUtils.resolveRefs[NamedExpression](cl.rowId().toSeq, plan) + val commitVersionAttr = getAttribute(plan, "_commit_version") + val changeTypeRank = CaseWhen(Seq( + EqualTo(changeTypeAttr, Literal(Changelog.CHANGE_TYPE_UPDATE_PREIMAGE)) -> Literal(0), + EqualTo(changeTypeAttr, Literal(Changelog.CHANGE_TYPE_DELETE)) -> Literal(0), + EqualTo(changeTypeAttr, Literal(Changelog.CHANGE_TYPE_INSERT)) -> Literal(1), + EqualTo(changeTypeAttr, Literal(Changelog.CHANGE_TYPE_UPDATE_POSTIMAGE)) -> Literal(1)), + Literal(2)) + val partitionByCols = rowIdExprs + val orderSpec = Seq( + SortOrder(commitVersionAttr, Ascending), + SortOrder(changeTypeRank, Ascending)) + val rowNumberWindowSpec = WindowSpecDefinition( + partitionByCols, orderSpec, + UnspecifiedFrame) + val aggregateWindowSpec = WindowSpecDefinition( + partitionByCols, orderSpec, + SpecifiedWindowFrame(RowFrame, UnboundedPreceding, UnboundedFollowing)) + + val rowNumberAlias = Alias( + WindowExpression(RowNumber(), rowNumberWindowSpec), + NetChangesHelperColumns.RowNumber)() + val rowCountAlias = Alias( + WindowExpression(Count(Seq(Literal(1))).toAggregateExpression(), aggregateWindowSpec), + NetChangesHelperColumns.RowCount)() + val firstRowChangeTypeValueAlias = Alias( + WindowExpression( + First(changeTypeAttr, ignoreNulls = false).toAggregateExpression(), + aggregateWindowSpec), + NetChangesHelperColumns.FirstRowChangeTypeValue)() + val lastRowChangeTypeValueAlias = Alias( + WindowExpression( + Last(changeTypeAttr, ignoreNulls = false).toAggregateExpression(), + aggregateWindowSpec), + NetChangesHelperColumns.LastRowChangeTypeValue)() + + val windowedPlan = Window( + Seq(rowNumberAlias, rowCountAlias, firstRowChangeTypeValueAlias, + lastRowChangeTypeValueAlias), + partitionByCols, orderSpec, plan) + + val rowNumberAttr = getAttribute(windowedPlan, NetChangesHelperColumns.RowNumber) + val rowCountAttr = getAttribute(windowedPlan, NetChangesHelperColumns.RowCount) + val firstRowChangeTypeAttr = + getAttribute(windowedPlan, NetChangesHelperColumns.FirstRowChangeTypeValue) + val lastRowChangeTypeAttr = + getAttribute(windowedPlan, NetChangesHelperColumns.LastRowChangeTypeValue) + + val existedBeforeVersionRange = In(firstRowChangeTypeAttr, Seq( + Literal(Changelog.CHANGE_TYPE_DELETE), + Literal(Changelog.CHANGE_TYPE_UPDATE_PREIMAGE))) + val existsAfterVersionRange = In(lastRowChangeTypeAttr, Seq( + Literal(Changelog.CHANGE_TYPE_INSERT), + Literal(Changelog.CHANGE_TYPE_UPDATE_POSTIMAGE))) + + val isFirst = EqualTo(rowNumberAttr, Literal(1)) + val isLast = EqualTo(rowNumberAttr, rowCountAttr) + + // only keep first and last entry per set of changes for a rowId, order of cases is important! + val keep = CaseWhen(Seq( + // filter out if inserted and deleted within range + And(Not(existedBeforeVersionRange), Not(existsAfterVersionRange)) -> Literal(false), + // for persisting new row keep only last state + And(Not(existedBeforeVersionRange), existsAfterVersionRange) -> isLast, + // for previously existing row keep first state + And(existedBeforeVersionRange, Not(existsAfterVersionRange)) -> isFirst, + // for persisting row keep first and last state + And(existedBeforeVersionRange, existsAfterVersionRange) -> Or(isFirst, isLast)), + Literal(false)) // dont keep row by default + + val filteredPlan = Filter(keep, windowedPlan) + + val targetPreLabel = + if (computeUpdates) Literal(Changelog.CHANGE_TYPE_UPDATE_PREIMAGE) + else Literal(Changelog.CHANGE_TYPE_DELETE) + val targetPostLabel = + if (computeUpdates) Literal(Changelog.CHANGE_TYPE_UPDATE_POSTIMAGE) + else Literal(Changelog.CHANGE_TYPE_INSERT) + + val relabel = CaseWhen(Seq( + And(Not(existedBeforeVersionRange), isLast) -> Literal(Changelog.CHANGE_TYPE_INSERT), + And(Not(existsAfterVersionRange), isFirst) -> Literal(Changelog.CHANGE_TYPE_DELETE), + And(And(existedBeforeVersionRange, existsAfterVersionRange), isFirst) -> targetPreLabel, + And(And(existedBeforeVersionRange, existsAfterVersionRange), isLast) -> targetPostLabel), + changeTypeAttr) + + val projectList = filteredPlan.output.flatMap { attr => + if (NetChangesHelperColumns.all.contains(attr.name)) None + else if (attr.name == "_change_type") Some(Alias(relabel, "_change_type")()) + else Some(attr) + } + + val projectedPlan = Project(projectList, filteredPlan) + projectedPlan } // --------------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/ResolveChangelogTableNetChangesSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/ResolveChangelogTableNetChangesSuite.scala new file mode 100644 index 0000000000000..6976d12208f1f --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/ResolveChangelogTableNetChangesSuite.scala @@ -0,0 +1,373 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector + +import java.util.Collections + +import org.scalatest.BeforeAndAfterEach + +import org.apache.spark.sql.{DataFrame, QueryTest, Row} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.connector.catalog.{ + ChangelogProperties, Column, Identifier, InMemoryChangelogCatalog} +import org.apache.spark.sql.connector.catalog.Changelog.{ + CHANGE_TYPE_DELETE, CHANGE_TYPE_INSERT, CHANGE_TYPE_UPDATE_POSTIMAGE, + CHANGE_TYPE_UPDATE_PREIMAGE} +import org.apache.spark.sql.connector.expressions.Transform +import org.apache.spark.sql.test.SharedSparkSession +import org.apache.spark.sql.types.{LongType, StringType} +import org.apache.spark.unsafe.types.UTF8String + +/** + * Tests for the `netChanges` deduplication mode handled by + * [[org.apache.spark.sql.catalyst.analysis.ResolveChangelogTable]]. + * + * Setup convention: every test runs against an in-memory connector configured with + * `containsIntermediateChanges = true` and `representsUpdateAsDeleteAndInsert = false`, + * which means + * - netChanges is enabled (the only post-processing pass under test); + * - update detection is disabled (so the test directly controls the change_type + * labels reaching netChanges). + * + * Coverage: + * - Single-event tests: a lone `insert` or `delete` survives netChanges unchanged + * across a few version-range shapes (covers the `cnt=1` partitions of the + * `(insert, insert)` and `(delete, delete)` matrix cells). + * - Full matrix: all 9 `(first_change_type, last_change_type)` cells from the + * design plan, parameterised over `computeUpdates in {true, false}`. Some cells + * need 3 or 4 events to construct a partition with that first/last shape. + * + * The `(yes, yes)` cells follow the SPIP B.8 / SQL Ref Spec footnote: under + * `cu=true` the two emitted rows are labelled `update_preimage + update_postimage`, + * under `cu=false` they are labelled `delete + insert` -- regardless of which input + * labels the partition started with. + */ +class ResolveChangelogTableNetChangesSuite + extends QueryTest + with SharedSparkSession + with BeforeAndAfterEach { + + private val catalogName = "cdc_netchanges_catalog" + private val testTableName = "events" + + override def beforeAll(): Unit = { + super.beforeAll() + spark.conf.set( + s"spark.sql.catalog.$catalogName", + classOf[InMemoryChangelogCatalog].getName) + } + + override def beforeEach(): Unit = { + super.beforeEach() + val cat = catalog + if (cat.tableExists(ident)) cat.dropTable(ident) + cat.clearChangeRows(ident) + cat.createTable( + ident, + Array( + Column.create("id", LongType), + Column.create("name", StringType), + Column.create("row_commit_version", LongType, false)), + Array.empty[Transform], + Collections.emptyMap[String, String]()) + cat.setChangelogProperties(ident, ChangelogProperties( + containsIntermediateChanges = true, + containsCarryoverRows = false, + representsUpdateAsDeleteAndInsert = false, + rowIdNames = Seq("id"), + rowVersionName = Some("row_commit_version"))) + } + + private def catalog: InMemoryChangelogCatalog = + spark.sessionState.catalogManager + .catalog(catalogName) + .asInstanceOf[InMemoryChangelogCatalog] + + private def ident = Identifier.of(Array.empty, testTableName) + + // --------------------------------------------------------------------------- + // Input helpers + // --------------------------------------------------------------------------- + + /** + * Builds a single change row matching the table schema + * `(id, name, row_commit_version, _change_type, _commit_version, _commit_timestamp)`. + * `row_commit_version` is set to `commitVersion`; these tests do not exercise + * carry-over removal, so the rcv value does not matter for assertions. + */ + private def event( + id: Long, name: String, changeType: String, commitVersion: Long): InternalRow = { + InternalRow( + id, + UTF8String.fromString(name), + commitVersion, + UTF8String.fromString(changeType), + commitVersion, + 0L) + } + + private def addInsert(commitVersion: Long, id: Long, name: String): Unit = + catalog.addChangeRows(ident, Seq(event(id, name, CHANGE_TYPE_INSERT, commitVersion))) + + private def addDelete(commitVersion: Long, id: Long, name: String): Unit = + catalog.addChangeRows(ident, Seq(event(id, name, CHANGE_TYPE_DELETE, commitVersion))) + + private def addUpdatePre(commitVersion: Long, id: Long, name: String): Unit = + catalog.addChangeRows( + ident, Seq(event(id, name, CHANGE_TYPE_UPDATE_PREIMAGE, commitVersion))) + + private def addUpdatePost(commitVersion: Long, id: Long, name: String): Unit = + catalog.addChangeRows( + ident, Seq(event(id, name, CHANGE_TYPE_UPDATE_POSTIMAGE, commitVersion))) + + // --------------------------------------------------------------------------- + // Expected output helpers + // --------------------------------------------------------------------------- + + private def expectInsert(version: Long, id: Long, name: String): Row = + Row(id, name, CHANGE_TYPE_INSERT, version) + + private def expectDelete(version: Long, id: Long, name: String): Row = + Row(id, name, CHANGE_TYPE_DELETE, version) + + private def expectUpdatePre(version: Long, id: Long, name: String): Row = + Row(id, name, CHANGE_TYPE_UPDATE_PREIMAGE, version) + + private def expectUpdatePost(version: Long, id: Long, name: String): Row = + Row(id, name, CHANGE_TYPE_UPDATE_POSTIMAGE, version) + + /** + * Expected two-row output for an `UPDATE(s)` collapse: the partition's first row + * (representing the row's state when it entered the range) plus the last row + * (state when it exits). Output labels follow the SQL Ref Spec footnote: + * `update_preimage + update_postimage` under `cu=true`, `delete + insert` under + * `cu=false`. + */ + private def outputForUpdate( + cu: Boolean, + id: Long, + preV: Long, oldName: String, + postV: Long, newName: String): Seq[Row] = { + if (cu) { + Seq(expectUpdatePre(preV, id, oldName), expectUpdatePost(postV, id, newName)) + } else { + Seq(expectDelete(preV, id, oldName), expectInsert(postV, id, newName)) + } + } + + // --------------------------------------------------------------------------- + // Query helper + // --------------------------------------------------------------------------- + + private def runNetChanges( + fromV: Long, toV: Long, computeUpdates: Boolean = false): DataFrame = + sql( + s"SELECT id, name, _change_type, _commit_version " + + s"FROM $catalogName.$testTableName " + + s"CHANGES FROM VERSION $fromV TO VERSION $toV " + + s"WITH (deduplicationMode = 'netChanges', computeUpdates = '$computeUpdates')") + + // =========================================================================== + // Single-event: a lone insert or delete passes through netChanges + // =========================================================================== + // + // Parameterised over `change_type` in {insert, delete} and three range shapes: + // wide range, single-version range, and `cu=true` to confirm the lone event + // is mode-independent. + + private val singleEventCases: Seq[(String, (Long => Unit), (Long => Row))] = Seq( + ("insert", + (v: Long) => addInsert(v, 1L, "Alice"), + (v: Long) => expectInsert(v, 1L, "Alice")), + ("delete", + (v: Long) => addDelete(v, 1L, "Alice"), + (v: Long) => expectDelete(v, 1L, "Alice"))) + + singleEventCases.foreach { case (label, addFn, expectFn) => + test(s"single $label survives netChanges (wide range FROM 1 TO 10)") { + addFn(3L) + checkAnswer(runNetChanges(fromV = 1, toV = 10), Seq(expectFn(3L))) + } + + test(s"single $label survives netChanges (single-version range FROM 3 TO 3)") { + addFn(3L) + checkAnswer(runNetChanges(fromV = 3, toV = 3), Seq(expectFn(3L))) + } + + test(s"single $label survives netChanges (cu=true wide range)") { + addFn(3L) + checkAnswer( + runNetChanges(fromV = 1, toV = 10, computeUpdates = true), + Seq(expectFn(3L))) + } + } + + // =========================================================================== + // Full matrix: all 9 (first_change_type, last_change_type) cells + // =========================================================================== + // + // For each cell, `setup` adds the events that produce a partition with the + // declared first/last change_type pair, and `expected(cu)` returns the rows + // the netChanges output should contain. Some cells need more than two events + // to construct (e.g. `(insert, update_postimage)` requires an insert plus an + // update pre/post pair, i.e. 3 events). + + private case class MatrixCase( + label: String, + setup: () => Unit, + expected: Boolean => Seq[Row]) + + private val matrixCases: Seq[MatrixCase] = Seq( + + // (insert, delete): row inserted and deleted in the range. Cancels. + MatrixCase( + label = "(insert, delete) cancels out", + setup = () => { + addInsert(commitVersion = 2, id = 1L, name = "Alice") + addDelete(commitVersion = 5, id = 1L, name = "Alice") + }, + expected = (_: Boolean) => Seq.empty[Row]), + + // (insert, insert) with cnt > 1: insert + delete + re-insert; net is the + // re-insert with the latest values. Mode-independent (label always insert). + MatrixCase( + label = "(insert, insert) emits the last insert", + setup = () => { + addInsert(commitVersion = 2, id = 1L, name = "Alice") + addDelete(commitVersion = 3, id = 1L, name = "Alice") + addInsert(commitVersion = 5, id = 1L, name = "Alice_v2") + }, + expected = (_: Boolean) => Seq(expectInsert(5L, 1L, "Alice_v2"))), + + // (insert, update_postimage): inserted, then updated. Last event is the + // update post; output is one row labelled `insert` with the post values. + // Mode-independent (label always insert). + MatrixCase( + label = "(insert, update_post) emits last as insert", + setup = () => { + addInsert(commitVersion = 2, id = 1L, name = "Alice") + addUpdatePre(commitVersion = 5, id = 1L, name = "Alice") + addUpdatePost(commitVersion = 5, id = 1L, name = "Alice_v2") + }, + expected = (_: Boolean) => Seq(expectInsert(5L, 1L, "Alice_v2"))), + + // (update_pre, update_post): pure UPDATE(s); 2 rows mode-dependent. + MatrixCase( + label = "(update_pre, update_post) emits PRE + POST", + setup = () => { + addUpdatePre(commitVersion = 3, id = 1L, name = "Alice") + addUpdatePost(commitVersion = 3, id = 1L, name = "Alice_v2") + }, + expected = (cu: Boolean) => + outputForUpdate(cu, 1L, preV = 3, "Alice", postV = 3, "Alice_v2")), + + // (update_pre, insert): pre-existed (via update), exists at end (via re-insert + // after delete). 2 rows mode-dependent. Lifecycle: + // update_pre(2), update_post(2), delete(3), insert(5). + MatrixCase( + label = "(update_pre, insert) emits PRE + POST", + setup = () => { + addUpdatePre(commitVersion = 2, id = 1L, name = "Alice") + addUpdatePost(commitVersion = 2, id = 1L, name = "Alice_v2") + addDelete(commitVersion = 3, id = 1L, name = "Alice_v2") + addInsert(commitVersion = 5, id = 1L, name = "Alice_resurrected") + }, + expected = (cu: Boolean) => + outputForUpdate(cu, 1L, preV = 2, "Alice", postV = 5, "Alice_resurrected")), + + // (delete, update_post): pre-existed (via delete), exists at end (via update + // post after re-insert). 2 rows mode-dependent. Lifecycle: + // delete(2), insert(3), update_pre(5), update_post(5). + MatrixCase( + label = "(delete, update_post) emits PRE + POST", + setup = () => { + addDelete(commitVersion = 2, id = 1L, name = "Alice") + addInsert(commitVersion = 3, id = 1L, name = "Alice_v2") + addUpdatePre(commitVersion = 5, id = 1L, name = "Alice_v2") + addUpdatePost(commitVersion = 5, id = 1L, name = "Alice_v3") + }, + expected = (cu: Boolean) => + outputForUpdate(cu, 1L, preV = 2, "Alice", postV = 5, "Alice_v3")), + + // (delete, insert): pre-existed and exists at end via raw delete + insert + // (no native update labels). 2 rows mode-dependent. + MatrixCase( + label = "(delete, insert) emits PRE + POST", + setup = () => { + addDelete(commitVersion = 2, id = 1L, name = "Alice") + addInsert(commitVersion = 5, id = 1L, name = "Alice_resurrected") + }, + expected = (cu: Boolean) => + outputForUpdate(cu, 1L, preV = 2, "Alice", postV = 5, "Alice_resurrected")), + + // (update_pre, delete): pre-existed (via update), gone at end. 1 row labelled + // `delete` with the values from the FIRST update_pre. Mode-independent (delete). + MatrixCase( + label = "(update_pre, delete) emits first as delete", + setup = () => { + addUpdatePre(commitVersion = 3, id = 1L, name = "Alice") + addUpdatePost(commitVersion = 3, id = 1L, name = "Alice_v2") + addDelete(commitVersion = 5, id = 1L, name = "Alice_v2") + }, + expected = (_: Boolean) => Seq(expectDelete(3L, 1L, "Alice"))), + + // (delete, delete) with cnt > 1: delete + re-insert + delete; net is the + // first delete (the row's state at range entry). Mode-independent (delete). + MatrixCase( + label = "(delete, delete) emits the first delete", + setup = () => { + addDelete(commitVersion = 2, id = 1L, name = "Alice") + addInsert(commitVersion = 3, id = 1L, name = "Alice_v2") + addDelete(commitVersion = 5, id = 1L, name = "Alice_v2") + }, + expected = (_: Boolean) => Seq(expectDelete(2L, 1L, "Alice")))) + + matrixCases.foreach { mc => + Seq(true, false).foreach { cu => + test(s"matrix: ${mc.label} [cu=$cu]") { + mc.setup() + checkAnswer( + runNetChanges(fromV = 1, toV = 10, computeUpdates = cu), + mc.expected(cu)) + } + } + } + + // =========================================================================== + // Range-narrowing: events outside the requested range must not show up + // =========================================================================== + // + // Verifies the connector-side range filter and netChanges interact correctly: + // narrow ranges drop events that would otherwise appear, even if those events + // are part of the rowId's lifecycle outside the range. + + test("range narrowing: only events inside [from, to] reach netChanges") { + // Lifecycle: insert(v2) -- update_pre/post(v5) -- delete(v8). The narrow + // query [v3, v6] should see only the update at v5, which collapses to a + // single PRE + POST pair. + addInsert(commitVersion = 2, id = 1L, name = "Alice") + addUpdatePre(commitVersion = 5, id = 1L, name = "Alice") + addUpdatePost(commitVersion = 5, id = 1L, name = "Alice_v2") + addDelete(commitVersion = 8, id = 1L, name = "Alice_v2") + + checkAnswer( + runNetChanges(fromV = 3, toV = 6, computeUpdates = true), + outputForUpdate( + cu = true, id = 1L, preV = 5, oldName = "Alice", postV = 5, newName = "Alice_v2")) + } +} From 746f37cea0363fd8f1e9db8ed9fe6cdaac962d04 Mon Sep 17 00:00:00 2001 From: Sandro Speh Date: Tue, 28 Apr 2026 12:40:51 +0000 Subject: [PATCH 2/4] Simplified test suite, refactored netChanges, javadocs --- .../analysis/ResolveChangelogTable.scala | 98 +++- ...ResolveChangelogTableNetChangesSuite.scala | 451 ++++++++++-------- ...lveChangelogTablePostProcessingSuite.scala | 39 -- 3 files changed, 322 insertions(+), 266 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveChangelogTable.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveChangelogTable.scala index 95b9af770fa84..acbf67a316410 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveChangelogTable.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveChangelogTable.scala @@ -35,7 +35,8 @@ import org.apache.spark.sql.types.{IntegerType, StringType} /** * Post-processes a resolved [[ChangelogTable]] read to apply CDC option semantics - * (carry-over removal, update detection) and to enforce supported option combinations. + * (carry-over removal, update detection, net change computation) and to enforce + * supported option combinations. * * Fires after [[ResolveRelations]] has wrapped the connector's [[Changelog]] in a * [[ChangelogTable]]. Both batch ([[DataSourceV2Relation]]) and streaming @@ -44,14 +45,13 @@ import org.apache.spark.sql.types.{IntegerType, StringType} * of the relation. Carry-over removal and update detection are fused into a single * pass over a (rowId, _commit_version)-partitioned Window: the Filter drops CoW * carry-over pairs (same rowVersion on both sides) and the subsequent Project relabels - * real delete+insert pairs as update_preimage / update_postimage. + * real delete+insert pairs as update_preimage / update_postimage. Net change + * computation runs on top of that, collapsing intermediate states per `rowId` per + * the SPIP `Deduplication Semantics`. * - Streaming: post-processing is not yet supported. If the requested options would * require any post-processing, the rule throws an explicit [[AnalysisException]] to * prevent silent wrong results. Streams that don't require post-processing pass * through unchanged. - * - * Net change computation (`deduplicationMode = netChanges`) is not yet implemented and - * is rejected up-front for both batch and streaming. */ object ResolveChangelogTable extends Rule[LogicalPlan] { @@ -135,8 +135,8 @@ object ResolveChangelogTable extends Rule[LogicalPlan] { /** * Validates CDC option/capability combinations and computes which post-processing * passes are required. Throws an [[org.apache.spark.sql.AnalysisException]] for - * unsupported or contradictory combinations (currently: `netChanges` deduplication, - * and `computeUpdates` with surfaced carry-overs but no carry-over removal). + * unsupported or contradictory combinations (currently: `computeUpdates` with + * surfaced carry-overs but no carry-over removal). */ private def evaluateRequirements( changelog: Changelog, @@ -294,13 +294,48 @@ object ResolveChangelogTable extends Rule[LogicalPlan] { // --------------------------------------------------------------------------- /** - * Collapses multiple changes per row identity into the net effect. - * Not yet implemented. + * Collapses multiple changes per row identity into the net effect: + * + * | existedBefore | existsAfter | output | + * |---------------|-------------|-------------------------------------| + * | false | false | (cancel) | + * | false | true | insert | + * | true | false | delete | + * | true | true | update_preimage + update_postimage | + * + * If `computeUpdates = false`, the `update_preimage` + `update_postimage` pair is + * emitted as `delete` + `insert` instead. + * + * `existedBefore` is true iff the partition's first event is `delete` or + * `update_preimage`. `existsAfter` is true iff the partition's last event is + * `insert` or `update_postimage`. + * + * Pipeline: Window (per-rowId aggregates) -> Filter (keep first/last per partition) + * -> Project (relabel `_change_type` and drop helper columns). */ private def injectNetChangeComputation( plan: LogicalPlan, cl: Changelog, computeUpdates: Boolean): LogicalPlan = { + val windowedPlan = addNetChangesWindow(plan, cl) + val filteredAndRelabeledPlan = + removeIntermediateChangelogEntriesAndRelabelChangeTypes(windowedPlan, computeUpdates) + filteredAndRelabeledPlan + } + + /** + * Adds a Window node partitioned by `rowId` and ordered by + * `(_commit_version, change_type_rank)` where pre-events (`update_preimage`, + * `delete`) sort before post-events (`update_postimage`, `insert`) within the same + * commit. Computes per-partition helper columns: + * - `__spark_cdc_row_number` (1..n) answers: "is this the first or last row?". + * - `__spark_cdc_row_count` is the partition size which combined with row_number is + * used to detect the last row. + * - `__spark_cdc_first_row_change_type_value` and + * `__spark_cdc_last_row_change_type_value` drive the first/last classification at + * filter and relabel time. + */ + private def addNetChangesWindow(plan: LogicalPlan, cl: Changelog): LogicalPlan = { val changeTypeAttr = getAttribute(plan, "_change_type") val rowIdExprs = V2ExpressionUtils.resolveRefs[NamedExpression](cl.rowId().toSeq, plan) val commitVersionAttr = getAttribute(plan, "_commit_version") @@ -318,8 +353,8 @@ object ResolveChangelogTable extends Rule[LogicalPlan] { partitionByCols, orderSpec, UnspecifiedFrame) val aggregateWindowSpec = WindowSpecDefinition( - partitionByCols, orderSpec, - SpecifiedWindowFrame(RowFrame, UnboundedPreceding, UnboundedFollowing)) + partitionByCols, orderSpec, + SpecifiedWindowFrame(RowFrame, UnboundedPreceding, UnboundedFollowing)) val rowNumberAlias = Alias( WindowExpression(RowNumber(), rowNumberWindowSpec), @@ -338,11 +373,38 @@ object ResolveChangelogTable extends Rule[LogicalPlan] { aggregateWindowSpec), NetChangesHelperColumns.LastRowChangeTypeValue)() - val windowedPlan = Window( + Window( Seq(rowNumberAlias, rowCountAlias, firstRowChangeTypeValueAlias, lastRowChangeTypeValueAlias), partitionByCols, orderSpec, plan) + } + /** + * Filters and relabels the windowed plan: keeps only the first and/or last row per + * `rowId` partition, then rewrites the surviving rows' `_change_type` and drops the + * helper columns. + * + * | existedBefore | existsAfter | output | + * |---------------|-------------|-------------------------------------| + * | false | false | (cancel) | + * | false | true | insert | + * | true | false | delete | + * | true | true | update_preimage + update_postimage | + * + * If `computeUpdates = false`, the `update_preimage` + `update_postimage` pair is + * emitted as `delete` + `insert` instead. + * + * `existedBefore` is true iff the partition's first event is `delete` or + * `update_preimage`. `existsAfter` is true iff the partition's last event is + * `insert` or `update_postimage`. + * + * Helper columns (`__spark_cdc_*`) are dropped in the same Project that does the + * relabel, saving a follow-up cleanup pass. + */ + private def removeIntermediateChangelogEntriesAndRelabelChangeTypes( + windowedPlan: LogicalPlan, + computeUpdates: Boolean + ): LogicalPlan = { val rowNumberAttr = getAttribute(windowedPlan, NetChangesHelperColumns.RowNumber) val rowCountAttr = getAttribute(windowedPlan, NetChangesHelperColumns.RowCount) val firstRowChangeTypeAttr = @@ -374,18 +436,22 @@ object ResolveChangelogTable extends Rule[LogicalPlan] { val filteredPlan = Filter(keep, windowedPlan) - val targetPreLabel = + val computedPreUpdateLabel = if (computeUpdates) Literal(Changelog.CHANGE_TYPE_UPDATE_PREIMAGE) else Literal(Changelog.CHANGE_TYPE_DELETE) - val targetPostLabel = + val computedPostUpdateLabel = if (computeUpdates) Literal(Changelog.CHANGE_TYPE_UPDATE_POSTIMAGE) else Literal(Changelog.CHANGE_TYPE_INSERT) + val changeTypeAttr = getAttribute(filteredPlan, "_change_type") + val relabel = CaseWhen(Seq( And(Not(existedBeforeVersionRange), isLast) -> Literal(Changelog.CHANGE_TYPE_INSERT), And(Not(existsAfterVersionRange), isFirst) -> Literal(Changelog.CHANGE_TYPE_DELETE), - And(And(existedBeforeVersionRange, existsAfterVersionRange), isFirst) -> targetPreLabel, - And(And(existedBeforeVersionRange, existsAfterVersionRange), isLast) -> targetPostLabel), + And(And(existedBeforeVersionRange, existsAfterVersionRange), isFirst) + -> computedPreUpdateLabel, + And(And(existedBeforeVersionRange, existsAfterVersionRange), isLast) + -> computedPostUpdateLabel), changeTypeAttr) val projectList = filteredPlan.output.flatMap { attr => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/ResolveChangelogTableNetChangesSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/ResolveChangelogTableNetChangesSuite.scala index 6976d12208f1f..b4601a89de880 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/ResolveChangelogTableNetChangesSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/ResolveChangelogTableNetChangesSuite.scala @@ -34,34 +34,33 @@ import org.apache.spark.sql.types.{LongType, StringType} import org.apache.spark.unsafe.types.UTF8String /** - * Tests for the `netChanges` deduplication mode handled by + * Shared test bodies for the `netChanges` deduplication mode handled by * [[org.apache.spark.sql.catalyst.analysis.ResolveChangelogTable]]. * - * Setup convention: every test runs against an in-memory connector configured with - * `containsIntermediateChanges = true` and `representsUpdateAsDeleteAndInsert = false`, - * which means - * - netChanges is enabled (the only post-processing pass under test); - * - update detection is disabled (so the test directly controls the change_type - * labels reaching netChanges). - * - * Coverage: - * - Single-event tests: a lone `insert` or `delete` survives netChanges unchanged - * across a few version-range shapes (covers the `cnt=1` partitions of the - * `(insert, insert)` and `(delete, delete)` matrix cells). - * - Full matrix: all 9 `(first_change_type, last_change_type)` cells from the - * design plan, parameterised over `computeUpdates in {true, false}`. Some cells - * need 3 or 4 events to construct a partition with that first/last shape. + * Concrete subclasses fix the [[computeUpdates]] flag and therefore run the + * entire suite twice (once with `computeUpdates = true`, once with + * `computeUpdates = false`). Test bodies use [[computedPreUpdateLabel]] / + * [[computedPostUpdateLabel]] in their expected outputs. * - * The `(yes, yes)` cells follow the SPIP B.8 / SQL Ref Spec footnote: under - * `cu=true` the two emitted rows are labelled `update_preimage + update_postimage`, - * under `cu=false` they are labelled `delete + insert` -- regardless of which input - * labels the partition started with. + * Setup convention: every test runs against an in-memory connector configured + * with `containsIntermediateChanges = true` and + * `representsUpdateAsDeleteAndInsert = false`, which means + * - netChanges is enabled (the only post-processing pass under test); + * - update detection is disabled (so the test directly controls the + * change_type labels reaching netChanges). */ -class ResolveChangelogTableNetChangesSuite +trait ResolveChangelogTableNetChangesTestsBase extends QueryTest with SharedSparkSession with BeforeAndAfterEach { + /** + * Value of the user-facing CDC option `computeUpdates` that this test run + * exercises. Concrete subclasses pin this to `true` or `false` so the same + * test bodies cover both modes via two suite classes. + */ + protected def computeUpdates: Boolean + private val catalogName = "cdc_netchanges_catalog" private val testTableName = "events" @@ -110,7 +109,7 @@ class ResolveChangelogTableNetChangesSuite * `row_commit_version` is set to `commitVersion`; these tests do not exercise * carry-over removal, so the rcv value does not matter for assertions. */ - private def event( + private def cdcEntry( id: Long, name: String, changeType: String, commitVersion: Long): InternalRow = { InternalRow( id, @@ -122,18 +121,18 @@ class ResolveChangelogTableNetChangesSuite } private def addInsert(commitVersion: Long, id: Long, name: String): Unit = - catalog.addChangeRows(ident, Seq(event(id, name, CHANGE_TYPE_INSERT, commitVersion))) + catalog.addChangeRows(ident, Seq(cdcEntry(id, name, CHANGE_TYPE_INSERT, commitVersion))) private def addDelete(commitVersion: Long, id: Long, name: String): Unit = - catalog.addChangeRows(ident, Seq(event(id, name, CHANGE_TYPE_DELETE, commitVersion))) + catalog.addChangeRows(ident, Seq(cdcEntry(id, name, CHANGE_TYPE_DELETE, commitVersion))) private def addUpdatePre(commitVersion: Long, id: Long, name: String): Unit = catalog.addChangeRows( - ident, Seq(event(id, name, CHANGE_TYPE_UPDATE_PREIMAGE, commitVersion))) + ident, Seq(cdcEntry(id, name, CHANGE_TYPE_UPDATE_PREIMAGE, commitVersion))) private def addUpdatePost(commitVersion: Long, id: Long, name: String): Unit = catalog.addChangeRows( - ident, Seq(event(id, name, CHANGE_TYPE_UPDATE_POSTIMAGE, commitVersion))) + ident, Seq(cdcEntry(id, name, CHANGE_TYPE_UPDATE_POSTIMAGE, commitVersion))) // --------------------------------------------------------------------------- // Expected output helpers @@ -145,37 +144,29 @@ class ResolveChangelogTableNetChangesSuite private def expectDelete(version: Long, id: Long, name: String): Row = Row(id, name, CHANGE_TYPE_DELETE, version) - private def expectUpdatePre(version: Long, id: Long, name: String): Row = - Row(id, name, CHANGE_TYPE_UPDATE_PREIMAGE, version) - - private def expectUpdatePost(version: Long, id: Long, name: String): Row = - Row(id, name, CHANGE_TYPE_UPDATE_POSTIMAGE, version) + /** + * Mode-dependent target label for the FIRST emitted row of a partition where + * `existedBefore=true, existsAfter=true`. Mirrors the production rule's + * `computedPreUpdateLabel` selection: `update_preimage` under + * `computeUpdates = true`, `delete` under `computeUpdates = false`. + */ + private def computedPreUpdateLabel: String = + if (computeUpdates) CHANGE_TYPE_UPDATE_PREIMAGE else CHANGE_TYPE_DELETE /** - * Expected two-row output for an `UPDATE(s)` collapse: the partition's first row - * (representing the row's state when it entered the range) plus the last row - * (state when it exits). Output labels follow the SQL Ref Spec footnote: - * `update_preimage + update_postimage` under `cu=true`, `delete + insert` under - * `cu=false`. + * Mode-dependent target label for the LAST emitted row of a partition where + * `existedBefore=true, existsAfter=true`. Mirrors the production rule's + * `computedPostUpdateLabel` selection: `update_postimage` under + * `computeUpdates = true`, `insert` under `computeUpdates = false`. */ - private def outputForUpdate( - cu: Boolean, - id: Long, - preV: Long, oldName: String, - postV: Long, newName: String): Seq[Row] = { - if (cu) { - Seq(expectUpdatePre(preV, id, oldName), expectUpdatePost(postV, id, newName)) - } else { - Seq(expectDelete(preV, id, oldName), expectInsert(postV, id, newName)) - } - } + private def computedPostUpdateLabel: String = + if (computeUpdates) CHANGE_TYPE_UPDATE_POSTIMAGE else CHANGE_TYPE_INSERT // --------------------------------------------------------------------------- // Query helper // --------------------------------------------------------------------------- - private def runNetChanges( - fromV: Long, toV: Long, computeUpdates: Boolean = false): DataFrame = + private def runNetChanges(fromV: Long, toV: Long): DataFrame = sql( s"SELECT id, name, _change_type, _commit_version " + s"FROM $catalogName.$testTableName " + @@ -185,176 +176,138 @@ class ResolveChangelogTableNetChangesSuite // =========================================================================== // Single-event: a lone insert or delete passes through netChanges // =========================================================================== - // - // Parameterised over `change_type` in {insert, delete} and three range shapes: - // wide range, single-version range, and `cu=true` to confirm the lone event - // is mode-independent. - - private val singleEventCases: Seq[(String, (Long => Unit), (Long => Row))] = Seq( - ("insert", - (v: Long) => addInsert(v, 1L, "Alice"), - (v: Long) => expectInsert(v, 1L, "Alice")), - ("delete", - (v: Long) => addDelete(v, 1L, "Alice"), - (v: Long) => expectDelete(v, 1L, "Alice"))) - - singleEventCases.foreach { case (label, addFn, expectFn) => - test(s"single $label survives netChanges (wide range FROM 1 TO 10)") { - addFn(3L) - checkAnswer(runNetChanges(fromV = 1, toV = 10), Seq(expectFn(3L))) - } - - test(s"single $label survives netChanges (single-version range FROM 3 TO 3)") { - addFn(3L) - checkAnswer(runNetChanges(fromV = 3, toV = 3), Seq(expectFn(3L))) - } - - test(s"single $label survives netChanges (cu=true wide range)") { - addFn(3L) - checkAnswer( - runNetChanges(fromV = 1, toV = 10, computeUpdates = true), - Seq(expectFn(3L))) - } + + test("single insert survives netChanges (wide range FROM 1 TO 10)") { + addInsert(commitVersion = 3L, id = 1L, name = "Alice") + checkAnswer( + runNetChanges(fromV = 1, toV = 10), + Seq(expectInsert(3L, 1L, "Alice"))) + } + + test("single insert survives netChanges (single-version range FROM 3 TO 3)") { + addInsert(commitVersion = 3L, id = 1L, name = "Alice") + checkAnswer( + runNetChanges(fromV = 3, toV = 3), + Seq(expectInsert(3L, 1L, "Alice"))) + } + + test("single delete survives netChanges (wide range FROM 1 TO 10)") { + addDelete(commitVersion = 3L, id = 1L, name = "Alice") + checkAnswer( + runNetChanges(fromV = 1, toV = 10), + Seq(expectDelete(3L, 1L, "Alice"))) + } + + test("single delete survives netChanges (single-version range FROM 3 TO 3)") { + addDelete(commitVersion = 3L, id = 1L, name = "Alice") + checkAnswer( + runNetChanges(fromV = 3, toV = 3), + Seq(expectDelete(3L, 1L, "Alice"))) } // =========================================================================== // Full matrix: all 9 (first_change_type, last_change_type) cells // =========================================================================== - // - // For each cell, `setup` adds the events that produce a partition with the - // declared first/last change_type pair, and `expected(cu)` returns the rows - // the netChanges output should contain. Some cells need more than two events - // to construct (e.g. `(insert, update_postimage)` requires an insert plus an - // update pre/post pair, i.e. 3 events). - - private case class MatrixCase( - label: String, - setup: () => Unit, - expected: Boolean => Seq[Row]) - - private val matrixCases: Seq[MatrixCase] = Seq( - - // (insert, delete): row inserted and deleted in the range. Cancels. - MatrixCase( - label = "(insert, delete) cancels out", - setup = () => { - addInsert(commitVersion = 2, id = 1L, name = "Alice") - addDelete(commitVersion = 5, id = 1L, name = "Alice") - }, - expected = (_: Boolean) => Seq.empty[Row]), - - // (insert, insert) with cnt > 1: insert + delete + re-insert; net is the - // re-insert with the latest values. Mode-independent (label always insert). - MatrixCase( - label = "(insert, insert) emits the last insert", - setup = () => { - addInsert(commitVersion = 2, id = 1L, name = "Alice") - addDelete(commitVersion = 3, id = 1L, name = "Alice") - addInsert(commitVersion = 5, id = 1L, name = "Alice_v2") - }, - expected = (_: Boolean) => Seq(expectInsert(5L, 1L, "Alice_v2"))), - - // (insert, update_postimage): inserted, then updated. Last event is the - // update post; output is one row labelled `insert` with the post values. - // Mode-independent (label always insert). - MatrixCase( - label = "(insert, update_post) emits last as insert", - setup = () => { - addInsert(commitVersion = 2, id = 1L, name = "Alice") - addUpdatePre(commitVersion = 5, id = 1L, name = "Alice") - addUpdatePost(commitVersion = 5, id = 1L, name = "Alice_v2") - }, - expected = (_: Boolean) => Seq(expectInsert(5L, 1L, "Alice_v2"))), - - // (update_pre, update_post): pure UPDATE(s); 2 rows mode-dependent. - MatrixCase( - label = "(update_pre, update_post) emits PRE + POST", - setup = () => { - addUpdatePre(commitVersion = 3, id = 1L, name = "Alice") - addUpdatePost(commitVersion = 3, id = 1L, name = "Alice_v2") - }, - expected = (cu: Boolean) => - outputForUpdate(cu, 1L, preV = 3, "Alice", postV = 3, "Alice_v2")), - - // (update_pre, insert): pre-existed (via update), exists at end (via re-insert - // after delete). 2 rows mode-dependent. Lifecycle: - // update_pre(2), update_post(2), delete(3), insert(5). - MatrixCase( - label = "(update_pre, insert) emits PRE + POST", - setup = () => { - addUpdatePre(commitVersion = 2, id = 1L, name = "Alice") - addUpdatePost(commitVersion = 2, id = 1L, name = "Alice_v2") - addDelete(commitVersion = 3, id = 1L, name = "Alice_v2") - addInsert(commitVersion = 5, id = 1L, name = "Alice_resurrected") - }, - expected = (cu: Boolean) => - outputForUpdate(cu, 1L, preV = 2, "Alice", postV = 5, "Alice_resurrected")), - - // (delete, update_post): pre-existed (via delete), exists at end (via update - // post after re-insert). 2 rows mode-dependent. Lifecycle: - // delete(2), insert(3), update_pre(5), update_post(5). - MatrixCase( - label = "(delete, update_post) emits PRE + POST", - setup = () => { - addDelete(commitVersion = 2, id = 1L, name = "Alice") - addInsert(commitVersion = 3, id = 1L, name = "Alice_v2") - addUpdatePre(commitVersion = 5, id = 1L, name = "Alice_v2") - addUpdatePost(commitVersion = 5, id = 1L, name = "Alice_v3") - }, - expected = (cu: Boolean) => - outputForUpdate(cu, 1L, preV = 2, "Alice", postV = 5, "Alice_v3")), - - // (delete, insert): pre-existed and exists at end via raw delete + insert - // (no native update labels). 2 rows mode-dependent. - MatrixCase( - label = "(delete, insert) emits PRE + POST", - setup = () => { - addDelete(commitVersion = 2, id = 1L, name = "Alice") - addInsert(commitVersion = 5, id = 1L, name = "Alice_resurrected") - }, - expected = (cu: Boolean) => - outputForUpdate(cu, 1L, preV = 2, "Alice", postV = 5, "Alice_resurrected")), - - // (update_pre, delete): pre-existed (via update), gone at end. 1 row labelled - // `delete` with the values from the FIRST update_pre. Mode-independent (delete). - MatrixCase( - label = "(update_pre, delete) emits first as delete", - setup = () => { - addUpdatePre(commitVersion = 3, id = 1L, name = "Alice") - addUpdatePost(commitVersion = 3, id = 1L, name = "Alice_v2") - addDelete(commitVersion = 5, id = 1L, name = "Alice_v2") - }, - expected = (_: Boolean) => Seq(expectDelete(3L, 1L, "Alice"))), - - // (delete, delete) with cnt > 1: delete + re-insert + delete; net is the - // first delete (the row's state at range entry). Mode-independent (delete). - MatrixCase( - label = "(delete, delete) emits the first delete", - setup = () => { - addDelete(commitVersion = 2, id = 1L, name = "Alice") - addInsert(commitVersion = 3, id = 1L, name = "Alice_v2") - addDelete(commitVersion = 5, id = 1L, name = "Alice_v2") - }, - expected = (_: Boolean) => Seq(expectDelete(2L, 1L, "Alice")))) - - matrixCases.foreach { mc => - Seq(true, false).foreach { cu => - test(s"matrix: ${mc.label} [cu=$cu]") { - mc.setup() - checkAnswer( - runNetChanges(fromV = 1, toV = 10, computeUpdates = cu), - mc.expected(cu)) - } - } + + test("matrix: (insert, delete) cancels out") { + addInsert(commitVersion = 2, id = 1L, name = "Alice") + addDelete(commitVersion = 5, id = 1L, name = "Alice") + checkAnswer( + runNetChanges(fromV = 1, toV = 10), + Seq.empty[Row]) + } + + test("matrix: (insert, insert) emits the last insert") { + // Lifecycle: insert(2), delete(3), re-insert(5). + addInsert(commitVersion = 2, id = 1L, name = "Alice") + addDelete(commitVersion = 3, id = 1L, name = "Alice") + addInsert(commitVersion = 5, id = 1L, name = "Alice_v2") + checkAnswer( + runNetChanges(fromV = 1, toV = 10), + Seq(expectInsert(5L, 1L, "Alice_v2"))) + } + + test("matrix: (insert, update_post) emits last as insert") { + // Lifecycle: insert(2), update_pre/post(5). + addInsert(commitVersion = 2, id = 1L, name = "Alice") + addUpdatePre(commitVersion = 5, id = 1L, name = "Alice") + addUpdatePost(commitVersion = 5, id = 1L, name = "Alice_v2") + checkAnswer( + runNetChanges(fromV = 1, toV = 10), + Seq(expectInsert(5L, 1L, "Alice_v2"))) + } + + test("matrix: (update_pre, update_post) emits PRE + POST") { + // Lifecycle: update_pre/post(3) -- pure UPDATE(s) case. + addUpdatePre(commitVersion = 3, id = 1L, name = "Alice") + addUpdatePost(commitVersion = 3, id = 1L, name = "Alice_v2") + checkAnswer( + runNetChanges(fromV = 1, toV = 10), + Seq( + Row(1L, "Alice", computedPreUpdateLabel, 3L), + Row(1L, "Alice_v2", computedPostUpdateLabel, 3L))) + } + + test("matrix: (update_pre, insert) emits PRE + POST") { + // Lifecycle: update_pre/post(2), delete(3), re-insert(5). + addUpdatePre(commitVersion = 2, id = 1L, name = "Alice") + addUpdatePost(commitVersion = 2, id = 1L, name = "Alice_v2") + addDelete(commitVersion = 3, id = 1L, name = "Alice_v2") + addInsert(commitVersion = 5, id = 1L, name = "Alice_resurrected") + checkAnswer( + runNetChanges(fromV = 1, toV = 10), + Seq( + Row(1L, "Alice", computedPreUpdateLabel, 2L), + Row(1L, "Alice_resurrected", computedPostUpdateLabel, 5L))) + } + + test("matrix: (delete, update_post) emits PRE + POST") { + // Lifecycle: delete(2), insert(3), update_pre/post(5). + addDelete(commitVersion = 2, id = 1L, name = "Alice") + addInsert(commitVersion = 3, id = 1L, name = "Alice_v2") + addUpdatePre(commitVersion = 5, id = 1L, name = "Alice_v2") + addUpdatePost(commitVersion = 5, id = 1L, name = "Alice_v3") + checkAnswer( + runNetChanges(fromV = 1, toV = 10), + Seq( + Row(1L, "Alice", computedPreUpdateLabel, 2L), + Row(1L, "Alice_v3", computedPostUpdateLabel, 5L))) + } + + test("matrix: (delete, insert) emits PRE + POST") { + // Lifecycle: delete(2), re-insert(5) -- raw delete + insert across versions. + addDelete(commitVersion = 2, id = 1L, name = "Alice") + addInsert(commitVersion = 5, id = 1L, name = "Alice_resurrected") + checkAnswer( + runNetChanges(fromV = 1, toV = 10), + Seq( + Row(1L, "Alice", computedPreUpdateLabel, 2L), + Row(1L, "Alice_resurrected", computedPostUpdateLabel, 5L))) + } + + test("matrix: (update_pre, delete) emits first as delete") { + // Lifecycle: update_pre/post(3), delete(5). + addUpdatePre(commitVersion = 3, id = 1L, name = "Alice") + addUpdatePost(commitVersion = 3, id = 1L, name = "Alice_v2") + addDelete(commitVersion = 5, id = 1L, name = "Alice_v2") + checkAnswer( + runNetChanges(fromV = 1, toV = 10), + Seq(expectDelete(3L, 1L, "Alice"))) + } + + test("matrix: (delete, delete) emits the first delete") { + // Lifecycle: delete(2), insert(3), delete(5). + addDelete(commitVersion = 2, id = 1L, name = "Alice") + addInsert(commitVersion = 3, id = 1L, name = "Alice_v2") + addDelete(commitVersion = 5, id = 1L, name = "Alice_v2") + checkAnswer( + runNetChanges(fromV = 1, toV = 10), + Seq(expectDelete(2L, 1L, "Alice"))) } // =========================================================================== // Range-narrowing: events outside the requested range must not show up // =========================================================================== - // - // Verifies the connector-side range filter and netChanges interact correctly: - // narrow ranges drop events that would otherwise appear, even if those events - // are part of the rowId's lifecycle outside the range. test("range narrowing: only events inside [from, to] reach netChanges") { // Lifecycle: insert(v2) -- update_pre/post(v5) -- delete(v8). The narrow @@ -366,8 +319,84 @@ class ResolveChangelogTableNetChangesSuite addDelete(commitVersion = 8, id = 1L, name = "Alice_v2") checkAnswer( - runNetChanges(fromV = 3, toV = 6, computeUpdates = true), - outputForUpdate( - cu = true, id = 1L, preV = 5, oldName = "Alice", postV = 5, newName = "Alice_v2")) + runNetChanges(fromV = 3, toV = 6), + Seq( + Row(1L, "Alice", computedPreUpdateLabel, 5L), + Row(1L, "Alice_v2", computedPostUpdateLabel, 5L))) + } + + // =========================================================================== + // Multi-rowId: each rowId's lifecycle collapses independently + // =========================================================================== + + test("multi-rowId table lifecycle: each rowId collapses independently") { + // v1: 4 inserts. + addInsert(commitVersion = 1, id = 1L, name = "Alice") + addInsert(commitVersion = 1, id = 2L, name = "Bob") + addInsert(commitVersion = 1, id = 3L, name = "Carol") + addInsert(commitVersion = 1, id = 4L, name = "Dave") + + // v2: update id=3 (emitted as native pre/post pair by the test connector). + addUpdatePre(commitVersion = 2, id = 3L, name = "Carol") + addUpdatePost(commitVersion = 2, id = 3L, name = "Carol_v2") + + // v3: 2 inserts. + addInsert(commitVersion = 3, id = 5L, name = "Eve") + addInsert(commitVersion = 3, id = 6L, name = "Frank") + + // v4: 2 deletes. + addDelete(commitVersion = 4, id = 1L, name = "Alice") + addDelete(commitVersion = 4, id = 2L, name = "Bob") + + checkAnswer( + runNetChanges(fromV = 1, toV = 4), + Seq( + expectInsert(2L, 3L, "Carol_v2"), // id=3: insert + update -> last as insert + expectInsert(1L, 4L, "Dave"), // id=4: lone insert + expectInsert(3L, 5L, "Eve"), // id=5: lone insert + expectInsert(3L, 6L, "Frank"))) // id=6: lone insert } + + test("multi-rowId hitting different mode-dependent cells in one query") { + addDelete(commitVersion = 2, id = 1L, name = "Alice") + addInsert(commitVersion = 5, id = 1L, name = "Alice_resurrected") + + addUpdatePre(commitVersion = 3, id = 2L, name = "Bob") + addUpdatePost(commitVersion = 3, id = 2L, name = "Bob_v2") + + addInsert(commitVersion = 4, id = 3L, name = "Carol") + addDelete(commitVersion = 6, id = 3L, name = "Carol") + + checkAnswer( + runNetChanges(fromV = 1, toV = 10), + Seq( + // id=1: (delete, insert) -- first + last with mode-dependent labels. + Row(1L, "Alice", computedPreUpdateLabel, 2L), + Row(1L, "Alice_resurrected", computedPostUpdateLabel, 5L), + // id=2: (update_pre, update_post) -- PRE + POST with mode-dependent labels. + Row(2L, "Bob", computedPreUpdateLabel, 3L), + Row(2L, "Bob_v2", computedPostUpdateLabel, 3L) + // id=3: (insert, delete) -- cancel, no rows. + )) + } +} + +/** + * Runs the netChanges test bodies with `computeUpdates = true`: + * `existedBefore=true, existsAfter=true` partitions emit `update_preimage` + + * `update_postimage`. + */ +class ResolveChangelogTableNetChangesWithComputeUpdatesSuite + extends ResolveChangelogTableNetChangesTestsBase { + override protected def computeUpdates: Boolean = true +} + +/** + * Runs the netChanges test bodies with `computeUpdates = false`: + * `existedBefore=true, existsAfter=true` partitions emit `delete` + `insert` + * (per SQL Ref Spec footnote). + */ +class ResolveChangelogTableNetChangesWithoutComputeUpdatesSuite + extends ResolveChangelogTableNetChangesTestsBase { + override protected def computeUpdates: Boolean = false } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/ResolveChangelogTablePostProcessingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/ResolveChangelogTablePostProcessingSuite.scala index 353472a035f91..721aa47beb560 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/ResolveChangelogTablePostProcessingSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/ResolveChangelogTablePostProcessingSuite.scala @@ -587,45 +587,6 @@ class ResolveChangelogTablePostProcessingSuite parameters = Map.empty) } - // =========================================================================== - // Net changes deduplication: not yet supported - // =========================================================================== - // - // `deduplicationMode = netChanges` collapses multiple changes per row identity into the - // net effect. It is not yet implemented in [[ResolveChangelogTable]]. - - test("deduplicationMode=netChanges is rejected when connector emits intermediate changes") { - catalog.setChangelogProperties(ident, ChangelogProperties( - containsIntermediateChanges = true, - rowIdNames = Seq("id"), - rowVersionName = Some("row_commit_version"))) - - checkError( - intercept[AnalysisException] { - sql(s"SELECT * FROM $catalogName.$testTableName " + - s"CHANGES FROM VERSION 1 TO VERSION 2 " + - s"WITH (deduplicationMode = 'netChanges')") - }, - condition = "INVALID_CDC_OPTION.NET_CHANGES_NOT_YET_SUPPORTED", - parameters = Map("changelogName" -> s"$catalogName.${testTableName}_changelog")) - } - - test("deduplicationMode=netChanges is rejected even when connector has no intermediate changes") { - catalog.setChangelogProperties(ident, ChangelogProperties( - containsIntermediateChanges = false, - rowIdNames = Seq("id"), - rowVersionName = Some("row_commit_version"))) - - checkError( - intercept[AnalysisException] { - sql(s"SELECT * FROM $catalogName.$testTableName " + - s"CHANGES FROM VERSION 1 TO VERSION 2 " + - s"WITH (deduplicationMode = 'netChanges')") - }, - condition = "INVALID_CDC_OPTION.NET_CHANGES_NOT_YET_SUPPORTED", - parameters = Map("changelogName" -> s"$catalogName.${testTableName}_changelog")) - } - // =========================================================================== // Range edge cases // =========================================================================== From 23b2c425582fe3a9d0256ef59fe768ec6dbf88b6 Mon Sep 17 00:00:00 2001 From: Sandro Speh Date: Wed, 29 Apr 2026 12:47:29 +0000 Subject: [PATCH 3/4] PR feedback --- .../resources/error/error-conditions.json | 10 +-- pom.xml | 30 ++++----- .../analysis/ResolveChangelogTable.scala | 48 ++++++++++---- .../sql/errors/QueryCompilationErrors.scala | 6 -- ...ResolveChangelogTableNetChangesSuite.scala | 66 ++++++++++++++----- 5 files changed, 103 insertions(+), 57 deletions(-) diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json index 87e645ef2b0f0..7822dc05502c0 100644 --- a/common/utils/src/main/resources/error/error-conditions.json +++ b/common/utils/src/main/resources/error/error-conditions.json @@ -666,6 +666,11 @@ "The Change Data Capture (CDC) connector violated the `Changelog` contract at runtime." ], "subClass" : { + "UNEXPECTED_CHANGE_TYPE" : { + "message" : [ + "Connector emitted a row with a `_change_type` value that is not one of the four supported types (`insert`, `delete`, `update_preimage`, `update_postimage`). The `Changelog` contract requires every emitted row to carry one of these four values." + ] + }, "UNEXPECTED_MULTIPLE_CHANGES_PER_ROW_VERSION" : { "message" : [ "Connector emitted multiple delete or insert rows for the same `(rowId, _commit_version)` partition. The `Changelog` contract requires at most one logical change per row identity per commit when `containsIntermediateChanges() = false`. Either fix the connector to deduplicate intermediate states, or set `containsIntermediateChanges() = true` and use `deduplicationMode = netChanges`." @@ -3292,11 +3297,6 @@ "`startingVersion` is required when `endingVersion` is specified for CDC queries." ] }, - "NET_CHANGES_NOT_YET_SUPPORTED" : { - "message" : [ - "The `deduplicationMode = netChanges` option on connector `` is not yet supported. Use `deduplicationMode = dropCarryovers` (default) or `deduplicationMode = none` instead." - ] - }, "STREAMING_POST_PROCESSING_NOT_SUPPORTED" : { "message" : [ "Change Data Capture (CDC) streaming reads on connector `` do not yet support post-processing (carry-over removal, update detection, or net change computation). The requested combination of options would require post-processing, which is currently only available for batch reads. Use a batch read, or set `deduplicationMode = none` and `computeUpdates = false` to receive raw change rows in streaming." diff --git a/pom.xml b/pom.xml index d7dac399c2aed..cdf55dd26033c 100644 --- a/pom.xml +++ b/pom.xml @@ -122,7 +122,7 @@ 17 17.0.11 ${java.version} - 3.9.15 + 3.9.14 3.6.1 spark 9.9.1 @@ -175,9 +175,9 @@ 2.13.18 2.13 2.2.0 - 4.9.10 + 4.9.9 false - 2.21.0 + 2.18.0 true true @@ -186,10 +186,10 @@ 2.21.2 2.3.1 1.1.10.8 - 3.2.0 - 1.22.0 + 3.1.1 + 1.21.0 1.28.0 - 2.22.0 + 2.21.0 2.6 @@ -197,7 +197,7 @@ 2.13.1 4.1.17 - 33.6.0-jre + 33.5.0-jre 1.0.3 2.13.2 3.1.9 @@ -213,11 +213,11 @@ 3.1.0 1.1.0 1.11.0 - 1.84 + 1.83 1.20.0 6.2.0 4.2.12.Final - 2.0.76.Final + 2.0.75.Final 78.3 6.0.3 2.14.6 @@ -309,7 +309,7 @@ 1.1.4 - 4.3-13 + 4.3-12 128m @@ -346,11 +346,11 @@ 13.2.1.jre11 23.26.1.0.0 3.3.1 - 4.1.0 + 4.0.2 20.00.00.39 ${project.version} - 3.5.0 + 3.4.2 @@ -665,7 +665,7 @@ org.glassfish.jaxb jaxb-runtime - 4.0.6 + 4.0.5 compile @@ -681,7 +681,7 @@ jakarta.xml.bind jakarta.xml.bind-api - 4.0.5 + 4.0.2 org.apache.commons @@ -2721,7 +2721,7 @@ org.codehaus.mojo extra-enforcer-rules - 1.12.0 + 1.11.0 diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveChangelogTable.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveChangelogTable.scala index acbf67a316410..67c2e5af269e3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveChangelogTable.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveChangelogTable.scala @@ -46,8 +46,7 @@ import org.apache.spark.sql.types.{IntegerType, StringType} * pass over a (rowId, _commit_version)-partitioned Window: the Filter drops CoW * carry-over pairs (same rowVersion on both sides) and the subsequent Project relabels * real delete+insert pairs as update_preimage / update_postimage. Net change - * computation runs on top of that, collapsing intermediate states per `rowId` per - * the SPIP `Deduplication Semantics`. + * computation runs on top of that, collapsing intermediate states per `rowId`. * - Streaming: post-processing is not yet supported. If the requested options would * require any post-processing, the rule throws an explicit [[AnalysisException]] to * prevent silent wrong results. Streams that don't require post-processing pass @@ -90,6 +89,12 @@ object ResolveChangelogTable extends Rule[LogicalPlan] { val req = evaluateRequirements(changelog, table.changelogInfo) val resolvedRel = rel.copy(table = table.copy(resolved = true)) + // Resolve rowId once against the bare DataSourceV2Relation. V2ExpressionUtils.resolveRefs + // requires a V2-shaped plan; downstream steps may wrap the relation in + // Project/Window, which would break a re-resolution there. Catalyst preserves these + // resolved attributes by ExprId through any wrapping operators. + val rowIdExprs = + V2ExpressionUtils.resolveRefs[NamedExpression](changelog.rowId().toSeq, resolvedRel) var updatedRel: LogicalPlan = resolvedRel if (req.requiresCarryOverRemoval || req.requiresUpdateDetection) { updatedRel = addRowLevelPostProcessing( @@ -97,7 +102,7 @@ object ResolveChangelogTable extends Rule[LogicalPlan] { } if (req.requiresNetChanges) { updatedRel = injectNetChangeComputation( - updatedRel, changelog, table.changelogInfo.computeUpdates()) + updatedRel, rowIdExprs, table.changelogInfo.computeUpdates()) } updatedRel @@ -294,7 +299,7 @@ object ResolveChangelogTable extends Rule[LogicalPlan] { // --------------------------------------------------------------------------- /** - * Collapses multiple changes per row identity into the net effect: + * Collapses multiple changes per row identity across versions into the net effect: * * | existedBefore | existsAfter | output | * |---------------|-------------|-------------------------------------| @@ -310,14 +315,14 @@ object ResolveChangelogTable extends Rule[LogicalPlan] { * `update_preimage`. `existsAfter` is true iff the partition's last event is * `insert` or `update_postimage`. * - * Pipeline: Window (per-rowId aggregates) -> Filter (keep first/last per partition) - * -> Project (relabel `_change_type` and drop helper columns). + * Pipeline: Window (per-rowId aggregates, sort by version) -> Filter (keep first/last per + * partition) -> Project (relabel `_change_type` and drop helper columns). */ private def injectNetChangeComputation( plan: LogicalPlan, - cl: Changelog, + rowIdExprs: Seq[NamedExpression], computeUpdates: Boolean): LogicalPlan = { - val windowedPlan = addNetChangesWindow(plan, cl) + val windowedPlan = addNetChangesWindow(plan, rowIdExprs) val filteredAndRelabeledPlan = removeIntermediateChangelogEntriesAndRelabelChangeTypes(windowedPlan, computeUpdates) filteredAndRelabeledPlan @@ -335,16 +340,21 @@ object ResolveChangelogTable extends Rule[LogicalPlan] { * `__spark_cdc_last_row_change_type_value` drive the first/last classification at * filter and relabel time. */ - private def addNetChangesWindow(plan: LogicalPlan, cl: Changelog): LogicalPlan = { + private def addNetChangesWindow( + plan: LogicalPlan, + rowIdExprs: Seq[NamedExpression]): LogicalPlan = { val changeTypeAttr = getAttribute(plan, "_change_type") - val rowIdExprs = V2ExpressionUtils.resolveRefs[NamedExpression](cl.rowId().toSeq, plan) val commitVersionAttr = getAttribute(plan, "_commit_version") + val raiseUnexpectedChangeType = RaiseError( + Literal("CHANGELOG_CONTRACT_VIOLATION.UNEXPECTED_CHANGE_TYPE"), + CreateMap(Nil), + IntegerType) val changeTypeRank = CaseWhen(Seq( EqualTo(changeTypeAttr, Literal(Changelog.CHANGE_TYPE_UPDATE_PREIMAGE)) -> Literal(0), EqualTo(changeTypeAttr, Literal(Changelog.CHANGE_TYPE_DELETE)) -> Literal(0), EqualTo(changeTypeAttr, Literal(Changelog.CHANGE_TYPE_INSERT)) -> Literal(1), EqualTo(changeTypeAttr, Literal(Changelog.CHANGE_TYPE_UPDATE_POSTIMAGE)) -> Literal(1)), - Literal(2)) + raiseUnexpectedChangeType) val partitionByCols = rowIdExprs val orderSpec = Seq( SortOrder(commitVersionAttr, Ascending), @@ -429,10 +439,10 @@ object ResolveChangelogTable extends Rule[LogicalPlan] { // for persisting new row keep only last state And(Not(existedBeforeVersionRange), existsAfterVersionRange) -> isLast, // for previously existing row keep first state - And(existedBeforeVersionRange, Not(existsAfterVersionRange)) -> isFirst, + And(existedBeforeVersionRange, Not(existsAfterVersionRange)) -> isFirst), // for persisting row keep first and last state - And(existedBeforeVersionRange, existsAfterVersionRange) -> Or(isFirst, isLast)), - Literal(false)) // dont keep row by default + // existedBeforeVersionRange = true, existsAfterVersionRange = true + Or(isFirst, isLast)) val filteredPlan = Filter(keep, windowedPlan) @@ -445,6 +455,16 @@ object ResolveChangelogTable extends Rule[LogicalPlan] { val changeTypeAttr = getAttribute(filteredPlan, "_change_type") + // Each case relabels the kept row(s) to match the required output label. The tuple + // is (first event, last event) of the partition; cases below assume computeUpdates=true. + // Case 1 (insert, update_postimage): keep update_postimage; relabel it to insert. + // Case 2 (update_preimage, delete): keep update_preimage; relabel it to delete. + // Case 3 (delete, update_postimage): keep delete and update_postimage; relabel delete to + // update_preimage. + // Case 4 (update_preimage, insert): keep update_preimage and insert; relabel insert to + // update_postimage. + // No-op cases (e.g. (insert, insert)) are not listed. If computeUpdates=false insert/deletes + // will be used instead of update_pre/postimage. val relabel = CaseWhen(Seq( And(Not(existedBeforeVersionRange), isLast) -> Literal(Changelog.CHANGE_TYPE_INSERT), And(Not(existsAfterVersionRange), isFirst) -> Literal(Changelog.CHANGE_TYPE_DELETE), diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala index f369317b4b0a5..543711112a709 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala @@ -3869,12 +3869,6 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat messageParameters = Map("changelogName" -> changelogName)) } - def cdcNetChangesNotYetSupported(changelogName: String): AnalysisException = { - new AnalysisException( - errorClass = "INVALID_CDC_OPTION.NET_CHANGES_NOT_YET_SUPPORTED", - messageParameters = Map("changelogName" -> changelogName)) - } - def cdcStreamingPostProcessingNotSupported(changelogName: String): AnalysisException = { new AnalysisException( errorClass = "INVALID_CDC_OPTION.STREAMING_POST_PROCESSING_NOT_SUPPORTED", diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/ResolveChangelogTableNetChangesSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/ResolveChangelogTableNetChangesSuite.scala index b4601a89de880..6ed5070e4f54c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/ResolveChangelogTableNetChangesSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/ResolveChangelogTableNetChangesSuite.scala @@ -37,17 +37,22 @@ import org.apache.spark.unsafe.types.UTF8String * Shared test bodies for the `netChanges` deduplication mode handled by * [[org.apache.spark.sql.catalyst.analysis.ResolveChangelogTable]]. * - * Concrete subclasses fix the [[computeUpdates]] flag and therefore run the - * entire suite twice (once with `computeUpdates = true`, once with - * `computeUpdates = false`). Test bodies use [[computedPreUpdateLabel]] / - * [[computedPostUpdateLabel]] in their expected outputs. + * Concrete subclasses fix the [[computeUpdates]] and + * [[representsUpdateAsDeleteAndInsert]] flags. Test bodies use + * [[computedPreUpdateLabel]] / [[computedPostUpdateLabel]] in their expected outputs. * * Setup convention: every test runs against an in-memory connector configured * with `containsIntermediateChanges = true` and - * `representsUpdateAsDeleteAndInsert = false`, which means - * - netChanges is enabled (the only post-processing pass under test); - * - update detection is disabled (so the test directly controls the - * change_type labels reaching netChanges). + * `containsCarryoverRows = false`, which means + * - netChanges is enabled; + * - carry-over removal is disabled (so the test directly controls events). + * + * When `representsUpdateAsDeleteAndInsert = true` AND `computeUpdates = true`, update + * detection runs upstream of netChanges, exercising the chained pipeline. The + * `addUpdatePre` / `addUpdatePost` helpers then emit raw `delete` / `insert` events + * (decomposed updates) which update detection relabels back to update pre/post before + * netChanges sees them. Output assertions stay identical because both paths produce + * the same `_change_type` labels at the netChanges input. */ trait ResolveChangelogTableNetChangesTestsBase extends QueryTest @@ -56,11 +61,18 @@ trait ResolveChangelogTableNetChangesTestsBase /** * Value of the user-facing CDC option `computeUpdates` that this test run - * exercises. Concrete subclasses pin this to `true` or `false` so the same - * test bodies cover both modes via two suite classes. + * exercises. Concrete subclasses pin this to `true` or `false`. */ protected def computeUpdates: Boolean + /** + * Value of the connector capability `representsUpdateAsDeleteAndInsert`. When `true` + * (combined with `computeUpdates = true`), update detection runs upstream of netChanges + * and the test helpers `addUpdatePre` / `addUpdatePost` emit decomposed `delete` / + * `insert` events instead of native pre/post events. + */ + protected def representsUpdateAsDeleteAndInsert: Boolean = false + private val catalogName = "cdc_netchanges_catalog" private val testTableName = "events" @@ -87,7 +99,7 @@ trait ResolveChangelogTableNetChangesTestsBase cat.setChangelogProperties(ident, ChangelogProperties( containsIntermediateChanges = true, containsCarryoverRows = false, - representsUpdateAsDeleteAndInsert = false, + representsUpdateAsDeleteAndInsert = representsUpdateAsDeleteAndInsert, rowIdNames = Seq("id"), rowVersionName = Some("row_commit_version"))) } @@ -126,13 +138,19 @@ trait ResolveChangelogTableNetChangesTestsBase private def addDelete(commitVersion: Long, id: Long, name: String): Unit = catalog.addChangeRows(ident, Seq(cdcEntry(id, name, CHANGE_TYPE_DELETE, commitVersion))) - private def addUpdatePre(commitVersion: Long, id: Long, name: String): Unit = - catalog.addChangeRows( - ident, Seq(cdcEntry(id, name, CHANGE_TYPE_UPDATE_PREIMAGE, commitVersion))) + private def addUpdatePre(commitVersion: Long, id: Long, name: String): Unit = { + val changeType = + if (representsUpdateAsDeleteAndInsert) CHANGE_TYPE_DELETE + else CHANGE_TYPE_UPDATE_PREIMAGE + catalog.addChangeRows(ident, Seq(cdcEntry(id, name, changeType, commitVersion))) + } - private def addUpdatePost(commitVersion: Long, id: Long, name: String): Unit = - catalog.addChangeRows( - ident, Seq(cdcEntry(id, name, CHANGE_TYPE_UPDATE_POSTIMAGE, commitVersion))) + private def addUpdatePost(commitVersion: Long, id: Long, name: String): Unit = { + val changeType = + if (representsUpdateAsDeleteAndInsert) CHANGE_TYPE_INSERT + else CHANGE_TYPE_UPDATE_POSTIMAGE + catalog.addChangeRows(ident, Seq(cdcEntry(id, name, changeType, commitVersion))) + } // --------------------------------------------------------------------------- // Expected output helpers @@ -400,3 +418,17 @@ class ResolveChangelogTableNetChangesWithoutComputeUpdatesSuite extends ResolveChangelogTableNetChangesTestsBase { override protected def computeUpdates: Boolean = false } + +/** + * Runs the netChanges test bodies against a connector with + * `representsUpdateAsDeleteAndInsert = true` and `computeUpdates = true`. Update + * detection runs upstream of netChanges, exercising the chained post-processing + * pipeline end-to-end. The `addUpdatePre` / `addUpdatePost` helpers emit decomposed + * `delete` / `insert` events that update detection relabels back to update pre/post + * before netChanges sees them, so the same expected outputs hold. + */ +class ResolveChangelogTableNetChangesWithUpdateDetectionSuite + extends ResolveChangelogTableNetChangesTestsBase { + override protected def computeUpdates: Boolean = true + override protected def representsUpdateAsDeleteAndInsert: Boolean = true +} From 312495b8f83187ad1501ebd7566a810802fb00ce Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Wed, 29 Apr 2026 21:27:06 +0000 Subject: [PATCH 4/4] Address review: gate rowId resolution; revert unrelated pom.xml drift - ResolveChangelogTable: move the hoisted V2 rowId resolution inside the `if (req.requiresNetChanges)` branch. The hoist was needed to resolve against the bare DataSourceV2Relation (V2ExpressionUtils.resolveRefs doesn't work on the wrapped Project/Window plan), but it should not run unconditionally: connectors that report all of containsCarryoverRows / containsIntermediateChanges / representsUpdateAsDeleteAndInsert as false are allowed by the Changelog contract to inherit the default rowId() impl, which throws. Gating preserves the previous "only call when needed" behavior while keeping the V2-resolution fix. - pom.xml: revert to upstream/master. The diff was stale-rebase noise (maven 3.9.15->3.9.14, scala-maven-plugin 4.9.10->4.9.9, commons-codec 1.22.0->1.21.0, guava 33.6.0->33.5.0, etc.), unrelated to this PR. --- pom.xml | 30 +++++++++---------- .../analysis/ResolveChangelogTable.scala | 13 ++++---- 2 files changed, 22 insertions(+), 21 deletions(-) diff --git a/pom.xml b/pom.xml index cdf55dd26033c..d7dac399c2aed 100644 --- a/pom.xml +++ b/pom.xml @@ -122,7 +122,7 @@ 17 17.0.11 ${java.version} - 3.9.14 + 3.9.15 3.6.1 spark 9.9.1 @@ -175,9 +175,9 @@ 2.13.18 2.13 2.2.0 - 4.9.9 + 4.9.10 false - 2.18.0 + 2.21.0 true true @@ -186,10 +186,10 @@ 2.21.2 2.3.1 1.1.10.8 - 3.1.1 - 1.21.0 + 3.2.0 + 1.22.0 1.28.0 - 2.21.0 + 2.22.0 2.6 @@ -197,7 +197,7 @@ 2.13.1 4.1.17 - 33.5.0-jre + 33.6.0-jre 1.0.3 2.13.2 3.1.9 @@ -213,11 +213,11 @@ 3.1.0 1.1.0 1.11.0 - 1.83 + 1.84 1.20.0 6.2.0 4.2.12.Final - 2.0.75.Final + 2.0.76.Final 78.3 6.0.3 2.14.6 @@ -309,7 +309,7 @@ 1.1.4 - 4.3-12 + 4.3-13 128m @@ -346,11 +346,11 @@ 13.2.1.jre11 23.26.1.0.0 3.3.1 - 4.0.2 + 4.1.0 20.00.00.39 ${project.version} - 3.4.2 + 3.5.0 @@ -665,7 +665,7 @@ org.glassfish.jaxb jaxb-runtime - 4.0.5 + 4.0.6 compile @@ -681,7 +681,7 @@ jakarta.xml.bind jakarta.xml.bind-api - 4.0.2 + 4.0.5 org.apache.commons @@ -2721,7 +2721,7 @@ org.codehaus.mojo extra-enforcer-rules - 1.11.0 + 1.12.0 diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveChangelogTable.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveChangelogTable.scala index 67c2e5af269e3..665a229f00c34 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveChangelogTable.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveChangelogTable.scala @@ -89,18 +89,19 @@ object ResolveChangelogTable extends Rule[LogicalPlan] { val req = evaluateRequirements(changelog, table.changelogInfo) val resolvedRel = rel.copy(table = table.copy(resolved = true)) - // Resolve rowId once against the bare DataSourceV2Relation. V2ExpressionUtils.resolveRefs - // requires a V2-shaped plan; downstream steps may wrap the relation in - // Project/Window, which would break a re-resolution there. Catalyst preserves these - // resolved attributes by ExprId through any wrapping operators. - val rowIdExprs = - V2ExpressionUtils.resolveRefs[NamedExpression](changelog.rowId().toSeq, resolvedRel) var updatedRel: LogicalPlan = resolvedRel if (req.requiresCarryOverRemoval || req.requiresUpdateDetection) { updatedRel = addRowLevelPostProcessing( resolvedRel, changelog, req.requiresCarryOverRemoval, req.requiresUpdateDetection) } if (req.requiresNetChanges) { + // Resolve rowId against the bare DataSourceV2Relation. V2ExpressionUtils.resolveRefs + // requires a V2-shaped plan; addRowLevelPostProcessing may have wrapped the relation + // in Project/Window, which would break resolution against `updatedRel`. Catalyst + // preserves these resolved attributes by ExprId through any wrapping operators, so + // they remain valid references for the netChanges Window built on top. + val rowIdExprs = + V2ExpressionUtils.resolveRefs[NamedExpression](changelog.rowId().toSeq, resolvedRel) updatedRel = injectNetChangeComputation( updatedRel, rowIdExprs, table.changelogInfo.computeUpdates()) }