Skip to content

[SPARK-55948][SQL] Add DSv2 CDC connector API and analyzer resolution#54737

Closed
gengliangwang wants to merge 1 commit into
apache:masterfrom
gengliangwang:cdc-dsv2-api
Closed

[SPARK-55948][SQL] Add DSv2 CDC connector API and analyzer resolution#54737
gengliangwang wants to merge 1 commit into
apache:masterfrom
gengliangwang:cdc-dsv2-api

Conversation

@gengliangwang

@gengliangwang gengliangwang commented Mar 10, 2026

Copy link
Copy Markdown
Member

What changes were proposed in this pull request?

This is PR 1/3 of the CDC (Change Data Capture) framework for Spark. It introduces the foundational DSv2 connector API and analyzer resolution logic.

Connector API (new Java interfaces):

  • Changelog: The central connector interface for CDC. Connectors implement this to expose row-level change data with metadata columns (_change_type, _commit_version, _commit_timestamp). Connectors declare properties (containsCarryoverRows, containsIntermediateChanges, representsUpdateAsDeleteAndInsert) that tell Spark what post-processing is needed.
  • ChangelogInfo: Encapsulates CDC query parameters — range, deduplication mode (NONE, DROP_CARRYOVERS, NET_CHANGES), and computeUpdates flag.
  • ChangelogRange: Sealed interface with three record types — VersionRange, TimestampRange, and Unbounded.
  • TableCatalog.loadChangelog(Identifier, ChangelogInfo): Entry point for catalogs to provide a Changelog.
  • TableCatalogCapability.SUPPORT_CHANGELOG: Catalog capability flag that gates CDC support.

Analyzer resolution:

  • RelationChanges: Unresolved logical node representing a CDC query.
  • ChangelogTable: DSv2 Table wrapper around a Changelog, declaring BATCH_READ and MICRO_BATCH_READ capabilities.
  • Resolution rule in Analyzer: resolves RelationChanges -> looks up catalog -> calls loadChangelog() -> wraps in ChangelogTable -> creates DataSourceV2Relation (batch) or StreamingRelationV2 (streaming).
  • CheckAnalysis: Reports TABLE_OR_VIEW_NOT_FOUND for unresolved RelationChanges.
  • CTESubstitution: Blocks CDC queries on CTE relations (mirrors time travel restriction).

Utilities:

  • ChangelogInfoUtils: Parses DataFrame API options into a ChangelogInfo.
  • TimeTravelSpec.resolveTimestampExpression(): Extracted shared method for timestamp evaluation, deduplicating logic between time travel and CDC.

Test infrastructure:

  • InMemoryChangelogCatalog: Test catalog with pre-populated change rows and a working scan pipeline. Reports containsCarryoverRows = false.

Why are the changes needed?

Spark currently has no standardized framework for Change Data Capture (CDC) queries via DSv2 connectors. This PR establishes the connector API contract and analyzer resolution so that connectors can expose row-level change data in a uniform way, and Spark can handle post-processing (carry-over removal, update detection, net change computation) based on connector-declared properties.

Does this PR introduce any user-facing change?

No user-facing change in this PR alone. The connector API and analyzer resolution are internal building blocks. User-facing SQL syntax is added in PR 2 (#54738), and the DataFrame API in PR 3 (#54739).

How was this patch tested?

  • ChangelogInfoUtilsSuite — 18 unit tests covering version/timestamp range parsing, deduplication modes, bound inclusivity, timezone handling, and error cases.

Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Code (Claude Opus 4.6)

Introduce the foundational Change Data Capture (CDC) framework for DSv2:

- Changelog interface: connector contract for exposing change data
- ChangelogInfo/ChangelogRange: CDC query parameters (version/timestamp range,
  deduplication mode, compute updates)
- TableCatalog.loadChangelog() and SUPPORT_CHANGELOG capability
- ChangelogTable: DSv2 Table wrapper for Changelog
- RelationChanges: unresolved logical node for CDC queries
- Analyzer resolution: RelationChanges -> ChangelogTable -> DataSourceV2Relation
- ChangelogInfoUtils: utility for parsing CDC options
- TimeTravelSpec: extract shared resolveTimestampExpression method
- Error conditions for CDC validation
- InMemoryChangelogCatalog: test-only implementation with scan support
- ChangelogInfoUtilsSuite: unit tests

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@gengliangwang

Copy link
Copy Markdown
Member Author

Merged into #54738 — combining the DSv2 API and SQL support into a single PR.

@gengliangwang gengliangwang changed the title [SPARK-XXXXX][SQL] Add DSv2 CDC connector API and analyzer resolution [SPARK-55948][SQL] Add DSv2 CDC connector API and analyzer resolution Mar 11, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant