Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions common/utils/src/main/resources/error/error-conditions.json
Original file line number Diff line number Diff line change
Expand Up @@ -6855,6 +6855,11 @@
"Duplicate streaming source names detected: <names>. Each streaming source must have a unique name."
]
},
"INVALID_SINK_NAME" : {
"message" : [
"Invalid streaming sink name: '<sinkName>'. Sink names must only contain ASCII letters ('a'-'z', 'A'-'Z'), digits ('0'-'9'), and underscores ('_')."
]
},
"INVALID_SOURCE_NAME" : {
"message" : [
"Invalid streaming source name '<sourceName>'. Source names must only contain ASCII letters (a-z, A-Z), digits (0-9), and underscores (_)."
Expand All @@ -6865,6 +6870,11 @@
"Streaming source naming is not supported. Source name '<name>' was provided but the feature is disabled. Please enable the feature by setting spark.sql.streaming.queryEvolution.enableSourceEvolution to true."
]
},
"UNNAMED_STREAMING_SINKS_WITH_ENFORCEMENT" : {
"message" : [
"Streaming sink must be named when spark.sql.streaming.queryEvolution.enableSinkEvolution is enabled. Use the name() method on DataStreamWriter to assign a name to the streaming sink."
]
},
"UNNAMED_STREAMING_SOURCES_WITH_ENFORCEMENT" : {
"message" : [
"All streaming sources must be named when spark.sql.streaming.queryEvolution.enableSourceEvolution is enabled. Unnamed sources found: <sourceInfo>. Use the name() method to assign names to all streaming sources."
Expand Down
5 changes: 4 additions & 1 deletion project/MimaExcludes.scala
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,10 @@ object MimaExcludes {
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.artifact.ArtifactManager.cachedBlockIdList"),

// [SPARK-54323][PYTHON] Change the way to access logs to TVF instead of system view
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.TableValuedFunction.python_worker_logs")
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.TableValuedFunction.python_worker_logs"),

// [SPARK-56719][SS] Add DataStreamWriter.name() API for sink evolution
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.DataStreamWriter.name")
)

// Default exclude rules
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package org.apache.spark.sql.streaming

import scala.jdk.CollectionConverters._
import scala.util.matching.Regex

import org.apache.spark.annotation.Evolving
import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, Encoders}
Expand Down Expand Up @@ -356,18 +355,16 @@ abstract class DataStreamReader {
*
* @param sourceName
* the source name to validate
* @throws AnalysisException
* if the source name contains invalid characters
* @throws IllegalArgumentException
* if the source name is null, empty, or contains invalid characters
* if the source name is null or empty
*/
private[sql] def validateSourceName(sourceName: String): Unit = {
require(sourceName != null, "Source name cannot be null")
require(sourceName.nonEmpty, "Source name cannot be empty")

val validNamePattern: Regex = "^[a-zA-Z0-9_]+$".r
if (!validNamePattern.pattern.matcher(sourceName).matches()) {
throw new AnalysisException(
StreamingNameValidator.validate(sourceName, "Source") { invalid =>
new AnalysisException(
errorClass = "STREAMING_QUERY_EVOLUTION_ERROR.INVALID_SOURCE_NAME",
messageParameters = Map("sourceName" -> sourceName))
messageParameters = Map("sourceName" -> invalid))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import java.util.concurrent.TimeoutException

import org.apache.spark.annotation.Evolving
import org.apache.spark.api.java.function.VoidFunction2
import org.apache.spark.sql.{Dataset, ForeachWriter, WriteConfigMethods}
import org.apache.spark.sql.{AnalysisException, Dataset, ForeachWriter, WriteConfigMethods}

/**
* Interface used to write a streaming `Dataset` to external storage systems (e.g. file systems,
Expand Down Expand Up @@ -90,6 +90,19 @@ abstract class DataStreamWriter[T] extends WriteConfigMethods[DataStreamWriter[T
*/
def queryName(queryName: String): this.type

/**
* Assigns a name to this streaming sink for sink evolution capability. When sinks are named,
* they can be tracked in checkpoint metadata, enabling query evolution.
*
* If not specified, sinks are automatically assigned a default name based on their position in
* the query, which maintains backward compatibility.
*
* @param sinkName
* the unique name for this sink (alphanumeric and underscore only)
* @since 4.1.0
*/
private[sql] def name(sinkName: String): this.type

/**
* Specifies the underlying output data source.
*
Expand Down Expand Up @@ -217,6 +230,24 @@ abstract class DataStreamWriter[T] extends WriteConfigMethods[DataStreamWriter[T
@throws[TimeoutException]
def toTable(tableName: String): StreamingQuery

/**
* Validates that a streaming sink name only contains alphanumeric characters and underscores.
*
* @param sinkName
* the sink name to validate
* @throws AnalysisException
* if the sink name contains invalid characters
* @throws IllegalArgumentException
* if the sink name is null or empty
*/
private[sql] def validateSinkName(sinkName: String): Unit = {
Comment thread
ericm-db marked this conversation as resolved.
StreamingNameValidator.validate(sinkName, "Sink") { invalid =>
new AnalysisException(
errorClass = "STREAMING_QUERY_EVOLUTION_ERROR.INVALID_SINK_NAME",
messageParameters = Map("sinkName" -> invalid))
}
}

///////////////////////////////////////////////////////////////////////////////////////
// Covariant Overrides
///////////////////////////////////////////////////////////////////////////////////////
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.streaming

import scala.util.matching.Regex

import org.apache.spark.sql.AnalysisException

/**
* Shared validation for user-assigned streaming source and sink names. Names must be non-null,
* non-empty, and contain only alphanumeric characters and underscores.
*/
private[sql] object StreamingNameValidator {
private val validNamePattern: Regex = "^[a-zA-Z0-9_]+$".r

/**
* Validates the given streaming entity name. Throws an `IllegalArgumentException` if the name
* is null or empty, and invokes `onInvalid` to build the `AnalysisException` to throw if the
* name does not match the allowed character set.
*
* @param name
* the source/sink name to validate
* @param entityKind
* a human-readable label (e.g. "Source", "Sink") used in null/empty messages
* @param onInvalid
* builds the AnalysisException to throw when `name` has invalid characters
*/
def validate(name: String, entityKind: String)(onInvalid: String => AnalysisException): Unit = {
require(name != null, s"$entityKind name cannot be null")
require(name.nonEmpty, s"$entityKind name cannot be empty")
if (!validNamePattern.pattern.matcher(name).matches()) {
throw onInvalid(name)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import org.apache.spark.sql.streaming.OutputMode
*/
case class WriteToStream(
name: String,
sinkName: Option[String],
resolvedCheckpointLocation: String,
sink: Table,
outputMode: OutputMode,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import org.apache.spark.sql.streaming.{OutputMode, Trigger}
* rule [[ResolveStreamWrite]].
*
* @param userSpecifiedName Query name optionally specified by the user.
* @param userSpecifiedSinkName Sink name optionally specified by the user for sink evolution.
* @param userSpecifiedCheckpointLocation Checkpoint location optionally specified by the user.
* @param useTempCheckpointLocation Whether to use a temporary checkpoint location when the user
* has not specified one. If false, then error will be thrown.
Expand All @@ -47,6 +48,7 @@ import org.apache.spark.sql.streaming.{OutputMode, Trigger}
*/
case class WriteToStreamStatement(
userSpecifiedName: Option[String],
userSpecifiedSinkName: Option[String],
userSpecifiedCheckpointLocation: Option[String],
useTempCheckpointLocation: Boolean,
recoverFromCheckpointLocation: Boolean,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2545,6 +2545,12 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat
messageParameters = Map("sourceName" -> sourceName))
}

def invalidStreamingSinkNameError(sinkName: String): Throwable = {
new AnalysisException(
errorClass = "STREAMING_QUERY_EVOLUTION_ERROR.INVALID_SINK_NAME",
messageParameters = Map("sinkName" -> sinkName))
}

def duplicateStreamingSourceNamesError(duplicateNames: Seq[String]): Throwable = {
new AnalysisException(
errorClass = "STREAMING_QUERY_EVOLUTION_ERROR.DUPLICATE_SOURCE_NAMES",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3143,6 +3143,16 @@ object SQLConf {
.booleanConf
.createWithDefault(false)

val ENABLE_STREAMING_SINK_EVOLUTION =
buildConf("spark.sql.streaming.queryEvolution.enableSinkEvolution")
.internal()
.doc("When true, streaming sinks can be named using the name() API on DataStreamWriter. " +
"This enables sink evolution capability where sinks can be changed while maintaining " +
"a historical record of all sinks used in the checkpoint.")
.version("4.1.0")
.booleanConf
.createWithDefault(false)

val STREAMING_CHECK_UNFINISHED_REPARTITION_ON_RESTART =
buildConf("spark.sql.streaming.checkUnfinishedRepartitionOnRestart")
.internal()
Expand Down Expand Up @@ -7673,6 +7683,8 @@ class SQLConf extends Serializable with Logging with SqlApiConf {

def enableStreamingSourceEvolution: Boolean = getConf(ENABLE_STREAMING_SOURCE_EVOLUTION)

def enableStreamingSinkEvolution: Boolean = getConf(ENABLE_STREAMING_SINK_EVOLUTION)

def streamingCheckUnfinishedRepartitionOnRestart: Boolean =
getConf(STREAMING_CHECK_UNFINISHED_REPARTITION_ON_RESTART)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,11 @@ final class DataStreamWriter[T] private[sql] (ds: Dataset[T])
this
}

/** @inheritdoc */
private[sql] def name(sinkName: String): this.type = {
throw new UnsupportedOperationException("Sink naming is not supported in Spark Connect")
}

/** @inheritdoc */
def format(source: String): this.type = {
sinkBuilder.setFormat(source)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,13 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) extends streaming.D
this
}

/** @inheritdoc */
private[sql] def name(sinkName: String): this.type = {
validateSinkName(sinkName)
this.sinkName = Some(sinkName)
this
}

/** @inheritdoc */
def format(source: String): this.type = {
this.source = source
Expand Down Expand Up @@ -312,6 +319,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) extends streaming.D

ds.sparkSession.sessionState.streamingQueryManager.startQuery(
newOptions.get("queryName"),
sinkName,
newOptions.get("checkpointLocation"),
ds,
newOptions.originalMap,
Expand Down Expand Up @@ -444,6 +452,8 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) extends streaming.D
private var partitioningColumns: Option[Seq[String]] = None

private var clusteringColumns: Option[Seq[String]] = None

private var sinkName: Option[String] = None
}

object DataStreamWriter {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ class StreamingQueryManager private[sql] (
// scalastyle:off argcount
private def createQuery(
userSpecifiedName: Option[String],
userSpecifiedSinkName: Option[String],
userSpecifiedCheckpointLocation: Option[String],
df: Dataset[_],
extraOptions: Map[String, String],
Expand Down Expand Up @@ -207,6 +208,7 @@ class StreamingQueryManager private[sql] (

val dataStreamWritePlan = WriteToStreamStatement(
userSpecifiedName,
userSpecifiedSinkName,
userSpecifiedCheckpointLocation,
useTempCheckpointLocation,
recoverFromCheckpointLocation,
Expand Down Expand Up @@ -277,6 +279,7 @@ class StreamingQueryManager private[sql] (
@throws[TimeoutException]
private[sql] def startQuery(
userSpecifiedName: Option[String],
userSpecifiedSinkName: Option[String] = None,
userSpecifiedCheckpointLocation: Option[String],
df: Dataset[_],
extraOptions: Map[String, String],
Expand All @@ -290,6 +293,7 @@ class StreamingQueryManager private[sql] (
catalogTable: Option[CatalogTable] = None): StreamingQuery = {
val query = createQuery(
userSpecifiedName,
userSpecifiedSinkName,
userSpecifiedCheckpointLocation,
df,
extraOptions,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import scala.util.control.NonFatal

import org.apache.hadoop.fs.Path

import org.apache.spark.{SparkIllegalArgumentException, SparkIllegalStateException}
import org.apache.spark.{SparkException, SparkIllegalArgumentException, SparkIllegalStateException}
import org.apache.spark.internal.LogKeys
import org.apache.spark.internal.LogKeys._
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
Expand Down Expand Up @@ -112,6 +112,22 @@ class MicroBatchExecution(

override protected def sourceToIdMap: Map[SparkDataStream, String] = sourceIdMap.map(_.swap)

// Sink name for commit log support
// If sink evolution is enabled, use user-provided sinkName (or error if not provided)
// Otherwise, always use DEFAULT_SINK_NAME for backward compatibility
private val sinkName: String = {
if (sparkSession.sessionState.conf.enableStreamingSinkEvolution) {
plan.sinkName.getOrElse {
throw new SparkException(
errorClass = "STREAMING_QUERY_EVOLUTION_ERROR.UNNAMED_STREAMING_SINKS_WITH_ENFORCEMENT",
messageParameters = Map.empty,
cause = null)
}
} else {
MicroBatchExecution.DEFAULT_SINK_NAME
}
}

@volatile protected[sql] var triggerExecutor: TriggerExecutor = _

protected def getTrigger(): TriggerExecutor = {
Expand Down Expand Up @@ -1466,6 +1482,12 @@ class MicroBatchExecution(

object MicroBatchExecution {
val BATCH_ID_KEY = "streaming.sql.batchId"

/**
* Default sink name used when sink evolution is disabled or no explicit name is provided.
* This maintains backward compatibility with existing streaming queries.
*/
private[sql] val DEFAULT_SINK_NAME = "sink-0"
}

case class OffsetHolder(start: OffsetV2, end: Option[OffsetV2]) extends LeafNode {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ object ResolveWriteToStream extends Rule[LogicalPlan] {

WriteToStream(
s.userSpecifiedName.orNull,
s.userSpecifiedSinkName,
resolvedCheckpointLocation,
s.sink,
s.outputMode,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -598,6 +598,7 @@ trait StreamTest extends SharedSparkSession with TimeLimits {
sparkSession
.streams
.startQuery(
None,
None,
Some(metadataRoot),
stream,
Expand Down
Loading