Skip to content

[SPARK-56676][SQL][DML] DSv2 Transactional Streaming Writes need to Validate Target between Microbatches#55623

Closed
andreaschat-db wants to merge 10 commits into
apache:masterfrom
andreaschat-db:dsv2TransactionApiImprovements
Closed

[SPARK-56676][SQL][DML] DSv2 Transactional Streaming Writes need to Validate Target between Microbatches#55623
andreaschat-db wants to merge 10 commits into
apache:masterfrom
andreaschat-db:dsv2TransactionApiImprovements

Conversation

@andreaschat-db

@andreaschat-db andreaschat-db commented Apr 30, 2026

Copy link
Copy Markdown
Contributor

What changes were proposed in this pull request?

This PR addresses post-merge comments to the Transaction API: #55278. The focus is on improving streaming use cases. In particular, for transactional catalogs the streaming target is created as a v2 table reference so we can detect any table changes between micro batches.

Why are the changes needed?

We need to detect any changes of the write target in each micro batch.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Added new tests for streaming use cases.

Was this patch authored or co-authored using generative AI tooling?

Claude Sonnet 4.6.

@andreaschat-db andreaschat-db changed the title [WIP][SQL][DML] DSv2 Transaction API Improvements [SPARK-56676][SQL][DML] DSv2 Transaction API Improvements Apr 30, 2026
@andreaschat-db andreaschat-db marked this pull request as ready for review April 30, 2026 14:24
table match {
// Streaming write targets are constructed with isStreaming=false even inside a streaming
// query, because the sink is a regular batch write destination.
case u: UnresolvedRelation if !u.isStreaming =>

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't you create UnresolvedRelation in MicroBatchExecution and control the flag?

Also, I am not sure it is a good idea to use UnresolvedRelation in the streaming template plan. It means we will re-resolve the target table on each batch and will not be able to detect that the table changed (e.g. dropped and recreated). I believe a better solution would be to use V2TableReference with a new write context (say WriteTargetContext). That way you can insert needed validation for each batch.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Incorporated the suggestion discussed offline about using directly a V2TableReference.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need the comment still?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, do we need to rename the method or even rollback the change where this code only applied to batch target?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed stale comment and reverted refactor.

* {@link org.apache.spark.sql.connector.catalog.transactions.Transaction}, the transaction is
* committed separately via
* When this write is part of a
* {@link org.apache.spark.sql.connector.catalog.transactions.Transaction}, connector

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we do qualified import for Transaction?

s"If a new extension point was added, register it in Analyzer.withCatalogManager, " +
s"add an assertion in this test, and update EXPECTED_FIELD_COUNT.")


Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor: extra empty line?

