Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions common/utils/src/main/resources/error/error-conditions.json
Original file line number Diff line number Diff line change
Expand Up @@ -2753,6 +2753,34 @@
],
"sqlState" : "22P03"
},
"INVALID_CDC_OPTION" : {
"message" : [
"Invalid Change Data Capture (CDC) option."
],
"sqlState" : "42K03",
"subClass" : {
"CONFLICTING_RANGE_TYPES" : {
"message" : [
"Cannot specify both version and timestamp ranges for CDC queries. Use either startingVersion/endingVersion or startingTimestamp/endingTimestamp."
]
},
"INVALID_DEDUPLICATION_MODE" : {
"message" : [
"Invalid deduplicationMode: '<mode>'. Expected one of: none, dropCarryovers, netChanges."
]
},
"MISSING_STARTING_TIMESTAMP" : {
"message" : [
"startingTimestamp is required when endingTimestamp is specified for CDC queries."
]
},
"MISSING_STARTING_VERSION" : {
"message" : [
"startingVersion is required when endingVersion is specified for CDC queries."
]
}
}
},
"INVALID_CLONE_SESSION_REQUEST" : {
"message" : [
"Invalid session clone request."
Expand Down Expand Up @@ -7013,6 +7041,16 @@
"Catalog <catalogName> does not support <operation>."
]
},
"CHANGE_DATA_CAPTURE" : {
"message" : [
"Catalog <catalogName> does not support Change Data Capture (CDC). The catalog must declare the SUPPORT_CHANGELOG capability."
]
},
"CHANGE_DATA_CAPTURE_ON_RELATION" : {
"message" : [
"Change Data Capture (CDC) on the relation: <relationId>."
]
},
"CLAUSE_WITH_PIPE_OPERATORS" : {
"message" : [
"The SQL pipe operator syntax using |> does not support <clauses>."
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.connector.catalog;

import org.apache.spark.annotation.Evolving;
import org.apache.spark.sql.connector.expressions.NamedReference;
import org.apache.spark.sql.connector.read.ScanBuilder;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;

/**
 * Connector-facing entry point for Change Data Capture (CDC).
 * <p>
 * A connector implements this small interface to surface its change data. Guided by the
 * properties the connector declares here, Spark performs the post-processing itself:
 * carry-over removal, update detection and net change computation.
 * <p>
 * Alongside the data columns, {@link #columns()} must expose these metadata columns:
 * <ul>
 *   <li>{@code _change_type} (STRING) — the kind of change, one of {@code insert},
 *       {@code delete}, {@code update_preimage} or {@code update_postimage}</li>
 *   <li>{@code _commit_version} (connector-defined type, e.g. LONG) — the version that
 *       contains the change</li>
 *   <li>{@code _commit_timestamp} (TIMESTAMP) — the timestamp of the commit</li>
 * </ul>
 *
 * @since 4.2.0
 */
@Evolving
public interface Changelog {

  /**
   * Returns the name that identifies this changelog.
   */
  String name();

  /**
   * Returns every column of this changelog: the data columns plus the mandatory metadata
   * columns {@code _change_type}, {@code _commit_version} and {@code _commit_timestamp}.
   */
  Column[] columns();

  /**
   * Reports whether the change data may include identical insert/delete carry-over pairs
   * produced by copy-on-write file rewrites.
   * <p>
   * Returning {@code false} is a guarantee from the connector that no such pairs exist, in
   * which case Spark skips carry-over removal entirely.
   */
  boolean containsCarryoverRows();

  /**
   * Reports whether the change data may hold several intermediate states for the same row
   * identity within a single commit version.
   * <p>
   * Returning {@code false} is a guarantee from the connector that each row identity
   * changes at most once per commit version, in which case Spark skips net change
   * computation.
   */
  boolean containsIntermediateChanges();

  /**
   * Reports whether an update shows up as a delete+insert pair instead of fully
   * materialized {@code update_preimage} and {@code update_postimage} entries.
   * <p>
   * Returning {@code false} is a guarantee from the connector that the pre/post-images of
   * updates are already part of the change data; Spark then makes no attempt to derive
   * updates from insert/delete pairs.
   */
  boolean representsUpdateAsDeleteAndInsert();

  /**
   * Creates a fresh {@link ScanBuilder} for reading the change data.
   *
   * @param options read options (case-insensitive string map)
   * @return a new scan builder over the change data
   */
  ScanBuilder newScanBuilder(CaseInsensitiveStringMap options);

  /**
   * Returns the columns that together uniquely identify a row. They drive update detection
   * and net change computation.
   * <p>
   * By default an empty array is returned, signalling that row identity is unavailable and
   * that neither update detection nor net changes can be performed.
   */
  default NamedReference[] rowId() {
    return new NamedReference[] {};
  }

  /**
   * Returns the column that orders changes belonging to the same row identity. It is used
   * for update detection.
   * <p>
   * By default {@code null} is returned, signalling that no ordering column is available.
   */
  default NamedReference rowVersion() {
    return null;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.connector.catalog;

import java.util.Objects;

import org.apache.spark.annotation.Evolving;

/**
 * Encapsulates the parameters of a Change Data Capture (CDC) query, passed from the
 * parser / DataFrame API to the catalog's
 * {@link TableCatalog#loadChangelog(Identifier, ChangelogInfo)} method.
 * <p>
 * Instances are immutable. Neither the range nor the deduplication mode may be
 * {@code null}: an absent range is expressed with {@link ChangelogRange.Unbounded} and
 * "no post-processing" with {@link DeduplicationMode#NONE}.
 *
 * @since 4.2.0
 */
@Evolving
public class ChangelogInfo {

  /**
   * Deduplication modes controlling how Spark post-processes raw change data.
   */
  public enum DeduplicationMode {
    /** Raw change rows as-is from the connector — no post-processing. */
    NONE,
    /** Remove identical insert/delete pairs from copy-on-write file rewrites (default). */
    DROP_CARRYOVERS,
    /** Collapse to one net change per row identity. */
    NET_CHANGES
  }

  private final ChangelogRange range;
  private final DeduplicationMode deduplicationMode;
  private final boolean computeUpdates;

  /**
   * Creates the parameter bundle for a CDC query.
   *
   * @param range the version/timestamp range of the query; must not be {@code null}
   * @param deduplicationMode how Spark post-processes the raw change data; must not be
   *                          {@code null}
   * @param computeUpdates whether to derive update_preimage/update_postimage from
   *                       insert/delete pairs
   * @throws NullPointerException if {@code range} or {@code deduplicationMode} is
   *                              {@code null}
   */
  public ChangelogInfo(
      ChangelogRange range,
      DeduplicationMode deduplicationMode,
      boolean computeUpdates) {
    // Fail fast here: a null would otherwise only surface later, inside a connector that
    // pattern-matches on the range or switches on the mode.
    this.range = Objects.requireNonNull(range, "range");
    this.deduplicationMode = Objects.requireNonNull(deduplicationMode, "deduplicationMode");
    this.computeUpdates = computeUpdates;
  }

  /** Returns the version/timestamp range for this CDC query (never {@code null}). */
  public ChangelogRange range() { return range; }

  /** Returns the deduplication mode for this CDC query (never {@code null}). */
  public DeduplicationMode deduplicationMode() { return deduplicationMode; }

  /** Whether to derive update_preimage/update_postimage from insert/delete pairs. */
  public boolean computeUpdates() { return computeUpdates; }

  /**
   * Two instances are equal when their range, deduplication mode and
   * {@code computeUpdates} flag are all equal.
   */
  @Override
  public boolean equals(Object o) {
    if (this == o) return true;
    if (!(o instanceof ChangelogInfo that)) return false;
    return computeUpdates == that.computeUpdates
        && Objects.equals(range, that.range)
        && deduplicationMode == that.deduplicationMode;
  }

  @Override
  public int hashCode() {
    return Objects.hash(range, deduplicationMode, computeUpdates);
  }

  @Override
  public String toString() {
    return "ChangelogInfo{range=" + range +
        ", deduplicationMode=" + deduplicationMode +
        ", computeUpdates=" + computeUpdates + "}";
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.connector.catalog;

import java.util.Optional;

import org.apache.spark.annotation.Evolving;

/**
 * Describes which slice of a table's history a Change Data Capture (CDC) query covers,
 * expressed either by versions or by timestamps.
 * <p>
 * The hierarchy is sealed and consists of exactly three implementations:
 * <ul>
 *   <li>{@link VersionRange} — bounds given as version identifiers</li>
 *   <li>{@link TimestampRange} — bounds given as timestamps</li>
 *   <li>{@link Unbounded} — no bounds at all (used by streaming queries)</li>
 * </ul>
 *
 * @since 4.2.0
 */
@Evolving
public sealed interface ChangelogRange
    permits ChangelogRange.VersionRange,
            ChangelogRange.TimestampRange,
            ChangelogRange.Unbounded {

  /**
   * Returns {@code true} when the starting bound itself belongs to the range.
   */
  boolean startingBoundInclusive();

  /**
   * Returns {@code true} when the ending bound itself belongs to the range.
   */
  boolean endingBoundInclusive();

  /**
   * A range whose bounds are version identifiers.
   *
   * @param startingVersion the starting version (always present)
   * @param endingVersion the ending version (empty means latest)
   * @param startingBoundInclusive whether the starting bound is inclusive
   * @param endingBoundInclusive whether the ending bound is inclusive
   */
  record VersionRange(
      String startingVersion,
      Optional<String> endingVersion,
      boolean startingBoundInclusive,
      boolean endingBoundInclusive)
      implements ChangelogRange {
  }

  /**
   * A range whose bounds are timestamps.
   *
   * @param startingTimestamp the starting timestamp in microseconds since epoch
   * @param endingTimestamp the ending timestamp in microseconds since epoch
   *                        (empty means latest)
   * @param startingBoundInclusive whether the starting bound is inclusive
   * @param endingBoundInclusive whether the ending bound is inclusive
   */
  record TimestampRange(
      long startingTimestamp,
      Optional<Long> endingTimestamp,
      boolean startingBoundInclusive,
      boolean endingBoundInclusive)
      implements ChangelogRange {
  }

  /**
   * A range without a starting or an ending boundary. Streaming queries use it, leaving the
   * choice of the starting point to the connector. Both bounds are reported as inclusive.
   */
  record Unbounded() implements ChangelogRange {

    @Override
    public boolean startingBoundInclusive() {
      return true;
    }

    @Override
    public boolean endingBoundInclusive() {
      return true;
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,27 @@ default Table loadTable(Identifier ident, long timestamp) throws NoSuchTableExce
throw QueryCompilationErrors.noSuchTableError(name(), ident);
}

/**
 * Loads the {@link Changelog} of a table, i.e. the row-level changes that fall inside the
 * range described by {@code changelogInfo}.
 * <p>
 * Spark invokes this method only for catalogs whose {@link #capabilities()} set contains
 * {@link TableCatalogCapability#SUPPORT_CHANGELOG}. This default implementation always
 * throws {@link UnsupportedOperationException}.
 *
 * @param ident a table identifier
 * @param changelogInfo the CDC query parameters (range, deduplication mode, etc.)
 * @return a Changelog instance for the requested table and range
 * @throws NoSuchTableException If the table doesn't exist
 * @throws UnsupportedOperationException If the catalog does not support CDC
 *
 * @since 4.2.0
 */
default Changelog loadChangelog(Identifier ident, ChangelogInfo changelogInfo)
    throws NoSuchTableException {
  // The catalog name leads the message so the failing catalog is identifiable.
  String unsupportedMessage = name() + " does not support Change Data Capture (CDC)";
  throw new UnsupportedOperationException(unsupportedMessage);
}

/**
* Invalidate cached table metadata for an {@link Identifier identifier}.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,5 +92,17 @@ public enum TableCatalogCapability {
* {@link TableCatalog#createTable}.
* See {@link Column#identityColumnSpec()}.
*/
SUPPORTS_CREATE_TABLE_WITH_IDENTITY_COLUMNS
SUPPORTS_CREATE_TABLE_WITH_IDENTITY_COLUMNS,

/**
* Signals that the TableCatalog supports Change Data Capture (CDC) queries via
* {@link TableCatalog#loadChangelog(Identifier, ChangelogInfo)}.
* <p>
* Without this capability, any CDC query (SQL {@code CHANGES} clause or
* {@code DataFrameReader.changes()}) targeting this catalog will throw an
* analysis exception during resolution.
*
* @since 4.2.0
*/
SUPPORT_CHANGELOG
}
Original file line number Diff line number Diff line change
Expand Up @@ -1067,6 +1067,9 @@ class Analyzer(
val timeTravelSpec = TimeTravelSpec.create(timestamp, version, conf.sessionLocalTimeZone)
resolveRelation(u, timeTravelSpec).getOrElse(r)

case r @ RelationChanges(u: UnresolvedRelation, changelogInfo) =>
relationResolution.resolveChangelog(u, changelogInfo).getOrElse(r)

case u @ UnresolvedTable(identifier, cmd, suggestAlternative) =>
lookupTableOrView(identifier).map {
case v: ResolvedPersistentView =>
Expand Down
Loading