Skip to content

[SPARK-56719][SS] Add DataStreamWriter.name() API for sink evolution#55672

Closed
ericm-db wants to merge 6 commits into
apache:masterfrom
ericm-db:sink-evolution-api
Closed

[SPARK-56719][SS] Add DataStreamWriter.name() API for sink evolution#55672
ericm-db wants to merge 6 commits into
apache:masterfrom
ericm-db:sink-evolution-api

Conversation

@ericm-db

@ericm-db ericm-db commented May 4, 2026

Copy link
Copy Markdown
Contributor

What changes were proposed in this pull request?

This PR adds the ability to name streaming sinks via the name() method on DataStreamWriter, laying the groundwork for sink evolution capability. This is analogous to the existing source evolution support (DataStreamReader.name()).

Changes:

  • Add name(sinkName) method to DataStreamWriter (API abstract method, classic implementation, Connect stub)
  • Add sinkName: Option[String] field to WriteToStream and userSpecifiedSinkName: Option[String] to WriteToStreamStatement plan nodes
  • Add spark.sql.streaming.queryEvolution.enableSinkEvolution internal config to SQLConf
  • Add sink name validation — names must be alphanumeric + underscore only
  • Add enforcement in MicroBatchExecution — when sink evolution is enabled, sinks must be explicitly named
  • Add MicroBatchExecution.DEFAULT_SINK_NAME ("sink-0") for backward compatibility
  • Thread sinkName through StreamingQueryManager and ResolveWriteToStream
  • Add error conditions: INVALID_SINK_NAME, UNNAMED_STREAMING_SINKS_WITH_ENFORCEMENT
  • Add QueryCompilationErrors.invalidStreamingSinkNameError
  • Add StreamingSinkEvolutionSuite with tests for validation and enforcement

All new APIs are private[sql] or internal() — the name() method is not yet publicly callable. It will be opened up once commit log support for persisting sink metadata is added in a follow-up PR.

Why are the changes needed?

Currently, streaming queries have no mechanism for sink evolution. If a user wants to change the sink of a streaming query while preserving the checkpoint, there is no way to track which sink was used historically. This PR introduces the naming API as the first step toward full sink evolution support, where sinks can be added, removed, or replaced while maintaining checkpoint integrity.

This mirrors the existing source evolution support added via DataStreamReader.name() and spark.sql.streaming.queryEvolution.enableSourceEvolution.

Does this PR introduce any user-facing change?

No. All new APIs are private[sql] and the config is internal(). No user-facing changes until the feature is fully implemented with commit log support in a follow-up PR.

How was this patch tested?

Added StreamingSinkEvolutionSuite with 7 test cases covering:

  • Invalid sink name validation (hyphen, space, special characters)
  • Valid sink name patterns (alphanumeric, underscore, digits)
  • Enforcement: unnamed sink with evolution enabled throws UNNAMED_STREAMING_SINKS_WITH_ENFORCEMENT
  • Enforcement: unnamed sink without evolution enabled succeeds (backward compatibility)
  • Named sink with evolution enabled succeeds
  • Continuing with the same sink name across restarts works

Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Code (claude-opus-4-6)

ericm-db added 3 commits May 4, 2026 12:34
…ception for sink evolution

Adds the MiMa `ReversedMissingMethodProblem` exclusion for the newly added
`DataStreamWriter.name()` API, and registers the new
`spark.sql.streaming.queryEvolution.enableSinkEvolution` SQL config in the
binding-policy exceptions file (consistent with its `enableSourceEvolution`
sibling).

Co-authored-by: Isaac

@anishshri-db anishshri-db left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm pending nit/question

ericm-db added 3 commits May 21, 2026 15:49
…ator

Replace Scaladoc `[[AnalysisException]]` and `[[IllegalArgumentException]]`
references with backtick code spans. The Scaladoc-to-Javadoc conversion
turned them into unresolved `{@link ...}` references because the generated
Java file does not carry imports, breaking the unidoc build.
anishshri-db pushed a commit that referenced this pull request May 23, 2026
### What changes were proposed in this pull request?

This PR adds the ability to name streaming sinks via the `name()` method on `DataStreamWriter`, laying the groundwork for sink evolution capability. This is analogous to the existing source evolution support (`DataStreamReader.name()`).

**Changes:**
- Add `name(sinkName)` method to `DataStreamWriter` (API abstract method, classic implementation, Connect stub)
- Add `sinkName: Option[String]` field to `WriteToStream` and `userSpecifiedSinkName: Option[String]` to `WriteToStreamStatement` plan nodes
- Add `spark.sql.streaming.queryEvolution.enableSinkEvolution` internal config to `SQLConf`
- Add sink name validation — names must be alphanumeric + underscore only
- Add enforcement in `MicroBatchExecution` — when sink evolution is enabled, sinks must be explicitly named
- Add `MicroBatchExecution.DEFAULT_SINK_NAME` (`"sink-0"`) for backward compatibility
- Thread `sinkName` through `StreamingQueryManager` and `ResolveWriteToStream`
- Add error conditions: `INVALID_SINK_NAME`, `UNNAMED_STREAMING_SINKS_WITH_ENFORCEMENT`
- Add `QueryCompilationErrors.invalidStreamingSinkNameError`
- Add `StreamingSinkEvolutionSuite` with tests for validation and enforcement

