
[SPARK-56651][CONNECT][SDP] Add Python APIs for Auto CDC SCD Type 1#56069

Closed
AnishMahto wants to merge 10 commits into apache:master from AnishMahto:SPARK-56651-autocdc-python-api

@AnishMahto AnishMahto commented May 22, 2026

Takeover of #56045. PR description is copied.

What changes were proposed in this pull request?

Adds create_auto_cdc_flow to the SDP Python API. For now, this only supports SCD Type 1. Parameters:

  • name: the name of the flow
  • target: the target table
  • source: the source dataset with the change events
  • keys: the unique key per row
  • sequence_by: a sequence id to establish time order
  • apply_as_deletes: a boolean expression indicating whether an event represents a delete
  • apply_as_truncates: a boolean expression indicating whether an event represents a truncation (removed from the Python API in this PR until execution support lands)
  • column_list: a list of columns to include in the target table
  • except_column_list: a list of columns to exclude from the target table
  • stored_as_scd_type: the SCD type, must be 1
  • ignore_null_updates_column_list: a list of columns for which to ignore null values (removed from the Python API in this PR until execution support lands)
  • ignore_null_updates_except_column_list: a list of columns for which not to ignore null values (removed from the Python API in this PR until execution support lands)
  • source_code_location: the location in the Python source code that defines this flow
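
To make the parameters above concrete, here is a minimal pure-Python sketch of what SCD Type 1 means for keys, sequence_by, apply_as_deletes, and except_column_list: each key holds only its latest state, and a delete removes the row outright. It is not Spark's implementation. The function `apply_scd_type_1`, the dictionary-based rows, and the rule that events with an older sequence value are skipped are all assumptions made for illustration.

```python
from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple

Row = Dict[str, Any]
Key = Tuple[Any, ...]


def apply_scd_type_1(
    events: Iterable[Row],
    keys: List[str],
    sequence_by: str,
    apply_as_deletes: Optional[Callable[[Row], bool]] = None,
    except_column_list: Optional[List[str]] = None,
) -> Dict[Key, Row]:
    """Fold change events into a keyed table, keeping only the latest state per key."""
    excluded = set(except_column_list or [])
    target: Dict[Key, Row] = {}
    latest_seq: Dict[Key, Any] = {}

    for event in events:
        key = tuple(event[k] for k in keys)
        seq = event[sequence_by]
        # Assumption: an event older than the last applied one for this key is skipped.
        if key in latest_seq and seq <= latest_seq[key]:
            continue
        latest_seq[key] = seq
        if apply_as_deletes is not None and apply_as_deletes(event):
            # SCD Type 1 keeps no history, so a delete simply drops the row.
            target.pop(key, None)
        else:
            # Overwrite in place, leaving out the excluded columns.
            target[key] = {c: v for c, v in event.items() if c not in excluded}
    return target


events = [
    {"id": 1, "name": "a", "ts": 1, "op": "upsert"},
    {"id": 2, "name": "b", "ts": 1, "op": "upsert"},
    {"id": 1, "name": "a2", "ts": 3, "op": "upsert"},
    {"id": 1, "name": "stale", "ts": 2, "op": "upsert"},  # arrives late, skipped
    {"id": 2, "name": "b", "ts": 4, "op": "delete"},
]

table = apply_scd_type_1(
    events,
    keys=["id"],
    sequence_by="ts",
    apply_as_deletes=lambda e: e["op"] == "delete",
    except_column_list=["op", "ts"],
)
print(table)  # {(1,): {'id': 1, 'name': 'a2'}}
```

Only the row for id 1 survives, carrying its most recent name; the row for id 2 is gone because its latest event is a delete.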

This PR introduces the PySpark API to register an AutoCDC flow within an SDP, and send the registration requests to the Spark driver via Spark Connect protos.

This PR does not handle these Spark Connect protos on the server side. Until server support lands, the pipelines handler in the Spark driver rejects them with an unsupported-operation error.

Why are the changes needed?

See the SPIP at https://docs.google.com/document/d/1Hp5BGEYJRHbk6J7XUph3bAPZKRQXKOuV1PEaqZMMRoQ/

Does this PR introduce any user-facing change?

Yes, it introduces a new method in the SDP Python API.

How was this patch tested?

Unit tests were added, using a local graph registry.

Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Sonnet 4.6

@szehon-ho szehon-ho left a comment

Thanks for taking over #56045 and scoping the API sensibly (dropping truncates / ignore-null updates until execution exists).

A few inline comments below — mainly around default flow_name on the Connect path, client-side validation consistency with other SDP APIs, and test/doc nits.

Note (not inline): PipelinesHandler still throws UnsupportedOperationException for AutoCdcFlowDetails, so Connect definition will fail until server support lands. Worth calling that out in the PR description so users know this PR is Python API + proto wiring only.

Comment thread python/pyspark/pipelines/spark_connect_graph_element_registry.py Outdated
Comment thread python/pyspark/pipelines/api.py Outdated
:param name: The name of the flow for this create_auto_cdc_flow command. When unspecified this \
will build a "default flow" with name equal to the target name.
"""
keys = _normalize_column_list(keys)

Other SDP APIs validate inputs up front (create_streaming_table checks type(name) is not str, etc.). Consider similar checks here before building AutoCdcFlow:

  • target / source: str (same pattern as name in create_streaming_table)
  • keys: non-empty list; reject mixed [str, Column] if you want to match Union[List[str], List[Column]]
  • column_list vs except_column_list: error if both are set (doc already says only one is allowed)
  • _normalize_column_list: reject elements that are neither str nor Column with PySparkTypeError instead of passing them through

Happy to keep this minimal for the first cut, but matching existing SDP validation would give clearer errors at definition time.
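
As a rough illustration of the up-front type checks suggested above, the following sketch rejects non-string names and list elements that are neither str nor Column. It runs without Spark, so `Column` here is a hypothetical stand-in class, `require_str` and `normalize_column_list` are illustrative names rather than the PR's actual helpers, and the built-in `TypeError` stands in for `PySparkTypeError`.

```python
from typing import Any, List


class Column:
    """Hypothetical stand-in for pyspark.sql.Column so the sketch runs without Spark."""

    def __init__(self, name: str) -> None:
        self.name = name


def require_str(value: Any, arg_name: str) -> str:
    # Same shape as the `type(name) is not str` check mentioned in the review.
    if type(value) is not str:
        raise TypeError(f"{arg_name} must be a str, got {type(value).__name__}")
    return value


def normalize_column_list(cols: Any, arg_name: str) -> List[Column]:
    """Convert strings to Column objects and reject any other element type."""
    if not isinstance(cols, list):
        raise TypeError(f"{arg_name} must be a list, got {type(cols).__name__}")
    normalized: List[Column] = []
    for item in cols:
        if isinstance(item, str):
            normalized.append(Column(item))
        elif isinstance(item, Column):
            normalized.append(item)
        else:
            # Fail at definition time instead of passing the value through.
            raise TypeError(
                f"{arg_name} elements must be str or Column, got {type(item).__name__}"
            )
    return normalized


target = require_str("customers", "target")
keys = normalize_column_list(["id", Column("region")], "keys")
print(target, [c.name for c in keys])  # customers ['id', 'region']
```

Failing here, at definition time, gives the user an error that names the offending argument before any request is sent to the server.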

AnishMahto (PR author) replied:

For consistency with other SDP APIs, I added type checks. These make sense at the Python API layer, since Python does not provide strong static typing.

I am leaving logical validations to the Spark driver/pipelines handler, though, since these validations are independent of the client language (e.g., if we support other language clients in the future, the validation should be the same).

@AnishMahto AnishMahto changed the title from "Spark 56651 autocdc python api" to "[SPARK-56651][CONNECT][SDP] Add Python APIs for Auto CDC SCD Type 1" May 24, 2026
@AnishMahto AnishMahto requested a review from szehon-ho May 24, 2026 00:11

@szehon-ho szehon-ho left a comment

Thanks for addressing the earlier feedback — default flow name at the API layer, type checks, and the expanded test coverage all look good.

A few minor nits inline (trailing whitespace, doc example indent, test comment typo, and two small test gaps). None are blocking from my side.

Scope note: this PR is Python API + Connect proto wiring only; PipelinesHandler still throws for AutoCdcFlowDetails, so end-to-end Connect definition will fail until server support lands. That's fine for this PR.

LGTM.

@AnishMahto

I don't know whether this is intentional, but it looks like the docs build environment in CI doesn't have all of the transitive dependencies required by the PySpark Connect packages. For example, see this failure caused by the pyspark.sql.connect.functions.builtin import, which requires zstandard >= 0.25.0 to be installed: https://github.com/AnishMahto/spark/actions/runs/26351167405/job/77569752098.

In any case, as a workaround, I will lazily import pyspark.sql.connect.functions.builtin so that the docs build is unaffected.
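
The lazy-import workaround can be sketched as follows. The import statement sits inside the function body, so merely importing the module (which is all a docs build does) never touches the optional dependency. The module name `_hypothetical_connect_builtin_for_docs_sketch` is deliberately fictitious and stands in for pyspark.sql.connect.functions.builtin; `build_expr` is likewise an illustrative name, not a function from the PR.

```python
def build_expr(name: str):
    # The import runs only when the function is called, not when this module
    # is imported, so environments lacking the dependency can still import it.
    import _hypothetical_connect_builtin_for_docs_sketch as builtin  # fictitious module

    return builtin.col(name)


# Defining build_expr succeeded even though the dependency is not installed.
print(callable(build_expr))  # True

# The missing dependency only surfaces when the function is actually used.
try:
    build_expr("id")
except ImportError as exc:
    print(f"deferred failure: {exc}")
```

The trade-off is that a missing dependency is reported at first call rather than at import time, which is acceptable here because the docs build never calls the function.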

@szehon-ho szehon-ho left a comment

It's too minor, so approving.

anew and others added 10 commits May 25, 2026 05:09
- Remove spaces around = in keyword arguments (PEP 8)
- Fix type hint: List[Union[str, Column]] -> Union[List[str], List[Column]]
- Reorder imports and collapse unnecessary line continuations

Co-authored-by: Isaac

- Move inline imports to module level
- Fix assertNone -> assertIsNone
- Fix assertEqual(stored_as_scd_type, "1") -> assertIsNone for default case
- Add missing assertions for optional fields in test_create_auto_cdc_flow

Co-authored-by: Isaac

- Drop ignore_null_updates_column_list / ignore_null_updates_except_column_list
  from create_auto_cdc_flow until execution support lands; the proto fields
  remain so the server-side wiring is unchanged.
- Add a test exercising string-form arguments (keys=["id"], sequence_by="ts",
  apply_as_deletes=str, apply_as_truncates=str, column_list=[str]) to verify
  they normalize to PySpark Columns.
- Fix Pyspark -> PySpark casing in create_auto_cdc_flow docstring.
- Restore the prior import order in spark_connect_graph_element_registry.py
  so the diff vs master is limited to the substantive symbol additions.

Per AnishMahto's heads-up on apache#56045, apply_as_truncates is
unlikely to land for the 4.2 cut. Following the same principle applied to
ignore_null_updates_*, we drop the parameter from the user-facing Python API
now and re-add it once execution support is in. The proto field stays so the
server-side wiring is untouched.
@AnishMahto AnishMahto force-pushed the SPARK-56651-autocdc-python-api branch from 86851c2 to 7136aff Compare May 25, 2026 05:10
@cloud-fan

thanks, merging to master/4.x/4.2 (most CDC work is in 4.2 already)

@cloud-fan cloud-fan closed this in a100c0b May 26, 2026
cloud-fan pushed a commit that referenced this pull request May 26, 2026
## Takeover of #56045. PR description is copied.

### What changes were proposed in this pull request?
Adds `create_auto_cdc_flow` to the SDP Python API. For now, this will only support SCD Type 1. Parameters:
- name: the name of the flow
- target: the target table
- source: the source dataset with the change events
- keys: the unique key per row
- sequence_by: a sequence id to establish time order
- apply_as_deletes: a boolean expression indicating whether an event represents a delete
- ~~apply_as_truncates: a boolean expression indicating whether an event represents a truncation~~
- column_list: a list of columns to include in the target table
- except_column_list: a list of columns to exclude from the target table
- stored_as_scd_type: the SCD type, must be 1
- ~~ignore_null_updates_column_list: a list of columns for which to ignore null values~~
- ~~ignore_null_updates_except_column_list: a list of columns for which not to ignore null values~~
- source_code_location: the location in the Python source code that defines this flow

This PR introduces the PySpark API to register an AutoCDC flow within an SDP, and send the registration requests to the Spark driver via Spark Connect protos.

This PR does not handle these Spark Connect protos on the server side. Until server support lands, the pipelines handler in the Spark driver rejects them with an unsupported-operation error.

### Why are the changes needed?
See the SPIP at https://docs.google.com/document/d/1Hp5BGEYJRHbk6J7XUph3bAPZKRQXKOuV1PEaqZMMRoQ/

### Does this PR introduce _any_ user-facing change?
Yes, it introduces a new method in the SDP Python API.

### How was this patch tested?
Unit tests were added, using a local graph registry.

### Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Sonnet 4.6

Closes #56069 from AnishMahto/SPARK-56651-autocdc-python-api.

Lead-authored-by: AnishMahto <anish.mahto99@gmail.com>
Co-authored-by: andreas-neumann_data <andreas.neumann@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit a100c0b)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
cloud-fan added a commit to cloud-fan/spark that referenced this pull request May 26, 2026
…oCDC Python APIs

### What changes were proposed in this pull request?

Follow-up cleanup of review feedback on apache#56069:

- Enforce client-side that `column_list` and `except_column_list` are not both
  set (raises `PySparkValueError` with `INVALID_MULTIPLE_ARGUMENT_CONDITIONS`).
  The docstring already promised this, but the API previously accepted both.
- Enforce client-side that `keys` is non-empty (raises `PySparkValueError`
  with `CANNOT_BE_EMPTY`). The docstring promised "at least one key must be
  provided"; the implementation accepted `keys=[]`.
- Drop `Optional` from `AutoCdcFlow.name` since the API always defaults the
  field to the target name before construction.
- Add a comment on the lazy `pyspark.sql.connect.functions.builtin` imports
  explaining the docs-build constraint (transitive grpc dependency missing
  from the docs environment), so a future refactor doesn't hoist the
  imports and silently break docs CI.
- Minor Scaladoc/docstring fixes: "merge" operation (was "merged"),
  consistency between `api.py` and `flow.py` on the `1`/`"1"` accepted set,
  missing commas, "excluded from" (was "excluded in"), `DataFrame` casing,
  and a backtick on `create_streaming_table`.

Add two tests covering the new client-side validations.
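
The two client-side checks described above, together with the default flow name, can be sketched as follows. This is a simplified illustration rather than the code in `api.py`: `AutoCdcFlowSketch` and `create_auto_cdc_flow_sketch` are hypothetical names, the parameter list is trimmed, and the built-in `ValueError` stands in for `PySparkValueError` with its `CANNOT_BE_EMPTY` and `INVALID_MULTIPLE_ARGUMENT_CONDITIONS` error classes.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass(frozen=True)
class AutoCdcFlowSketch:
    """Hypothetical, trimmed-down stand-in for the AutoCdcFlow dataclass."""

    name: str  # not Optional: always populated before construction
    target: str
    source: str
    keys: List[str]
    column_list: Optional[List[str]] = None
    except_column_list: Optional[List[str]] = None


def create_auto_cdc_flow_sketch(
    target: str,
    source: str,
    keys: List[str],
    column_list: Optional[List[str]] = None,
    except_column_list: Optional[List[str]] = None,
    name: Optional[str] = None,
) -> AutoCdcFlowSketch:
    if len(keys) == 0:
        raise ValueError("keys cannot be empty; at least one key must be provided")
    if column_list is not None and except_column_list is not None:
        raise ValueError("column_list and except_column_list cannot both be set")
    return AutoCdcFlowSketch(
        # An unspecified name falls back to the target name.
        name=name if name is not None else target,
        target=target,
        source=source,
        keys=keys,
        column_list=column_list,
        except_column_list=except_column_list,
    )


flow = create_auto_cdc_flow_sketch(target="customers", source="customer_changes", keys=["id"])
print(flow.name)  # customers
```

Because the name is resolved before the dataclass is built, the field can be typed as a plain `str`, which matches the reasoning given for dropping `Optional`.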

### Why are the changes needed?

The two enforcement gaps are real contract/code divergences in the public
Python API — users following the docstring would expect the validation to
fire client-side, but it would silently slip through to the server (with a
less actionable error message). The `Optional[str]` field type was also
misleading since the API guarantees the field is always populated.

### Does this PR introduce _any_ user-facing change?

No new behavior beyond surfacing client-side validation errors earlier. The
errors raised here would otherwise have been raised by the server (or, in
the case of empty keys, possibly accepted with an empty repeated proto field).

### How was this patch tested?

Added `test_create_auto_cdc_flow_rejects_empty_keys` and
`test_create_auto_cdc_flow_rejects_both_column_lists`. Existing tests
continue to pass.

### Was this patch authored or co-authored using generative AI tooling?

Co-authored by Claude.
szehon-ho pushed a commit that referenced this pull request Jun 1, 2026
…ments for AutoCDC flow dataclasses and Python APIs

This follow-up PR addresses review comments left after #56042 (SPARK-56956, AutoCDC flow dataclasses) and #56069 (SPARK-56651, AutoCDC Python APIs) merged.

### What changes were proposed in this pull request?

#### Scala — `Scd1BatchProcessor` / `Flow`

- Remove the now-dead `Scd1BatchProcessor.validateCdcMetadataColumnNotPresent` validator and its call site. It referenced the error class `AUTOCDC_RESERVED_COLUMN_NAME_CONFLICT` which the parent PR removed from `error-conditions.json`; the new construction-time check in `AutoCdcMergeFlow.requireReservedPrefixAbsentInSourceColumns` is the authoritative validator and supersedes it.
- Reorder `AutoCdcFlow`'s constructor so defaulted params trail the non-defaulted ones (`origin`, `changeArgs`), allowing positional construction.
- Fix Scaladoc/comment text: factual wording for the keys-presence check, the `[[ResolvedFlow.load]]` link, the `Scd1ForeachBatchHandler` reference (was `Scd1ForeachBatchExec`, which does not exist), and several minor grammar/typography nits.

#### Python — `create_auto_cdc_flow` / `AutoCdcFlow`

- Add a comment on the lazy `pyspark.sql.connect.functions.builtin` imports explaining the docs-build constraint (transitive grpc dependency missing from the docs environment), so a future refactor doesn't hoist the imports and silently break docs CI.
- Fix the `INVALID_MULTIPLE_ARGUMENT_CONDITIONS` error template placeholder: `[{arg_names}]` → `[<arg_names>]`. `ErrorClassesReader.get_error_message` extracts required placeholders via `re.findall("<([a-zA-Z0-9_-]+)>", template)` and asserts the extracted set equals `messageParameters.keys()`, so the curly-brace form would trip an `AssertionError` instead of producing the intended `PySparkValueError`. The typo also affected existing callers in `sql/session.py:2267` and `sql/connect/session.py:1056`, but those paths are not exercised by tests.
- Minor docstring fixes: "merge" operation (was "merged"), consistency between `api.py` and `flow.py` on the `1`/`"1"` accepted set, missing commas, "excluded from" (was "excluded in"), `DataFrame` casing, and a backtick on `create_streaming_table`.
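
The placeholder bug is easy to reproduce in isolation. The sketch below applies the regular expression quoted above to both forms of the template and mimics the set-equality assertion. `render` is a simplified stand-in for `ErrorClassesReader.get_error_message`, and the template wording is hypothetical; only the regex and the `[{arg_names}]` versus `[<arg_names>]` placeholders come from the description.

```python
import re
from typing import Dict

PLACEHOLDER_PATTERN = "<([a-zA-Z0-9_-]+)>"

# Hypothetical template wording; only the placeholder syntax matters here.
BROKEN_TEMPLATE = "[{arg_names}] cannot be set at the same time."
FIXED_TEMPLATE = "[<arg_names>] cannot be set at the same time."


def render(template: str, message_parameters: Dict[str, str]) -> str:
    """Simplified stand-in: check placeholders against parameters, then substitute."""
    required = set(re.findall(PLACEHOLDER_PATTERN, template))
    assert required == set(message_parameters), (
        f"template expects {sorted(required)}, got {sorted(message_parameters)}"
    )
    for key, value in message_parameters.items():
        template = template.replace(f"<{key}>", value)
    return template


params = {"arg_names": "column_list, except_column_list"}

print(re.findall(PLACEHOLDER_PATTERN, BROKEN_TEMPLATE))  # []
print(re.findall(PLACEHOLDER_PATTERN, FIXED_TEMPLATE))   # ['arg_names']
print(render(FIXED_TEMPLATE, params))

try:
    render(BROKEN_TEMPLATE, params)
except AssertionError as exc:
    # The curly-brace form yields no placeholders, so the equality check fails.
    print(f"AssertionError: {exc}")
```

With the curly-brace form the regex finds no placeholders, so the extracted set is empty while the caller still passes `arg_names`, and the assertion fires before any user-facing error can be built.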

### Why are the changes needed?

Cleanup of follow-up items identified during review of the parent PRs. The dead Scala validator is the most material: if its code path were reached, it would throw an internal `SparkException("Cannot find main error class ...")` instead of a user-facing `AnalysisException`. The error-template typo would surface as an `AssertionError` rather than a user-actionable error for the two existing callers.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests in `AutoCdcFlowSuite`, `Scd1BatchProcessorSuite`, `ConnectInvalidPipelineSuite`, and `ConnectValidPipelineSuite` continue to cover the affected paths.

### Was this patch authored or co-authored using generative AI tooling?

Co-authored by Claude.

Closes #56113 from cloud-fan/autocdc-flow-dataclasses-followup.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Szehon Ho <szehon.apache@gmail.com>
(cherry picked from commit 3e1650b)
Signed-off-by: Szehon Ho <szehon.apache@gmail.com>