@andreaschat-db andreaschat-db force-pushed the dsv2TransactionApiImprovements branch from 6445782 to 9dd8c27 Compare May 7, 2026 08:58
@andreaschat-db andreaschat-db requested a review from aokolnychyi May 7, 2026 09:00
@andreaschat-db andreaschat-db changed the title [SPARK-56676][SQL][DML] DSv2 Transaction API Improvements [SPARK-56676][SQL][DML] DSv2 Transaction API Streaming Improvements May 7, 2026
case ref: V2TableReference =>
relationResolution.resolveReference(ref) match {
case r: NamedRelation => write.withNewTable(r)
case other => throw SparkException.internalError(

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this guaranteed? What do we do in other places?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All codepaths in resolveReference return DataSourceV2Relation which is a NamedRelation. Other call sites return LogicalPlan, so they do not need to explicitly validate the return type. I would keep the validation to be foolproof against future changes.

The alternative is to tighten the signature of resolveReference and return NamedRelation instead. Is there a particular reason, similar functions such as loadRelation/getOrLoadRelation/adaptCachedRelation return LogicalPlan?

@andreaschat-db andreaschat-db requested a review from aokolnychyi May 8, 2026 11:47
@andreaschat-db andreaschat-db changed the title [SPARK-56676][SQL][DML] DSv2 Transaction API Streaming Improvements [SPARK-56676][SQL][DML] Fix DSv2 transactional streaming writes to pick up target table changes between micro batches May 8, 2026
@andreaschat-db andreaschat-db changed the title [SPARK-56676][SQL][DML] Fix DSv2 transactional streaming writes to pick up target table changes between micro batches [SPARK-56676][SQL][DML] DSv2 Transactional Streaming Writes need to Validate Target between Microbatches May 8, 2026
aokolnychyi pushed a commit that referenced this pull request May 20, 2026
…alidate Target between Microbatches

### What changes were proposed in this pull request?

This PR addresses post-merge comments to the Transaction API: #55278. The focus is on improving streaming use cases. In particular, for transactional catalogs the streaming target is created as a v2 table reference so we can detect any table changes between micro batches.

### Why are the changes needed?

We need to detect any changes of the write target in each micro batch.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added new tests for streaming use cases.

### Was this patch authored or co-authored using generative AI tooling?

Claude Sonnet 4.6.

Closes #55623 from andreaschat-db/dsv2TransactionApiImprovements.

Authored-by: Andreas Chatzistergiou <andreas.chatzistergiou@databricks.com>
Signed-off-by: Anton Okolnychyi <aokolnychyi@apache.org>
(cherry picked from commit 8198896)
Signed-off-by: Anton Okolnychyi <aokolnychyi@apache.org>
viirya added a commit that referenced this pull request May 22, 2026
…py cherry-pick prompts

### What changes were proposed in this pull request?

When a committer manually types `branch-M.N` at the cherry-pick prompt while `branch-M.x` exists and has not yet received the commit, the script now surfaces the Upstream-First policy and offers to pick into both branches in one step (the policy-compliant default). The committer can still pick only `branch-M.N` if the commit is genuinely a `branch-M.N`-only maintenance bugfix, or abort.

Implementation notes:

- Split `cherry_pick` into `_do_cherry_pick` (fetch + cherry-pick + push) and `cherry_pick` (prompt + policy check). The policy wrapper returns a list of refs so the main loop can advance its remaining-branches list correctly when one prompt consumes two branches.
- Replace the `branch_iter` iterator with a mutable `remaining_branches` list in the main cherry-pick loop, so picks consumed by the two-branch path are accounted for in the next prompt's default.
- Add an `already_picked` parameter to `cherry_pick` so the policy check skips its prompt when `branch-M.x` is in the set of refs already touched this session (e.g. when the PR was merged into `branch-M.x` and the loop is now picking into `branch-M.N`).

### Why are the changes needed?

The Upstream-First backporting policy (documented in the header comment of `dev/merge_spark_pr.py`) requires non-bugfix commits to flow through `branch-M.x` before reaching `branch-M.N`. The merge script already orders `branch-M.x` ahead of `branch-M.N` as the cherry-pick default. However, when a committer types `branch-M.N` at the prompt, the script silently proceeds and `branch-M.x` is never revisited.

This has led to commits landing on `branch-4.2` but missing `branch-4.x`. Six such commits observed on the current branches (as of 2026-05-22):

- SPARK-56700 (#55651)
- SPARK-56676 (#55623)
- SPARK-56838 (#55836)
- SPARK-56650 (#55589)
- SPARK-56856 (#55969)
- SPARK-56977 (#56023)

All six landed on master and `branch-4.2` but were not cherry-picked to `branch-4.x`, requiring follow-up backports.

### Does this PR introduce _any_ user-facing change?

Yes for committers using `dev/merge_spark_pr.py`. When the typed cherry-pick target is `branch-M.N` and `branch-M.x` exists and is not yet picked, an additional prompt asks whether to pick into both. Accepting the default ("both") preserves prior behavior plus an extra cherry-pick to `branch-M.x`.

No change when the committer accepts the default `branch-M.x` target, or when picking into `branch-M.x` first and `branch-M.N` second (the typical policy-compliant flow).

### How was this patch tested?

- `python3 -m doctest dev/merge_spark_pr.py` passes (34/34, all pre-existing tests — none cover the new policy logic).
- New `cherry_pick` policy logic was reviewed for behavior but **not exercised end-to-end**: actually running `merge_spark_pr.py` requires committer privileges and a live open PR to merge. Edge cases were traced by reading the code (PR target = master with manual branch-M.N entry; PR target = branch-M.x with default branch-M.N pick; multiple iterations after a two-branch pick).
- Reviewers familiar with the merge flow are encouraged to verify behavior on first real use, especially the abort path and the interaction with manual conflict resolution inside `_do_cherry_pick`.

### Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Code (Opus 4.7)

Closes #56058 from viirya/infra-merge-script-upstream-first-policy.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
cloud-fan pushed a commit that referenced this pull request May 25, 2026
…alidate Target between Microbatches

### What changes were proposed in this pull request?

This PR addresses post-merge comments to the Transaction API: #55278. The focus is on improving streaming use cases. In particular, for transactional catalogs the streaming target is created as a v2 table reference so we can detect any table changes between micro batches.

### Why are the changes needed?

We need to detect any changes of the write target in each micro batch.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added new tests for streaming use cases.

### Was this patch authored or co-authored using generative AI tooling?

Claude Sonnet 4.6.

Closes #55623 from andreaschat-db/dsv2TransactionApiImprovements.

Authored-by: Andreas Chatzistergiou <andreas.chatzistergiou@databricks.com>
Signed-off-by: Anton Okolnychyi <aokolnychyi@apache.org>
(cherry picked from commit 8198896)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants