diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json index 5db07c4cea527..509cacf26455d 100644 --- a/common/utils/src/main/resources/error/error-conditions.json +++ b/common/utils/src/main/resources/error/error-conditions.json @@ -2753,6 +2753,34 @@ ], "sqlState" : "22P03" }, + "INVALID_CDC_OPTION" : { + "message" : [ + "Invalid Change Data Capture (CDC) option." + ], + "sqlState" : "42K03", + "subClass" : { + "CONFLICTING_RANGE_TYPES" : { + "message" : [ + "Cannot specify both version and timestamp ranges for CDC queries. Use either startingVersion/endingVersion or startingTimestamp/endingTimestamp." + ] + }, + "INVALID_DEDUPLICATION_MODE" : { + "message" : [ + "Invalid deduplicationMode: ''. Expected one of: none, dropCarryovers, netChanges." + ] + }, + "MISSING_STARTING_TIMESTAMP" : { + "message" : [ + "startingTimestamp is required when endingTimestamp is specified for CDC queries." + ] + }, + "MISSING_STARTING_VERSION" : { + "message" : [ + "startingVersion is required when endingVersion is specified for CDC queries." + ] + } + } + }, "INVALID_CLONE_SESSION_REQUEST" : { "message" : [ "Invalid session clone request." @@ -7013,6 +7041,16 @@ "Catalog does not support ." ] }, + "CHANGE_DATA_CAPTURE" : { + "message" : [ + "Catalog does not support Change Data Capture (CDC). The catalog must declare the SUPPORT_CHANGELOG capability." + ] + }, + "CHANGE_DATA_CAPTURE_ON_RELATION" : { + "message" : [ + "Change Data Capture (CDC) on the relation: ." + ] + }, "CLAUSE_WITH_PIPE_OPERATORS" : { "message" : [ "The SQL pipe operator syntax using |> does not support ." diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/Changelog.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/Changelog.java new file mode 100644 index 0000000000000..1cedc544e4c94 --- /dev/null +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/Changelog.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.catalog; + +import org.apache.spark.annotation.Evolving; +import org.apache.spark.sql.connector.expressions.NamedReference; +import org.apache.spark.sql.connector.read.ScanBuilder; +import org.apache.spark.sql.util.CaseInsensitiveStringMap; + +/** + * The central connector interface for Change Data Capture (CDC). + *

+ * Connectors implement this minimal interface to expose change data. Spark handles + * post-processing (carry-over removal, update detection, net change computation) based on + * the properties declared by the connector. + *

+ * The columns returned by {@link #columns()} must include the following metadata columns: + *

    + *
  • {@code _change_type} (STRING) — the kind of change: {@code insert}, {@code delete}, + * {@code update_preimage}, or {@code update_postimage}
  • + *
  • {@code _commit_version} (connector-defined type, e.g. LONG) — the version containing + * this change
  • + *
  • {@code _commit_timestamp} (TIMESTAMP) — the timestamp of the commit
  • + *
+ * + * @since 4.2.0 + */ +@Evolving +public interface Changelog { + + /** A name to identify this changelog. */ + String name(); + + /** + * Returns the columns of this changelog, including data columns and the required + * metadata columns ({@code _change_type}, {@code _commit_version}, + * {@code _commit_timestamp}). + */ + Column[] columns(); + + /** + * Whether the change data may contain identical insert/delete carry-over pairs + * produced by copy-on-write file rewrites. + *

+ * If {@code false}, the connector guarantees that no carry-over pairs are present and + * Spark will skip carry-over removal entirely. + */ + boolean containsCarryoverRows(); + + /** + * Whether the change data may contain multiple intermediate states per row identity + * within a single commit version. + *

+ * If {@code false}, the connector guarantees at most one change per row identity per + * commit version, and Spark will skip net change computation. + */ + boolean containsIntermediateChanges(); + + /** + * Whether updates are represented as delete+insert pairs rather than fully + * materialized {@code update_preimage} and {@code update_postimage} entries. + *

+ * If {@code false}, the connector guarantees that update pre/post-images are already + * present in the change data. Spark will not attempt to derive updates from + * insert/delete pairs. + */ + boolean representsUpdateAsDeleteAndInsert(); + + /** + * Returns a new {@link ScanBuilder} for reading the change data. + * + * @param options read options (case-insensitive string map) + */ + ScanBuilder newScanBuilder(CaseInsensitiveStringMap options); + + /** + * Returns the columns that uniquely identify a row, used for update detection and + * net change computation. + *

+ * The default implementation returns an empty array, which means row identity is not + * available and update detection / net changes cannot be performed. + */ + default NamedReference[] rowId() { + return new NamedReference[0]; + } + + /** + * Returns the column used for ordering changes within the same row identity, used for + * update detection. + *

+ * The default implementation returns {@code null}, which means no ordering column is + * available. + */ + default NamedReference rowVersion() { + return null; + } +} diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/ChangelogInfo.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/ChangelogInfo.java new file mode 100644 index 0000000000000..0e5284c9a3e9b --- /dev/null +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/ChangelogInfo.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.catalog; + +import java.util.Objects; + +import org.apache.spark.annotation.Evolving; + +/** + * Encapsulates the parameters of a Change Data Capture (CDC) query, passed from the + * parser / DataFrame API to the catalog's + * {@link TableCatalog#loadChangelog(Identifier, ChangelogInfo)} method. + * + * @since 4.2.0 + */ +@Evolving +public class ChangelogInfo { + + /** + * Deduplication modes controlling how Spark post-processes raw change data. + */ + public enum DeduplicationMode { + /** Raw change rows as-is from the connector — no post-processing. */ + NONE, + /** Remove identical insert/delete pairs from copy-on-write file rewrites (default). */ + DROP_CARRYOVERS, + /** Collapse to one net change per row identity. */ + NET_CHANGES + } + + private final ChangelogRange range; + private final DeduplicationMode deduplicationMode; + private final boolean computeUpdates; + + public ChangelogInfo( + ChangelogRange range, + DeduplicationMode deduplicationMode, + boolean computeUpdates) { + this.range = range; + this.deduplicationMode = deduplicationMode; + this.computeUpdates = computeUpdates; + } + + /** Returns the version/timestamp range for this CDC query. */ + public ChangelogRange range() { return range; } + + /** Returns the deduplication mode for this CDC query. */ + public DeduplicationMode deduplicationMode() { return deduplicationMode; } + + /** Whether to derive update_preimage/update_postimage from insert/delete pairs. */ + public boolean computeUpdates() { return computeUpdates; } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (!(o instanceof ChangelogInfo that)) return false; + return computeUpdates == that.computeUpdates + && Objects.equals(range, that.range) + && deduplicationMode == that.deduplicationMode; + } + + @Override + public int hashCode() { + return Objects.hash(range, deduplicationMode, computeUpdates); + } + + @Override + public String toString() { + return "ChangelogInfo{range=" + range + + ", deduplicationMode=" + deduplicationMode + + ", computeUpdates=" + computeUpdates + "}"; + } +} diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/ChangelogRange.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/ChangelogRange.java new file mode 100644 index 0000000000000..8ecf03d916ac2 --- /dev/null +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/ChangelogRange.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.catalog; + +import java.util.Optional; + +import org.apache.spark.annotation.Evolving; + +/** + * Represents the version or timestamp range for a Change Data Capture (CDC) query. + *

+ * This sealed interface has three implementations: + *

    + *
  • {@link VersionRange} — range defined by version identifiers
  • + *
  • {@link TimestampRange} — range defined by timestamps
  • + *
  • {@link Unbounded} — no boundaries (used by streaming queries)
  • + *
+ * + * @since 4.2.0 + */ +@Evolving +public sealed interface ChangelogRange + permits ChangelogRange.VersionRange, ChangelogRange.TimestampRange, ChangelogRange.Unbounded { + + /** Whether the starting bound is inclusive. */ + boolean startingBoundInclusive(); + + /** Whether the ending bound is inclusive. */ + boolean endingBoundInclusive(); + + /** + * A changelog range defined by version identifiers. + * + * @param startingVersion the starting version (always present) + * @param endingVersion the ending version (empty means latest) + * @param startingBoundInclusive whether the starting bound is inclusive + * @param endingBoundInclusive whether the ending bound is inclusive + */ + record VersionRange( + String startingVersion, + Optional endingVersion, + boolean startingBoundInclusive, + boolean endingBoundInclusive) implements ChangelogRange {} + + /** + * A changelog range defined by timestamps. + * + * @param startingTimestamp the starting timestamp in microseconds since epoch + * @param endingTimestamp the ending timestamp in microseconds since epoch (empty means latest) + * @param startingBoundInclusive whether the starting bound is inclusive + * @param endingBoundInclusive whether the ending bound is inclusive + */ + record TimestampRange( + long startingTimestamp, + Optional endingTimestamp, + boolean startingBoundInclusive, + boolean endingBoundInclusive) implements ChangelogRange {} + + /** + * An unbounded changelog range with no starting or ending boundaries. + * Used by streaming queries where the connector determines the starting point. + */ + record Unbounded() implements ChangelogRange { + @Override public boolean startingBoundInclusive() { return true; } + @Override public boolean endingBoundInclusive() { return true; } + } +} diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java index 832f437ab8354..b095f1fcc4624 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java @@ -199,6 +199,27 @@ default Table loadTable(Identifier ident, long timestamp) throws NoSuchTableExce throw QueryCompilationErrors.noSuchTableError(name(), ident); } + /** + * Load a {@link Changelog} for the given table, representing the row-level changes within the + * range specified by {@code changelogInfo}. + *

+ * This method is only called when the catalog declares + * {@link TableCatalogCapability#SUPPORT_CHANGELOG} in its {@link #capabilities()} set. + * + * @param ident a table identifier + * @param changelogInfo the CDC query parameters (range, deduplication mode, etc.) + * @return a Changelog instance for the requested table and range + * @throws NoSuchTableException If the table doesn't exist + * @throws UnsupportedOperationException If the catalog does not support CDC + * + * @since 4.2.0 + */ + default Changelog loadChangelog(Identifier ident, ChangelogInfo changelogInfo) + throws NoSuchTableException { + throw new UnsupportedOperationException( + name() + " does not support Change Data Capture (CDC)"); + } + /** * Invalidate cached table metadata for an {@link Identifier identifier}. *

diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalogCapability.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalogCapability.java index a60c827d5ace1..16d7e9d88bf46 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalogCapability.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalogCapability.java @@ -92,5 +92,17 @@ public enum TableCatalogCapability { * {@link TableCatalog#createTable}. * See {@link Column#identityColumnSpec()}. */ - SUPPORTS_CREATE_TABLE_WITH_IDENTITY_COLUMNS + SUPPORTS_CREATE_TABLE_WITH_IDENTITY_COLUMNS, + + /** + * Signals that the TableCatalog supports Change Data Capture (CDC) queries via + * {@link TableCatalog#loadChangelog(Identifier, ChangelogInfo)}. + *

+ * Without this capability, any CDC query (SQL {@code CHANGES} clause or + * {@code DataFrameReader.changes()}) targeting this catalog will throw an + * analysis exception during resolution. + * + * @since 4.2.0 + */ + SUPPORT_CHANGELOG } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 12fc0f0a09fa6..add7179c48ab7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -1067,6 +1067,9 @@ class Analyzer( val timeTravelSpec = TimeTravelSpec.create(timestamp, version, conf.sessionLocalTimeZone) resolveRelation(u, timeTravelSpec).getOrElse(r) + case r @ RelationChanges(u: UnresolvedRelation, changelogInfo) => + relationResolution.resolveChangelog(u, changelogInfo).getOrElse(r) + case u @ UnresolvedTable(identifier, cmd, suggestAlternative) => lookupTableOrView(identifier).map { case v: ResolvedPersistentView => diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala index 0b6f8ec87417f..1623fa98dad6d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala @@ -412,12 +412,19 @@ object CTESubstitution extends Rule[LogicalPlan] { cteRelations: Seq[(String, CTERelationDef)], recursiveCTERelation: Option[(String, CTERelationDef)]): LogicalPlan = { plan.resolveOperatorsUpWithPruning( - _.containsAnyPattern(RELATION_TIME_TRAVEL, UNRESOLVED_RELATION, PLAN_EXPRESSION, - UNRESOLVED_IDENTIFIER, PLAN_WITH_UNRESOLVED_IDENTIFIER)) { + _.containsAnyPattern(RELATION_TIME_TRAVEL, RELATION_CHANGES, UNRESOLVED_RELATION, + PLAN_EXPRESSION, UNRESOLVED_IDENTIFIER, PLAN_WITH_UNRESOLVED_IDENTIFIER)) { case RelationTimeTravel(UnresolvedRelation(Seq(table), _, _), _, _) if cteRelations.exists(r => plan.conf.resolver(r._1, table)) => throw QueryCompilationErrors.timeTravelUnsupportedError(toSQLId(table)) + // CDC CHANGES requires a real catalog table to load a Changelog from. A CTE is an + // inline subquery definition with no versioned history, so it cannot be a CDC source. + // This mirrors the time travel restriction above. + case RelationChanges(UnresolvedRelation(Seq(table), _, _), _) + if cteRelations.exists(r => plan.conf.resolver(r._1, table)) => + throw QueryCompilationErrors.cdcUnsupportedOnRelationError(toSQLId(table)) + case u @ UnresolvedRelation(Seq(table), _, _) => resolveWithCTERelations(table, alwaysInline, cteRelations, recursiveCTERelation, u) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ChangelogInfoUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ChangelogInfoUtils.scala new file mode 100644 index 0000000000000..4115bbae57f78 --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ChangelogInfoUtils.scala @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.analysis + +import java.util.Optional + +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.expressions.{Cast, Literal} +import org.apache.spark.sql.connector.catalog.{ChangelogInfo, ChangelogRange} +import org.apache.spark.sql.types.TimestampType +import org.apache.spark.sql.util.CaseInsensitiveStringMap + +/** + * Utility methods for constructing [[ChangelogInfo]] from DataFrame API options. + */ +object ChangelogInfoUtils { + + /** + * Build a [[ChangelogInfo]] from the options specified via `.option()` calls on + * `DataFrameReader` or `DataStreamReader`. + */ + def fromOptions( + options: CaseInsensitiveStringMap, + sessionLocalTimeZone: String): ChangelogInfo = { + val startVersion = Option(options.get("startingVersion")) + val endVersion = Option(options.get("endingVersion")) + val startTimestamp = Option(options.get("startingTimestamp")) + val endTimestamp = Option(options.get("endingTimestamp")) + + val startInclusive = options.getBoolean("startingBoundInclusive", true) + val endInclusive = options.getBoolean("endingBoundInclusive", true) + + val deduplicationModeStr = Option(options.get("deduplicationMode")) + .getOrElse("dropCarryovers").toLowerCase(java.util.Locale.ROOT) + val deduplicationMode = deduplicationModeStr match { + case "none" => ChangelogInfo.DeduplicationMode.NONE + case "dropcarryovers" => ChangelogInfo.DeduplicationMode.DROP_CARRYOVERS + case "netchanges" => ChangelogInfo.DeduplicationMode.NET_CHANGES + case other => + throw new AnalysisException( + "INVALID_CDC_OPTION.INVALID_DEDUPLICATION_MODE", + Map("mode" -> other)) + } + val computeUpdates = options.getBoolean("computeUpdates", false) + + // Determine range from options + val hasVersionRange = startVersion.isDefined || endVersion.isDefined + val hasTimestampRange = startTimestamp.isDefined || endTimestamp.isDefined + + if (hasVersionRange && hasTimestampRange) { + throw new AnalysisException( + "INVALID_CDC_OPTION.CONFLICTING_RANGE_TYPES", + Map.empty[String, String]) + } + + val range = if (hasVersionRange) { + val sv = startVersion.getOrElse( + throw new AnalysisException( + "INVALID_CDC_OPTION.MISSING_STARTING_VERSION", + Map.empty[String, String])) + new ChangelogRange.VersionRange( + sv, + endVersion.map(Optional.of[String]).getOrElse(Optional.empty[String]), + startInclusive, + endInclusive) + } else if (hasTimestampRange) { + val startTsValue = startTimestamp.map(parseTimestamp(_, sessionLocalTimeZone)).getOrElse( + throw new AnalysisException( + "INVALID_CDC_OPTION.MISSING_STARTING_TIMESTAMP", + Map.empty[String, String])) + val endTsValue = endTimestamp.map(ts => + java.lang.Long.valueOf(parseTimestamp(ts, sessionLocalTimeZone))) + new ChangelogRange.TimestampRange( + startTsValue, + endTsValue.map(Optional.of[java.lang.Long]).getOrElse(Optional.empty[java.lang.Long]), + startInclusive, + endInclusive) + } else { + // No range specified — unbounded (streaming use case) + new ChangelogRange.Unbounded() + } + + new ChangelogInfo(range, deduplicationMode, computeUpdates) + } + + private def parseTimestamp(timestampStr: String, sessionLocalTimeZone: String): Long = { + val value = Cast( + Literal(timestampStr), + TimestampType, + Some(sessionLocalTimeZone), + ansiEnabled = false + ).eval() + if (value == null) { + throw new AnalysisException( + "INVALID_TIME_TRAVEL_TIMESTAMP_EXPR.OPTION", + Map("expr" -> s"'$timestampStr'")) + } + value.asInstanceOf[Long] + } +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index d6613cebb2202..94c40215ab59f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -495,6 +495,9 @@ trait CheckAnalysis extends LookupCatalog with QueryErrorsBase with PlanToString case RelationTimeTravel(u: UnresolvedRelation, _, _) => u.tableNotFound(u.multipartIdentifier) + case RelationChanges(u: UnresolvedRelation, _) => + u.tableNotFound(u.multipartIdentifier) + case etw: EventTimeWatermark => etw.eventTime.dataType match { case s: StructType diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationChanges.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationChanges.scala new file mode 100644 index 0000000000000..43f948c85348e --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationChanges.scala @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.analysis + +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.trees.TreePattern.{RELATION_CHANGES, TreePattern} +import org.apache.spark.sql.connector.catalog.ChangelogInfo + +/** + * A logical node used to query Change Data Capture (CDC) changes for a table relation. + * + * This is an unresolved node created by the parser when it encounters a `CHANGES` clause, + * or by the DataFrame API when `DataFrameReader.changes()` / `DataStreamReader.changes()` is + * called. During analysis, it is resolved by loading a `Changelog` from the catalog and wrapping + * it in a `ChangelogTable`. + * + * @param relation the underlying table relation (typically an [[UnresolvedRelation]]) + * @param changelogInfo the CDC query parameters (range, deduplication mode, etc.) + */ +case class RelationChanges( + relation: LogicalPlan, + changelogInfo: ChangelogInfo) extends UnresolvedLeafNode { + override val nodePatterns: Seq[TreePattern] = Seq(RELATION_CHANGES) +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationResolution.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationResolution.scala index 6fe386c3c6772..d8fb998c6a3ec 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationResolution.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationResolution.scala @@ -33,15 +33,17 @@ import org.apache.spark.sql.connector.catalog.{ CatalogManager, CatalogPlugin, CatalogV2Util, + ChangelogInfo, Identifier, LookupCatalog, Table, + TableCatalogCapability, V1Table, V2TableWithV1Fallback } import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ import org.apache.spark.sql.errors.{DataTypeErrorsBase, QueryCompilationErrors} -import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation +import org.apache.spark.sql.execution.datasources.v2.{ChangelogTable, DataSourceV2Relation} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.apache.spark.util.ArrayImplicits._ @@ -159,6 +161,43 @@ class RelationResolution( } } + /** + * Resolve a CDC (CHANGES) query: look up the catalog, check for SUPPORT_CHANGELOG capability, + * call loadChangelog(), wrap in ChangelogTable, and return a DataSourceV2Relation. + */ + def resolveChangelog( + u: UnresolvedRelation, + changelogInfo: ChangelogInfo): Option[LogicalPlan] = { + expandIdentifier(u.multipartIdentifier) match { + case CatalogAndIdentifier(catalog, ident) => + val tableCatalog = catalog.asTableCatalog + if (!tableCatalog.capabilities().contains(TableCatalogCapability.SUPPORT_CHANGELOG)) { + throw new AnalysisException( + errorClass = "UNSUPPORTED_FEATURE.CHANGE_DATA_CAPTURE", + messageParameters = Map("catalogName" -> tableCatalog.name())) + } + val changelog = tableCatalog.loadChangelog(ident, changelogInfo) + val changelogTable = new ChangelogTable(changelog, changelogInfo) + val relation = if (u.isStreaming) { + StreamingRelationV2( + None, + changelogTable.name(), + changelogTable, + u.options, + changelogTable.columns.toAttributes, + Some(catalog), + Some(ident), + None + ) + } else { + DataSourceV2Relation.create( + changelogTable, Some(catalog), Some(ident), u.options) + } + Some(SubqueryAlias(catalog.name +: ident.asMultipartIdentifier, relation)) + case _ => None + } + } + private def lookupSharedRelationCache( catalog: CatalogPlugin, ident: Identifier, diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TimeTravelSpec.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TimeTravelSpec.scala index 977b1624271e8..d2fe2ad602c12 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TimeTravelSpec.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TimeTravelSpec.scala @@ -36,6 +36,42 @@ case class AsOfVersion(version: String) extends TimeTravelSpec { } object TimeTravelSpec { + + /** + * Evaluate a resolved timestamp expression to microseconds since epoch. + * Shared by time travel and CDC timestamp resolution. + */ + def resolveTimestampExpression(ts: Expression, sessionLocalTimeZone: String): Long = { + assert(ts.resolved && ts.references.isEmpty) + if (!Cast.canAnsiCast(ts.dataType, TimestampType)) { + throw QueryCompilationErrors.invalidTimestampExprForTimeTravel( + "INVALID_TIME_TRAVEL_TIMESTAMP_EXPR.INPUT", ts) + } + val tsToEval = { + val fakeProject = Project(Seq(Alias(ts, "ts")()), OneRowRelation()) + ComputeCurrentTime(ReplaceExpressions(fakeProject)).asInstanceOf[Project] + .expressions.head.asInstanceOf[Alias].child + } + tsToEval.foreach { + case _: Unevaluable => + throw QueryCompilationErrors.invalidTimestampExprForTimeTravel( + "INVALID_TIME_TRAVEL_TIMESTAMP_EXPR.UNEVALUABLE", ts) + case e if !e.deterministic => + throw QueryCompilationErrors.invalidTimestampExprForTimeTravel( + "INVALID_TIME_TRAVEL_TIMESTAMP_EXPR.NON_DETERMINISTIC", ts) + case _ => + } + val tz = Some(sessionLocalTimeZone) + // Set `ansiEnabled` to false, so that it can return null for invalid input and we can provide + // better error message. + val value = Cast(tsToEval, TimestampType, tz, ansiEnabled = false).eval() + if (value == null) { + throw QueryCompilationErrors.invalidTimestampExprForTimeTravel( + "INVALID_TIME_TRAVEL_TIMESTAMP_EXPR.INPUT", ts) + } + value.asInstanceOf[Long] + } + def create( timestamp: Option[Expression], version: Option[String], @@ -44,34 +80,8 @@ object TimeTravelSpec { throw QueryCompilationErrors.invalidTimeTravelSpecError() } else if (timestamp.nonEmpty) { val ts = timestamp.get - assert(ts.resolved && ts.references.isEmpty && !SubqueryExpression.hasSubquery(ts)) - if (!Cast.canAnsiCast(ts.dataType, TimestampType)) { - throw QueryCompilationErrors.invalidTimestampExprForTimeTravel( - "INVALID_TIME_TRAVEL_TIMESTAMP_EXPR.INPUT", ts) - } - val tsToEval = { - val fakeProject = Project(Seq(Alias(ts, "ts")()), OneRowRelation()) - ComputeCurrentTime(ReplaceExpressions(fakeProject)).asInstanceOf[Project] - .expressions.head.asInstanceOf[Alias].child - } - tsToEval.foreach { - case _: Unevaluable => - throw QueryCompilationErrors.invalidTimestampExprForTimeTravel( - "INVALID_TIME_TRAVEL_TIMESTAMP_EXPR.UNEVALUABLE", ts) - case e if !e.deterministic => - throw QueryCompilationErrors.invalidTimestampExprForTimeTravel( - "INVALID_TIME_TRAVEL_TIMESTAMP_EXPR.NON_DETERMINISTIC", ts) - case _ => - } - val tz = Some(sessionLocalTimeZone) - // Set `ansiEnabled` to false, so that it can return null for invalid input and we can provide - // better error message. - val value = Cast(tsToEval, TimestampType, tz, ansiEnabled = false).eval() - if (value == null) { - throw QueryCompilationErrors.invalidTimestampExprForTimeTravel( - "INVALID_TIME_TRAVEL_TIMESTAMP_EXPR.INPUT", ts) - } - Some(AsOfTimestamp(value.asInstanceOf[Long])) + assert(!SubqueryExpression.hasSubquery(ts)) + Some(AsOfTimestamp(resolveTimestampExpression(ts, sessionLocalTimeZone))) } else if (version.nonEmpty) { Some(AsOfVersion(version.get)) } else { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala index 882c05594c054..1e22c1ce86539 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala @@ -163,6 +163,7 @@ object TreePattern extends Enumeration { val PIVOT: Value = Value val PROJECT: Value = Value val PYTHON_DATA_SOURCE: Value = Value + val RELATION_CHANGES: Value = Value val RELATION_TIME_TRAVEL: Value = Value val REPARTITION_OPERATION: Value = Value val REBALANCE_PARTITIONS: Value = Value diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala index a784276200c0d..0fa2b9008a2b8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala @@ -3729,6 +3729,12 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat messageParameters = Map("relationId" -> relationId)) } + def cdcUnsupportedOnRelationError(relationId: String): Throwable = { + new AnalysisException( + errorClass = "UNSUPPORTED_FEATURE.CHANGE_DATA_CAPTURE_ON_RELATION", + messageParameters = Map("relationId" -> relationId)) + } + def writeDistributionAndOrderingNotSupportedInContinuousExecution(): Throwable = { new AnalysisException( errorClass = "_LEGACY_ERROR_TEMP_1338", diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ChangelogTable.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ChangelogTable.scala new file mode 100644 index 0000000000000..298cca60f95d9 --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ChangelogTable.scala @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.v2 + +import java.util.{Collections, Set => JSet} + +import org.apache.spark.sql.connector.catalog.{Changelog, ChangelogInfo, Column, SupportsRead, Table, TableCapability} +import org.apache.spark.sql.connector.read.ScanBuilder +import org.apache.spark.sql.util.CaseInsensitiveStringMap + +/** + * An internal wrapper that adapts a connector's [[Changelog]] into a DSv2 [[Table]] with + * [[SupportsRead]], enabling reuse of [[DataSourceV2Relation]] without logical plan changes. + * + * This class is NOT part of the connector API. Connectors implement [[Changelog]]; Spark + * wraps it in [[ChangelogTable]] during analysis. + */ +class ChangelogTable( + val changelog: Changelog, + val changelogInfo: ChangelogInfo) extends Table with SupportsRead { + + override def name(): String = changelog.name() + + override def columns(): Array[Column] = changelog.columns() + + override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = { + changelog.newScanBuilder(options) + } + + override def capabilities(): JSet[TableCapability] = { + Collections.unmodifiableSet( + JSet.of(TableCapability.BATCH_READ, TableCapability.MICRO_BATCH_READ)) + } +} diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ChangelogInfoUtilsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ChangelogInfoUtilsSuite.scala new file mode 100644 index 0000000000000..f0d164a2a3022 --- /dev/null +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ChangelogInfoUtilsSuite.scala @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.analysis + +import scala.jdk.CollectionConverters._ + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.plans.SQLHelper +import org.apache.spark.sql.connector.catalog.{ChangelogInfo, ChangelogRange} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.util.CaseInsensitiveStringMap + +class ChangelogInfoUtilsSuite extends SparkFunSuite with SQLHelper { + + private val testTimeZone = "UTC" + + private def makeOptions(kvs: (String, String)*): CaseInsensitiveStringMap = { + new CaseInsensitiveStringMap(kvs.toMap.asJava) + } + + test("version range with both start and end") { + val info = ChangelogInfoUtils.fromOptions( + makeOptions("startingVersion" -> "1", "endingVersion" -> "5"), testTimeZone) + val range = info.range().asInstanceOf[ChangelogRange.VersionRange] + assert(range.startingVersion() == "1") + assert(range.endingVersion().get() == "5") + assert(range.startingBoundInclusive()) + assert(range.endingBoundInclusive()) + } + + test("version range with only start") { + val info = ChangelogInfoUtils.fromOptions( + makeOptions("startingVersion" -> "10"), testTimeZone) + val range = info.range().asInstanceOf[ChangelogRange.VersionRange] + assert(range.startingVersion() == "10") + assert(!range.endingVersion().isPresent) + } + + test("version range - endingVersion without startingVersion throws") { + checkError( + intercept[AnalysisException] { + ChangelogInfoUtils.fromOptions( + makeOptions("endingVersion" -> "5"), testTimeZone) + }, + condition = "INVALID_CDC_OPTION.MISSING_STARTING_VERSION") + } + + test("timestamp range with both start and end") { + val info = ChangelogInfoUtils.fromOptions( + makeOptions("startingTimestamp" -> "2026-01-01", "endingTimestamp" -> "2026-02-01"), + testTimeZone) + val range = info.range().asInstanceOf[ChangelogRange.TimestampRange] + assert(range.endingTimestamp().isPresent) + assert(range.startingBoundInclusive()) + assert(range.endingBoundInclusive()) + } + + test("timestamp range with only start") { + val info = ChangelogInfoUtils.fromOptions( + makeOptions("startingTimestamp" -> "2026-01-01"), testTimeZone) + val range = info.range().asInstanceOf[ChangelogRange.TimestampRange] + assert(!range.endingTimestamp().isPresent) + } + + test("timestamp range - endingTimestamp without startingTimestamp throws") { + checkError( + intercept[AnalysisException] { + ChangelogInfoUtils.fromOptions( + makeOptions("endingTimestamp" -> "2026-02-01"), testTimeZone) + }, + condition = "INVALID_CDC_OPTION.MISSING_STARTING_TIMESTAMP") + } + + test("cannot mix version and timestamp range") { + checkError( + intercept[AnalysisException] { + ChangelogInfoUtils.fromOptions( + makeOptions("startingVersion" -> "1", "startingTimestamp" -> "2026-01-01"), + testTimeZone) + }, + condition = "INVALID_CDC_OPTION.CONFLICTING_RANGE_TYPES") + } + + test("unbounded range when no version or timestamp specified") { + val info = ChangelogInfoUtils.fromOptions(makeOptions(), testTimeZone) + assert(info.range().isInstanceOf[ChangelogRange.Unbounded]) + } + + test("deduplication mode - none") { + val info = ChangelogInfoUtils.fromOptions( + makeOptions("deduplicationMode" -> "none"), testTimeZone) + assert(info.deduplicationMode() == ChangelogInfo.DeduplicationMode.NONE) + } + + test("deduplication mode - dropCarryovers (default)") { + val info = ChangelogInfoUtils.fromOptions(makeOptions(), testTimeZone) + assert(info.deduplicationMode() == ChangelogInfo.DeduplicationMode.DROP_CARRYOVERS) + } + + test("deduplication mode - netChanges") { + val info = ChangelogInfoUtils.fromOptions( + makeOptions("deduplicationMode" -> "netChanges"), testTimeZone) + assert(info.deduplicationMode() == ChangelogInfo.DeduplicationMode.NET_CHANGES) + } + + test("deduplication mode - case insensitive") { + val info = ChangelogInfoUtils.fromOptions( + makeOptions("deduplicationMode" -> "DROPCARRYOVERS"), testTimeZone) + assert(info.deduplicationMode() == ChangelogInfo.DeduplicationMode.DROP_CARRYOVERS) + } + + test("deduplication mode - invalid value throws") { + checkError( + intercept[AnalysisException] { + ChangelogInfoUtils.fromOptions( + makeOptions("deduplicationMode" -> "invalid"), testTimeZone) + }, + condition = "INVALID_CDC_OPTION.INVALID_DEDUPLICATION_MODE", + parameters = Map("mode" -> "invalid")) + } + + test("computeUpdates option") { + val info = ChangelogInfoUtils.fromOptions( + makeOptions("computeUpdates" -> "true"), testTimeZone) + assert(info.computeUpdates()) + } + + test("computeUpdates defaults to false") { + val info = ChangelogInfoUtils.fromOptions(makeOptions(), testTimeZone) + assert(!info.computeUpdates()) + } + + test("bound inclusivity options") { + val info = ChangelogInfoUtils.fromOptions( + makeOptions( + "startingVersion" -> "1", + "endingVersion" -> "5", + "startingBoundInclusive" -> "false", + "endingBoundInclusive" -> "false"), + testTimeZone) + val range = info.range().asInstanceOf[ChangelogRange.VersionRange] + assert(!range.startingBoundInclusive()) + assert(!range.endingBoundInclusive()) + } + + test("invalid timestamp throws") { + checkError( + intercept[AnalysisException] { + ChangelogInfoUtils.fromOptions( + makeOptions("startingTimestamp" -> "not-a-timestamp"), testTimeZone) + }, + condition = "INVALID_TIME_TRAVEL_TIMESTAMP_EXPR.OPTION", + parameters = Map("expr" -> "'not-a-timestamp'")) + } + + test("timestamp range respects session time zone") { + val tsStr = "2026-01-01 00:00:00" + // 2026-01-01 00:00:00 UTC in microseconds since epoch + val expectedUtcMicros = 1767225600000000L + // 2026-01-01 00:00:00 PST (UTC-8) in microseconds since epoch + // = 2026-01-01 08:00:00 UTC = expectedUtcMicros + 8h + val expectedPstMicros = 1767254400000000L + + val utcInfo = ChangelogInfoUtils.fromOptions( + makeOptions("startingTimestamp" -> tsStr), "UTC") + val utcRange = + utcInfo.range().asInstanceOf[ChangelogRange.TimestampRange] + assert(utcRange.startingTimestamp() === expectedUtcMicros) + + withSQLConf( + SQLConf.SESSION_LOCAL_TIMEZONE.key -> "America/Los_Angeles") { + val laInfo = ChangelogInfoUtils.fromOptions( + makeOptions("startingTimestamp" -> tsStr), + SQLConf.get.sessionLocalTimeZone) + val laRange = + laInfo.range().asInstanceOf[ChangelogRange.TimestampRange] + assert(laRange.startingTimestamp() === expectedPstMicros) + } + } + +} diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryChangelogCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryChangelogCatalog.scala new file mode 100644 index 0000000000000..46d215029865a --- /dev/null +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryChangelogCatalog.scala @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.catalog + +import scala.collection.mutable + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.analysis.NoSuchTableException +import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ +import org.apache.spark.sql.connector.read._ +import org.apache.spark.sql.types._ +import org.apache.spark.sql.util.CaseInsensitiveStringMap + +/** + * An [[InMemoryTableCatalog]] that declares [[TableCatalogCapability.SUPPORT_CHANGELOG]] + * and implements [[TableCatalog.loadChangelog()]]. + * + * Change rows can be pre-populated via [[addChangeRows()]] before querying. + */ +class InMemoryChangelogCatalog extends InMemoryTableCatalog { + + // tableName -> list of change rows (each row: Array[Any] matching changelog schema) + private val changeData: mutable.Map[String, mutable.ArrayBuffer[InternalRow]] = + mutable.Map.empty + + override def capabilities: java.util.Set[TableCatalogCapability] = { + val caps = new java.util.HashSet(super.capabilities) + caps.add(TableCatalogCapability.SUPPORT_CHANGELOG) + caps + } + + override def loadChangelog( + ident: Identifier, + changelogInfo: ChangelogInfo): Changelog = { + if (!tableExists(ident)) { + throw new NoSuchTableException(ident.asMultipartIdentifier) + } + val table = loadTable(ident) + val rows = changeData.getOrElse( + ident.toString, mutable.ArrayBuffer.empty) + new InMemoryChangelog( + table.name + "_changelog", table.columns, rows.toSeq) + } + + /** + * Add change rows for a table. Each row should match the changelog schema: + * (data columns..., _change_type STRING, _commit_version LONG, _commit_timestamp LONG). + */ + def addChangeRows(ident: Identifier, rows: Seq[InternalRow]): Unit = { + val buf = changeData.getOrElseUpdate( + ident.toString, mutable.ArrayBuffer.empty) + buf ++= rows + } + + def clearChangeRows(ident: Identifier): Unit = { + changeData.remove(ident.toString) + } +} + +/** + * A test [[Changelog]] that returns pre-populated change rows. + * + * Reports `containsCarryoverRows = false` so Spark skips carry-over removal. + */ +class InMemoryChangelog( + tableName: String, + dataColumns: Array[Column], + changeRows: Seq[InternalRow]) extends Changelog { + + private val cdcColumns: Array[Column] = dataColumns ++ Array( + Column.create("_change_type", StringType), + Column.create("_commit_version", LongType), + Column.create("_commit_timestamp", TimestampType)) + + override def name(): String = tableName + + override def columns(): Array[Column] = cdcColumns + + override def containsCarryoverRows(): Boolean = false + + override def containsIntermediateChanges(): Boolean = false + + override def representsUpdateAsDeleteAndInsert(): Boolean = false + + override def newScanBuilder( + options: CaseInsensitiveStringMap): ScanBuilder = { + new InMemoryChangelogScanBuilder(readSchema, changeRows) + } + + def readSchema: StructType = { + CatalogV2Util.v2ColumnsToStructType(cdcColumns) + } +} + +private class InMemoryChangelogScanBuilder( + schema: StructType, + rows: Seq[InternalRow]) extends ScanBuilder { + override def build(): Scan = + new InMemoryChangelogScan(schema, rows) +} + +private class InMemoryChangelogScan( + schema: StructType, + rows: Seq[InternalRow]) extends Scan with Batch { + + override def readSchema(): StructType = schema + + override def toBatch: Batch = this + + override def planInputPartitions(): Array[InputPartition] = { + Array(InMemoryChangelogPartition(rows)) + } + + override def createReaderFactory(): PartitionReaderFactory = { + new InMemoryChangelogReaderFactory() + } +} + +private case class InMemoryChangelogPartition( + rows: Seq[InternalRow]) extends InputPartition + +private class InMemoryChangelogReaderFactory + extends PartitionReaderFactory { + override def createReader( + partition: InputPartition): PartitionReader[InternalRow] = { + new InMemoryChangelogReader( + partition.asInstanceOf[InMemoryChangelogPartition]) + } +} + +private class InMemoryChangelogReader( + partition: InMemoryChangelogPartition) + extends PartitionReader[InternalRow] { + + private var index = -1 + private val rows = partition.rows + + override def next(): Boolean = { + index += 1 + index < rows.size + } + + override def get(): InternalRow = rows(index) + + override def close(): Unit = {} +}