diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json index a3cd9f8536eb5..1fbac3a1ad14d 100644 --- a/common/utils/src/main/resources/error/error-conditions.json +++ b/common/utils/src/main/resources/error/error-conditions.json @@ -6855,6 +6855,11 @@ "Duplicate streaming source names detected: . Each streaming source must have a unique name." ] }, + "INVALID_SINK_NAME" : { + "message" : [ + "Invalid streaming sink name: ''. Sink names must only contain ASCII letters ('a'-'z', 'A'-'Z'), digits ('0'-'9'), and underscores ('_')." + ] + }, "INVALID_SOURCE_NAME" : { "message" : [ "Invalid streaming source name ''. Source names must only contain ASCII letters (a-z, A-Z), digits (0-9), and underscores (_)." @@ -6865,6 +6870,11 @@ "Streaming source naming is not supported. Source name '' was provided but the feature is disabled. Please enable the feature by setting spark.sql.streaming.queryEvolution.enableSourceEvolution to true." ] }, + "UNNAMED_STREAMING_SINKS_WITH_ENFORCEMENT" : { + "message" : [ + "Streaming sink must be named when spark.sql.streaming.queryEvolution.enableSinkEvolution is enabled. Use the name() method on DataStreamWriter to assign a name to the streaming sink." + ] + }, "UNNAMED_STREAMING_SOURCES_WITH_ENFORCEMENT" : { "message" : [ "All streaming sources must be named when spark.sql.streaming.queryEvolution.enableSourceEvolution is enabled. Unnamed sources found: . Use the name() method to assign names to all streaming sources." diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index b5434efee090c..376612cbe2617 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -88,7 +88,10 @@ object MimaExcludes { ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.artifact.ArtifactManager.cachedBlockIdList"), // [SPARK-54323][PYTHON] Change the way to access logs to TVF instead of system view - ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.TableValuedFunction.python_worker_logs") + ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.TableValuedFunction.python_worker_logs"), + + // [SPARK-56719][SS] Add DataStreamWriter.name() API for sink evolution + ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.DataStreamWriter.name") ) // Default exclude rules diff --git a/sql/api/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/api/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala index b90a7c910f9c8..726b10d6416ab 100644 --- a/sql/api/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala +++ b/sql/api/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala @@ -17,7 +17,6 @@ package org.apache.spark.sql.streaming import scala.jdk.CollectionConverters._ -import scala.util.matching.Regex import org.apache.spark.annotation.Evolving import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, Encoders} @@ -356,18 +355,16 @@ abstract class DataStreamReader { * * @param sourceName * the source name to validate + * @throws AnalysisException + * if the source name contains invalid characters * @throws IllegalArgumentException - * if the source name is null, empty, or contains invalid characters + * if the source name is null or empty */ private[sql] def validateSourceName(sourceName: String): Unit = { - require(sourceName != null, "Source name cannot be null") - require(sourceName.nonEmpty, "Source name cannot be empty") - - val validNamePattern: Regex = "^[a-zA-Z0-9_]+$".r - if (!validNamePattern.pattern.matcher(sourceName).matches()) { - throw new AnalysisException( + StreamingNameValidator.validate(sourceName, "Source") { invalid => + new AnalysisException( errorClass = "STREAMING_QUERY_EVOLUTION_ERROR.INVALID_SOURCE_NAME", - messageParameters = Map("sourceName" -> sourceName)) + messageParameters = Map("sourceName" -> invalid)) } } diff --git a/sql/api/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/api/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala index cb5ecc728c441..8f98466d1f17e 100644 --- a/sql/api/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala +++ b/sql/api/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala @@ -20,7 +20,7 @@ import java.util.concurrent.TimeoutException import org.apache.spark.annotation.Evolving import org.apache.spark.api.java.function.VoidFunction2 -import org.apache.spark.sql.{Dataset, ForeachWriter, WriteConfigMethods} +import org.apache.spark.sql.{AnalysisException, Dataset, ForeachWriter, WriteConfigMethods} /** * Interface used to write a streaming `Dataset` to external storage systems (e.g. file systems, @@ -90,6 +90,19 @@ abstract class DataStreamWriter[T] extends WriteConfigMethods[DataStreamWriter[T */ def queryName(queryName: String): this.type + /** + * Assigns a name to this streaming sink for sink evolution capability. When sinks are named, + * they can be tracked in checkpoint metadata, enabling query evolution. + * + * If not specified, sinks are automatically assigned a default name based on their position in + * the query, which maintains backward compatibility. + * + * @param sinkName + * the unique name for this sink (alphanumeric and underscore only) + * @since 4.1.0 + */ + private[sql] def name(sinkName: String): this.type + /** * Specifies the underlying output data source. * @@ -217,6 +230,24 @@ abstract class DataStreamWriter[T] extends WriteConfigMethods[DataStreamWriter[T @throws[TimeoutException] def toTable(tableName: String): StreamingQuery + /** + * Validates that a streaming sink name only contains alphanumeric characters and underscores. + * + * @param sinkName + * the sink name to validate + * @throws AnalysisException + * if the sink name contains invalid characters + * @throws IllegalArgumentException + * if the sink name is null or empty + */ + private[sql] def validateSinkName(sinkName: String): Unit = { + StreamingNameValidator.validate(sinkName, "Sink") { invalid => + new AnalysisException( + errorClass = "STREAMING_QUERY_EVOLUTION_ERROR.INVALID_SINK_NAME", + messageParameters = Map("sinkName" -> invalid)) + } + } + /////////////////////////////////////////////////////////////////////////////////////// // Covariant Overrides /////////////////////////////////////////////////////////////////////////////////////// diff --git a/sql/api/src/main/scala/org/apache/spark/sql/streaming/StreamingNameValidator.scala b/sql/api/src/main/scala/org/apache/spark/sql/streaming/StreamingNameValidator.scala new file mode 100644 index 0000000000000..9b8700844310c --- /dev/null +++ b/sql/api/src/main/scala/org/apache/spark/sql/streaming/StreamingNameValidator.scala @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.streaming + +import scala.util.matching.Regex + +import org.apache.spark.sql.AnalysisException + +/** + * Shared validation for user-assigned streaming source and sink names. Names must be non-null, + * non-empty, and contain only alphanumeric characters and underscores. + */ +private[sql] object StreamingNameValidator { + private val validNamePattern: Regex = "^[a-zA-Z0-9_]+$".r + + /** + * Validates the given streaming entity name. Throws an `IllegalArgumentException` if the name + * is null or empty, and invokes `onInvalid` to build the `AnalysisException` to throw if the + * name does not match the allowed character set. + * + * @param name + * the source/sink name to validate + * @param entityKind + * a human-readable label (e.g. "Source", "Sink") used in null/empty messages + * @param onInvalid + * builds the AnalysisException to throw when `name` has invalid characters + */ + def validate(name: String, entityKind: String)(onInvalid: String => AnalysisException): Unit = { + require(name != null, s"$entityKind name cannot be null") + require(name.nonEmpty, s"$entityKind name cannot be empty") + if (!validNamePattern.pattern.matcher(name).matches()) { + throw onInvalid(name) + } + } +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/WriteToStream.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/WriteToStream.scala index 884a4165d077e..6e0583f778350 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/WriteToStream.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/WriteToStream.scala @@ -28,6 +28,7 @@ import org.apache.spark.sql.streaming.OutputMode */ case class WriteToStream( name: String, + sinkName: Option[String], resolvedCheckpointLocation: String, sink: Table, outputMode: OutputMode, diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/WriteToStreamStatement.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/WriteToStreamStatement.scala index 7015d0dd3b2cc..61e64a526aeda 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/WriteToStreamStatement.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/WriteToStreamStatement.scala @@ -30,6 +30,7 @@ import org.apache.spark.sql.streaming.{OutputMode, Trigger} * rule [[ResolveStreamWrite]]. * * @param userSpecifiedName Query name optionally specified by the user. + * @param userSpecifiedSinkName Sink name optionally specified by the user for sink evolution. * @param userSpecifiedCheckpointLocation Checkpoint location optionally specified by the user. * @param useTempCheckpointLocation Whether to use a temporary checkpoint location when the user * has not specified one. If false, then error will be thrown. @@ -47,6 +48,7 @@ import org.apache.spark.sql.streaming.{OutputMode, Trigger} */ case class WriteToStreamStatement( userSpecifiedName: Option[String], + userSpecifiedSinkName: Option[String], userSpecifiedCheckpointLocation: Option[String], useTempCheckpointLocation: Boolean, recoverFromCheckpointLocation: Boolean, diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala index 6f3348d01bbba..1716c6eebebf2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala @@ -2545,6 +2545,12 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat messageParameters = Map("sourceName" -> sourceName)) } + def invalidStreamingSinkNameError(sinkName: String): Throwable = { + new AnalysisException( + errorClass = "STREAMING_QUERY_EVOLUTION_ERROR.INVALID_SINK_NAME", + messageParameters = Map("sinkName" -> sinkName)) + } + def duplicateStreamingSourceNamesError(duplicateNames: Seq[String]): Throwable = { new AnalysisException( errorClass = "STREAMING_QUERY_EVOLUTION_ERROR.DUPLICATE_SOURCE_NAMES", diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 83f1816c9727f..fbf9a04ddc14d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -3143,6 +3143,16 @@ object SQLConf { .booleanConf .createWithDefault(false) + val ENABLE_STREAMING_SINK_EVOLUTION = + buildConf("spark.sql.streaming.queryEvolution.enableSinkEvolution") + .internal() + .doc("When true, streaming sinks can be named using the name() API on DataStreamWriter. " + + "This enables sink evolution capability where sinks can be changed while maintaining " + + "a historical record of all sinks used in the checkpoint.") + .version("4.1.0") + .booleanConf + .createWithDefault(false) + val STREAMING_CHECK_UNFINISHED_REPARTITION_ON_RESTART = buildConf("spark.sql.streaming.checkUnfinishedRepartitionOnRestart") .internal() @@ -7673,6 +7683,8 @@ class SQLConf extends Serializable with Logging with SqlApiConf { def enableStreamingSourceEvolution: Boolean = getConf(ENABLE_STREAMING_SOURCE_EVOLUTION) + def enableStreamingSinkEvolution: Boolean = getConf(ENABLE_STREAMING_SINK_EVOLUTION) + def streamingCheckUnfinishedRepartitionOnRestart: Boolean = getConf(STREAMING_CHECK_UNFINISHED_REPARTITION_ON_RESTART) diff --git a/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/DataStreamWriter.scala b/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/DataStreamWriter.scala index ffa11b5d7ab0d..bac41acc83f03 100644 --- a/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/DataStreamWriter.scala +++ b/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/DataStreamWriter.scala @@ -82,6 +82,11 @@ final class DataStreamWriter[T] private[sql] (ds: Dataset[T]) this } + /** @inheritdoc */ + private[sql] def name(sinkName: String): this.type = { + throw new UnsupportedOperationException("Sink naming is not supported in Spark Connect") + } + /** @inheritdoc */ def format(source: String): this.type = { sinkBuilder.setFormat(source) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/classic/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/classic/DataStreamWriter.scala index 38483395ec8c5..e2b5961411a0c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/classic/DataStreamWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/classic/DataStreamWriter.scala @@ -83,6 +83,13 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) extends streaming.D this } + /** @inheritdoc */ + private[sql] def name(sinkName: String): this.type = { + validateSinkName(sinkName) + this.sinkName = Some(sinkName) + this + } + /** @inheritdoc */ def format(source: String): this.type = { this.source = source @@ -312,6 +319,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) extends streaming.D ds.sparkSession.sessionState.streamingQueryManager.startQuery( newOptions.get("queryName"), + sinkName, newOptions.get("checkpointLocation"), ds, newOptions.originalMap, @@ -444,6 +452,8 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) extends streaming.D private var partitioningColumns: Option[Seq[String]] = None private var clusteringColumns: Option[Seq[String]] = None + + private var sinkName: Option[String] = None } object DataStreamWriter { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/classic/StreamingQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/classic/StreamingQueryManager.scala index 72ae3b21d662a..fff8d32a0709b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/classic/StreamingQueryManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/classic/StreamingQueryManager.scala @@ -176,6 +176,7 @@ class StreamingQueryManager private[sql] ( // scalastyle:off argcount private def createQuery( userSpecifiedName: Option[String], + userSpecifiedSinkName: Option[String], userSpecifiedCheckpointLocation: Option[String], df: Dataset[_], extraOptions: Map[String, String], @@ -207,6 +208,7 @@ class StreamingQueryManager private[sql] ( val dataStreamWritePlan = WriteToStreamStatement( userSpecifiedName, + userSpecifiedSinkName, userSpecifiedCheckpointLocation, useTempCheckpointLocation, recoverFromCheckpointLocation, @@ -277,6 +279,7 @@ class StreamingQueryManager private[sql] ( @throws[TimeoutException] private[sql] def startQuery( userSpecifiedName: Option[String], + userSpecifiedSinkName: Option[String] = None, userSpecifiedCheckpointLocation: Option[String], df: Dataset[_], extraOptions: Map[String, String], @@ -290,6 +293,7 @@ class StreamingQueryManager private[sql] ( catalogTable: Option[CatalogTable] = None): StreamingQuery = { val query = createQuery( userSpecifiedName, + userSpecifiedSinkName, userSpecifiedCheckpointLocation, df, extraOptions, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala index e6d0666aca259..d4fae034e7974 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala @@ -26,7 +26,7 @@ import scala.util.control.NonFatal import org.apache.hadoop.fs.Path -import org.apache.spark.{SparkIllegalArgumentException, SparkIllegalStateException} +import org.apache.spark.{SparkException, SparkIllegalArgumentException, SparkIllegalStateException} import org.apache.spark.internal.LogKeys import org.apache.spark.internal.LogKeys._ import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation @@ -112,6 +112,22 @@ class MicroBatchExecution( override protected def sourceToIdMap: Map[SparkDataStream, String] = sourceIdMap.map(_.swap) + // Sink name for commit log support + // If sink evolution is enabled, use user-provided sinkName (or error if not provided) + // Otherwise, always use DEFAULT_SINK_NAME for backward compatibility + private val sinkName: String = { + if (sparkSession.sessionState.conf.enableStreamingSinkEvolution) { + plan.sinkName.getOrElse { + throw new SparkException( + errorClass = "STREAMING_QUERY_EVOLUTION_ERROR.UNNAMED_STREAMING_SINKS_WITH_ENFORCEMENT", + messageParameters = Map.empty, + cause = null) + } + } else { + MicroBatchExecution.DEFAULT_SINK_NAME + } + } + @volatile protected[sql] var triggerExecutor: TriggerExecutor = _ protected def getTrigger(): TriggerExecutor = { @@ -1466,6 +1482,12 @@ class MicroBatchExecution( object MicroBatchExecution { val BATCH_ID_KEY = "streaming.sql.batchId" + + /** + * Default sink name used when sink evolution is disabled or no explicit name is provided. + * This maintains backward compatibility with existing streaming queries. + */ + private[sql] val DEFAULT_SINK_NAME = "sink-0" } case class OffsetHolder(start: OffsetV2, end: Option[OffsetV2]) extends LeafNode { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/ResolveWriteToStream.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/ResolveWriteToStream.scala index ff0d71d0f0759..0be430591dbd8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/ResolveWriteToStream.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/ResolveWriteToStream.scala @@ -66,6 +66,7 @@ object ResolveWriteToStream extends Rule[LogicalPlan] { WriteToStream( s.userSpecifiedName.orNull, + s.userSpecifiedSinkName, resolvedCheckpointLocation, s.sink, s.outputMode, diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala index 0e33b271522dc..a6067aaf189e6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala @@ -598,6 +598,7 @@ trait StreamTest extends SharedSparkSession with TimeLimits { sparkSession .streams .startQuery( + None, None, Some(metadataRoot), stream, diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/StreamingSinkEvolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/StreamingSinkEvolutionSuite.scala new file mode 100644 index 0000000000000..a242faabaf921 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/StreamingSinkEvolutionSuite.scala @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.streaming.test + +import org.scalatest.{BeforeAndAfterEach, Tag} + +import org.apache.spark.SparkException +import org.apache.spark.sql._ +import org.apache.spark.sql.execution.streaming.runtime.MemoryStream +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.streaming.StreamTest +import org.apache.spark.util.Utils + +/** + * Test suite for streaming sink evolution features including: + * - Sink naming via DataStreamWriter.name() + * - Sink name validation + * - Sink evolution enforcement + */ +class StreamingSinkEvolutionSuite extends StreamTest with BeforeAndAfterEach { + import testImplicits._ + + private def newMetadataDir = + Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath + + override def afterEach(): Unit = { + spark.streams.active.foreach(_.stop()) + super.afterEach() + } + + // ========================= + // Sink Name Validation Tests + // ========================= + + testWithSinkEvolution("invalid sink name - contains hyphen") { + val input = MemoryStream[Int] + input.addData(1, 2, 3) + checkError( + exception = intercept[AnalysisException] { + input.toDF().writeStream + .format("noop") + .name("my-sink") + .option("checkpointLocation", newMetadataDir) + .start() + }, + condition = "STREAMING_QUERY_EVOLUTION_ERROR.INVALID_SINK_NAME", + parameters = Map("sinkName" -> "my-sink")) + } + + testWithSinkEvolution("invalid sink name - contains space") { + val input = MemoryStream[Int] + input.addData(1, 2, 3) + checkError( + exception = intercept[AnalysisException] { + input.toDF().writeStream + .format("noop") + .name("my sink") + .option("checkpointLocation", newMetadataDir) + .start() + }, + condition = "STREAMING_QUERY_EVOLUTION_ERROR.INVALID_SINK_NAME", + parameters = Map("sinkName" -> "my sink")) + } + + testWithSinkEvolution("invalid sink name - contains special characters") { + val input = MemoryStream[Int] + input.addData(1, 2, 3) + checkError( + exception = intercept[AnalysisException] { + input.toDF().writeStream + .format("noop") + .name("my.sink@123!") + .option("checkpointLocation", newMetadataDir) + .start() + }, + condition = "STREAMING_QUERY_EVOLUTION_ERROR.INVALID_SINK_NAME", + parameters = Map("sinkName" -> "my.sink@123!")) + } + + testWithSinkEvolution("valid sink names - various patterns") { + Seq("mySink", "my_sink", "MySink123", "_private", "sink_123_test", "123sink") + .foreach { sinkName => + val checkpointDir = newMetadataDir + val input = MemoryStream[Int] + input.addData(1, 2, 3) + val q = input.toDF().writeStream + .format("noop") + .name(sinkName) + .option("checkpointLocation", checkpointDir) + .start() + q.processAllAvailable() + q.stop() + } + } + + // =========================== + // Sink Evolution Enforcement + // =========================== + + testWithSinkEvolution("unnamed sink with sink evolution enabled throws error") { + val input = MemoryStream[Int] + input.addData(1, 2, 3) + val exception = intercept[SparkException] { + val q = input.toDF().writeStream + .format("noop") + // No .name() call - sink is unnamed + .option("checkpointLocation", newMetadataDir) + .start() + q.processAllAvailable() + q.stop() + } + + checkError( + exception = exception, + condition = "STREAMING_QUERY_EVOLUTION_ERROR.UNNAMED_STREAMING_SINKS_WITH_ENFORCEMENT", + parameters = Map.empty) + } + + test("unnamed sink without sink evolution enabled uses default name") { + withSQLConf( + SQLConf.ENABLE_STREAMING_SINK_EVOLUTION.key -> "false") { + val input = MemoryStream[Int] + input.addData(1, 2, 3) + // Should succeed - no name required when sink evolution is disabled + val q = input.toDF().writeStream + .format("noop") + .option("checkpointLocation", newMetadataDir) + .start() + q.processAllAvailable() + q.stop() + } + } + + testWithSinkEvolution("named sink succeeds with sink evolution enabled") { + val input = MemoryStream[Int] + input.addData(1, 2, 3) + val q = input.toDF().writeStream + .format("noop") + .name("my_sink") + .option("checkpointLocation", newMetadataDir) + .start() + q.processAllAvailable() + q.stop() + } + + testWithSinkEvolution("continuing with same sink name works") { + val checkpointDir = newMetadataDir + val input = MemoryStream[Int] + + // Start with my_sink + input.addData(1, 2, 3) + val q1 = input.toDF().writeStream + .format("noop") + .name("my_sink") + .option("checkpointLocation", checkpointDir) + .start() + q1.processAllAvailable() + q1.stop() + + // Restart with same sink name - should work + input.addData(4, 5, 6) + val q2 = input.toDF().writeStream + .format("noop") + .name("my_sink") + .option("checkpointLocation", checkpointDir) + .start() + q2.processAllAvailable() + q2.stop() + } + + // ============== + // Helper Methods + // ============== + + /** + * Helper method to run tests with sink evolution enabled. + */ + def testWithSinkEvolution(testName: String, testTags: Tag*)(testBody: => Any): Unit = { + test(testName, testTags: _*) { + withSQLConf( + SQLConf.ENABLE_STREAMING_SINK_EVOLUTION.key -> "true") { + testBody + } + } + } +} diff --git a/sql/hive/src/test/resources/conf/binding-policy-exceptions/configs-without-binding-policy-exceptions b/sql/hive/src/test/resources/conf/binding-policy-exceptions/configs-without-binding-policy-exceptions index 2aa6cb885ca31..b295c08879ef9 100644 --- a/sql/hive/src/test/resources/conf/binding-policy-exceptions/configs-without-binding-policy-exceptions +++ b/sql/hive/src/test/resources/conf/binding-policy-exceptions/configs-without-binding-policy-exceptions @@ -1035,6 +1035,7 @@ spark.sql.streaming.numRecentProgressUpdates spark.sql.streaming.offsetLog.formatVersion spark.sql.streaming.optimizeOneRowPlan.enabled spark.sql.streaming.pollingDelay +spark.sql.streaming.queryEvolution.enableSinkEvolution spark.sql.streaming.queryEvolution.enableSourceEvolution spark.sql.streaming.ratioExtraSpaceAllowedInCheckpoint spark.sql.streaming.realTimeMode.allowlistCheck