All new APIs are `private[sql]` or `internal()` — the `name()` method is not yet publicly callable. It will be opened up once commit log support for persisting sink metadata is added in a follow-up PR.

### Why are the changes needed?

Currently, streaming queries have no mechanism for sink evolution. If a user wants to change the sink of a streaming query while preserving the checkpoint, there is no way to track which sink was used historically. This PR introduces the naming API as the first step toward full sink evolution support, where sinks can be added, removed, or replaced while maintaining checkpoint integrity.

This mirrors the existing source evolution support added via `DataStreamReader.name()` and `spark.sql.streaming.queryEvolution.enableSourceEvolution`.

### Does this PR introduce _any_ user-facing change?

No. All new APIs are `private[sql]` and the config is `internal()`. No user-facing changes until the feature is fully implemented with commit log support in a follow-up PR.

### How was this patch tested?

Added `StreamingSinkEvolutionSuite` with 7 test cases covering:
- Invalid sink name validation (hyphen, space, special characters)
- Valid sink name patterns (alphanumeric, underscore, digits)
- Enforcement: unnamed sink with evolution enabled throws `UNNAMED_STREAMING_SINKS_WITH_ENFORCEMENT`
- Enforcement: unnamed sink without evolution enabled succeeds (backward compatibility)
- Named sink with evolution enabled succeeds
- Continuing with the same sink name across restarts works

### Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Code (claude-opus-4-6)

Closes #55672 from ericm-db/sink-evolution-api.

Authored-by: ericm-db <eric.marnadi@databricks.com>
Signed-off-by: Anish Shrigondekar <anish.shrigondekar@databricks.com>
(cherry picked from commit 2039927)
Signed-off-by: Anish Shrigondekar <anish.shrigondekar@databricks.com>
anishshri-db pushed a commit that referenced this pull request May 23, 2026
### What changes were proposed in this pull request?

This PR adds the ability to name streaming sinks via the `name()` method on `DataStreamWriter`, laying the groundwork for sink evolution capability. This is analogous to the existing source evolution support (`DataStreamReader.name()`).

**Changes:**
- Add `name(sinkName)` method to `DataStreamWriter` (API abstract method, classic implementation, Connect stub)
- Add `sinkName: Option[String]` field to `WriteToStream` and `userSpecifiedSinkName: Option[String]` to `WriteToStreamStatement` plan nodes
- Add `spark.sql.streaming.queryEvolution.enableSinkEvolution` internal config to `SQLConf`
- Add sink name validation — names must be alphanumeric + underscore only
- Add enforcement in `MicroBatchExecution` — when sink evolution is enabled, sinks must be explicitly named
- Add `MicroBatchExecution.DEFAULT_SINK_NAME` (`"sink-0"`) for backward compatibility
- Thread `sinkName` through `StreamingQueryManager` and `ResolveWriteToStream`
- Add error conditions: `INVALID_SINK_NAME`, `UNNAMED_STREAMING_SINKS_WITH_ENFORCEMENT`
- Add `QueryCompilationErrors.invalidStreamingSinkNameError`
- Add `StreamingSinkEvolutionSuite` with tests for validation and enforcement

All new APIs are `private[sql]` or `internal()` — the `name()` method is not yet publicly callable. It will be opened up once commit log support for persisting sink metadata is added in a follow-up PR.

### Why are the changes needed?

Currently, streaming queries have no mechanism for sink evolution. If a user wants to change the sink of a streaming query while preserving the checkpoint, there is no way to track which sink was used historically. This PR introduces the naming API as the first step toward full sink evolution support, where sinks can be added, removed, or replaced while maintaining checkpoint integrity.

This mirrors the existing source evolution support added via `DataStreamReader.name()` and `spark.sql.streaming.queryEvolution.enableSourceEvolution`.

### Does this PR introduce _any_ user-facing change?

No. All new APIs are `private[sql]` and the config is `internal()`. No user-facing changes until the feature is fully implemented with commit log support in a follow-up PR.

### How was this patch tested?

Added `StreamingSinkEvolutionSuite` with 7 test cases covering:
- Invalid sink name validation (hyphen, space, special characters)
- Valid sink name patterns (alphanumeric, underscore, digits)
- Enforcement: unnamed sink with evolution enabled throws `UNNAMED_STREAMING_SINKS_WITH_ENFORCEMENT`
- Enforcement: unnamed sink without evolution enabled succeeds (backward compatibility)
- Named sink with evolution enabled succeeds
- Continuing with the same sink name across restarts works

### Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Code (claude-opus-4-6)

Closes #55672 from ericm-db/sink-evolution-api.

Authored-by: ericm-db <eric.marnadi@databricks.com>
Signed-off-by: Anish Shrigondekar <anish.shrigondekar@databricks.com>
(cherry picked from commit 2039927)
Signed-off-by: Anish Shrigondekar <anish.shrigondekar@databricks.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants