Skip to content

[SPARK-55950][PYTHON][CONNECT] Add PySpark support for CDC changes() API#54746

Closed
gengliangwang wants to merge 10 commits into
apache:masterfrom
gengliangwang:cdc-pyspark
Closed

[SPARK-55950][PYTHON][CONNECT] Add PySpark support for CDC changes() API#54746
gengliangwang wants to merge 10 commits into
apache:masterfrom
gengliangwang:cdc-pyspark

Conversation

@gengliangwang

@gengliangwang gengliangwang commented Mar 10, 2026

Copy link
Copy Markdown
Member

What changes were proposed in this pull request?

Add changes() method to PySpark DataFrameReader and DataStreamReader for both classic and Spark Connect modes.

Classic PySpark:

  • DataFrameReader.changes(tableName) — delegates to self._jreader.changes(tableName)
  • DataStreamReader.changes(tableName) — delegates to self._jreader.changes(tableName) with type checking

Spark Connect PySpark:

  • New RelationChanges plan class in plan.py that serializes to the RelationChanges protobuf message
  • DataFrameReader.changes(tableName) — creates RelationChanges plan (batch)
  • DataStreamReader.changes(tableName) — creates RelationChanges plan with is_streaming=True

Why are the changes needed?

To expose the CDC changes() API added in #54739 to Python users.

Does this PR introduce any user-facing change?

Yes. PySpark users can now use:

# Batch
df = spark.read.option("startingVersion", "1").changes("my_table")

# Streaming
df = spark.readStream.option("startingVersion", "1").changes("my_table")

How was this patch tested?

7 plan generation tests in test_connect_plan.py covering:

  • Batch read with version/timestamp options
  • No-options and multi-part table names
  • Proto oneof discriminator verification
  • Streaming via direct plan and via DataStreamReader
  • print() debug output

Was this patch authored or co-authored using generative AI tooling?

Yes.

@gengliangwang gengliangwang marked this pull request as draft March 10, 2026 22:40
gengliangwang and others added 2 commits March 19, 2026 21:19
Add changes() method to PySpark DataFrameReader and DataStreamReader
for both classic and Spark Connect modes.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@gengliangwang gengliangwang marked this pull request as ready for review March 20, 2026 05:40
Comment thread python/pyspark/sql/tests/test_cdc.py Outdated
@@ -0,0 +1,610 @@
#

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

new test files should be added in modules.py

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated

return conf

def _jvm(self):
return PySparkSession._instantiatedSession._jvm

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how could this work in spark connect?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added comment explaining why JVM access is needed and how it works through the underlying classic session

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you need both connect session and classic session, you may want to use ReusedMixedTestCase

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@zhengruifeng thanks, updated

Comment thread python/pyspark/sql/tests/test_cdc.py Outdated
def _make_change_row(self, id, data, change_type, commit_version, commit_timestamp):
jvm = self._jvm()
UTF8String = jvm.org.apache.spark.unsafe.types.UTF8String
row = jvm.org.apache.spark.sql.catalyst.expressions.GenericInternalRow(5)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it seems the tests are heavily relying on the JVM side methods in InMemoryChangelogCatalog?

is it possible to just add basic E2E tests for the new changes API? I don't know.

@cloud-fan @HyukjinKwon do you have any ideas?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just update the PR to make the testing light. We can't end-to-end test without the InMemoryChangelogCatalog

@zhengruifeng zhengruifeng changed the title [SPARK-55950][PYTHON] Add PySpark support for CDC changes() API [SPARK-55950][PYTHON][CONNECT] Add PySpark support for CDC changes() API Mar 26, 2026
@zhengruifeng

Copy link
Copy Markdown
Contributor

merged to master

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants