Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion python/pyspark/errors/error-conditions.json
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,7 @@
},
"INVALID_MULTIPLE_ARGUMENT_CONDITIONS": {
"message": [
"[{arg_names}] cannot be <condition>."
"[<arg_names>] cannot be <condition>."

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks like a real bug; formatting will break.

]
},
"INVALID_NDARRAY_DIMENSION": {
Expand Down
20 changes: 12 additions & 8 deletions python/pyspark/pipelines/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -541,8 +541,8 @@ def create_auto_cdc_flow(
) -> None:
"""
Create an Auto CDC flow into the target table from the Change Data Capture (CDC) source.
Target table must have already been created using create_streaming_table function. Only one
of column_list and except_column_list can be specified.
Target table must have already been created using the `create_streaming_table` function.
Only one of column_list and except_column_list can be specified.

Example:
create_auto_cdc_flow(
Expand Down Expand Up @@ -576,16 +576,19 @@ def create_auto_cdc_flow(
:param column_list: Columns that will be included in the output table. This should be a list \
of column identifiers without qualifiers, expressed as either Python strings or PySpark \
Columns. Only one of column_list and except_column_list can be specified.
:param except_column_list: Columns that will be excluded in the output table. This should be a \
list of column identifiers without qualifiers, expressed as either Python strings or \
:param except_column_list: Columns that will be excluded from the output table. This should \
be a list of column identifiers without qualifiers, expressed as either Python strings or \
PySpark Columns. Only one of column_list and except_column_list can be specified. When \
this is specified, all columns in the dataframe of the target table except those in this \
list will be in the output table.
this is specified, all columns in the `DataFrame` of the target table except those in \
this list will be in the output table.
:param stored_as_scd_type: The SCD type for the target table. Only 1 (or "1") is supported. \
When not specified the server default applies.
:param name: The name of the flow for this create_auto_cdc_flow command. When unspecified \
When not specified, the server default applies.
:param name: The name of the flow for this create_auto_cdc_flow command. When unspecified, \
this will build a "default flow" with name equal to the target name.
"""
# Lazy import: pyspark.sql.connect.functions.builtin transitively imports grpc, which is
# not available in the docs-build environment. pyspark.pipelines.api is loaded eagerly
# from pyspark.pipelines.__init__, so a top-level import here would break docs CI.
from pyspark.sql.connect.functions.builtin import expr as _connect_expr

if type(target) is not str:
Expand Down Expand Up @@ -690,6 +693,7 @@ def _normalize_column_list(
arg_name: str,
column_list: Union[List[str], List[Column]],
) -> List[Column]:
# Lazy import: see comment in create_auto_cdc_flow.
from pyspark.sql.connect.functions.builtin import col as _connect_col

if not isinstance(column_list, list):
Expand Down
5 changes: 3 additions & 2 deletions python/pyspark/pipelines/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,11 @@ class AutoCdcFlow:
:param source: The name of the CDC source to stream from.
:param keys: Column(s) that uniquely identify a row in source and target data.
:param sequence_by: Expression used to order the source data.
:param apply_as_deletes: Optional delete condition for the merged operation.
:param apply_as_deletes: Optional delete condition for the merge operation.
:param column_list: Optional columns to include in the output table.
:param except_column_list: Optional columns to exclude from the output table.
:param stored_as_scd_type: Optional SCD type for the target table. Only 1 is supported.
:param stored_as_scd_type: Optional SCD type for the target table. Only 1 (or "1") is \
supported.
:param source_code_location: The location of the source code that created this flow.
"""

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.spark.sql.pipelines.autocdc

import org.apache.spark.SparkException
import org.apache.spark.sql.{functions => F, AnalysisException}
import org.apache.spark.sql.{functions => F}
import org.apache.spark.sql.Column
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.util.QuotingUtils
Expand Down Expand Up @@ -130,9 +130,6 @@ case class Scd1BatchProcessor(
*/
private[autocdc] def extendMicrobatchRowsWithCdcMetadata(
validatedMicrobatch: DataFrame): DataFrame = {
// Proactively validate the reserved CDC metadata column does not exist in the microbatch.
validateCdcMetadataColumnNotPresent(validatedMicrobatch)

val rowDeleteSequence: Column = changeArgs.deleteCondition match {
case Some(deleteCondition) =>
F.when(deleteCondition, changeArgs.sequencing).otherwise(F.lit(null))
Expand Down Expand Up @@ -409,25 +406,6 @@ case class Scd1BatchProcessor(
.insert(columnsToInsertOnNewKey)
.merge()
}

private def validateCdcMetadataColumnNotPresent(microbatch: DataFrame): Unit = {
val microbatchSqlConf = microbatch.sparkSession.sessionState.conf
val resolver = microbatchSqlConf.resolver

microbatch.schema.fieldNames
.find(resolver(_, Scd1BatchProcessor.cdcMetadataColName))
.foreach { conflictingColumnName =>
throw new AnalysisException(
errorClass = "AUTOCDC_RESERVED_COLUMN_NAME_CONFLICT",
messageParameters = Map(
"caseSensitivity" -> CaseSensitivityLabels.of(microbatchSqlConf.caseSensitiveAnalysis),
"columnName" -> conflictingColumnName,
"schemaName" -> "microbatch",
"reservedColumnName" -> Scd1BatchProcessor.cdcMetadataColName
)
)
}
}
Comment on lines -412 to -430

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! This was supposed to be removed in the other PR, but I think it was accidentally added back when I rebased the stack.

}

object Scd1BatchProcessor {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,10 +137,10 @@ sealed trait UnresolvedFlow extends Flow {
* An [[UnresolvedFlow]] whose execution-type has not yet been determined.
*
* In some cases, we know the execution-type for an [[UnresolvedFlow]] even before flow analysis
* and resolution. For example an AutoCDCFlow is a special unresolved-but-typed flow; we know a
* flow will be an AutoCDC flow immediately on construction, because it has its own special
* registration API. Such flows are considered "typed flows", but there isn't any semantic reason
* yet to explicitly introduce a `TypedFlow` trait/class.
* and resolution. For example, an [[AutoCdcFlow]] is a special unresolved-but-typed flow; we
* know a flow will be an AutoCDC flow immediately on construction, because it has its own
* special registration API. Such flows are considered "typed flows", but there isn't any
* semantic reason yet to explicitly introduce a `TypedFlow` trait/class.
*/
case class UntypedFlow(
identifier: TableIdentifier,
Expand All @@ -161,17 +161,16 @@ case class UntypedFlow(
* [[AutoCdcFlow]] is a typed flow because it is only supported for streaming, and not as a once
* flow. Therefore, by definition, it is a streaming-type flow.
*
* In the future once-support for [[AutoCdcFlow]] may be added.
* In the future, support for once-mode [[AutoCdcFlow]] may be added.
*/
case class AutoCdcFlow(
identifier: TableIdentifier,
destinationIdentifier: TableIdentifier,
func: FlowFunction,
queryContext: QueryContext,
sqlConf: Map[String, String] = Map.empty,
comment: Option[String] = None,
override val origin: QueryOrigin,
changeArgs: ChangeArgs
changeArgs: ChangeArgs,
sqlConf: Map[String, String] = Map.empty
) extends UnresolvedFlow {
override val once: Boolean = false

Expand Down Expand Up @@ -245,8 +244,8 @@ class AppendOnceFlow(
}

/**
* A resolved flow that applies a CDC event stream to a target table via MERGE, in accordance to
* the configured [[flow.changeArgs]].
* A resolved flow that applies a CDC event stream to a target table via MERGE, in accordance
* with the configured [[flow.changeArgs]].
*/
class AutoCdcMergeFlow(
val flow: AutoCdcFlow,
Expand All @@ -264,8 +263,8 @@ class AutoCdcMergeFlow(
columnSelection = changeArgs.columnSelection,
caseSensitive = spark.sessionState.conf.caseSensitiveAnalysis
)
// AutoCDC flows require all key columns to be present in the target table, to adhere to SCD
// semantics.
// AutoCDC flows require all key columns to be present in the user-selected source schema,
// so that they survive into the target table where SCD reconciliation needs them.
requireKeysPresentInSelectedSchema(selectedSchema)
selectedSchema
}
Expand Down Expand Up @@ -305,11 +304,11 @@ class AutoCdcMergeFlow(
* Returns an empty dataframe whose schema matches [[AutoCdcMergeFlow.schema]]. By construction,
* the returned dataframe will be a streaming dataframe.
*
* In practice, [[AutoCdcMergeFlow.load]] is not invoked during graph analysis or execution.
* An AutoCdcMergeFlow can only be an input to a streaming table (not an MV or
* persisted/temp view), and streaming tables consume a [[VirtualTableInput]] rather than the
* producing [[Flow]] directly. [[VirtualTableInput]] overrides its own [[load]] to do schema
* inference on its input flows, rather than a transitive [[Flow.load]].
* Today, [[AutoCdcMergeFlow.load]] is never actually called during graph analysis or
* execution. An AutoCdcMergeFlow can only be an input to a streaming table (not an MV or
* persisted/temp view), and streaming tables take a [[VirtualTableInput]] as input, not
* the producing [[Flow]] directly. [[VirtualTableInput]] overrides its own [[load]] to do
* schema inference on its input flows, rather than a transitive [[ResolvedFlow.load]].
*
* The implementation exists for API consistency and throws an internal error if invoked with
* `asStreaming = false`, or if the underlying source dataframe is not streaming, to surface
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ trait GraphValidations extends Logging {
protected[pipelines] def validateMultiQueryTables(): Map[TableIdentifier, Seq[Flow]] = {
val multiQueryTables = flowsTo.filter(_._2.size > 1)

// A multiflow table may not have an AutoCDC flow; AutoCDC flow targets must be single query.
// A multiflow table may not have an AutoCDC flow; AutoCDC targets must have exactly one
// input flow.
multiQueryTables
.find { case (_, flows) => flows.exists(isAutoCdcFlow) }
.foreach {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ class AutoCdcFlowSuite extends QueryTest with SharedSparkSession {
func: FlowFunction = noOpFlowFunction,
queryContext: QueryContext = testQueryContext,
sqlConf: Map[String, String] = Map.empty,
comment: Option[String] = None,
origin: QueryOrigin = QueryOrigin.empty,
changeArgs: ChangeArgs = testChangeArgs): AutoCdcFlow = {
AutoCdcFlow(
Expand All @@ -83,29 +82,26 @@ class AutoCdcFlowSuite extends QueryTest with SharedSparkSession {
func = func,
queryContext = queryContext,
sqlConf = sqlConf,
comment = comment,
origin = origin,
changeArgs = changeArgs
)
}

test("AutoCdcFlow exposes its constructor fields") {
val flow = newAutoCdcFlow(
sqlConf = Map("spark.sql.shuffle.partitions" -> "8"),
comment = Some("my CDC flow")
sqlConf = Map("spark.sql.shuffle.partitions" -> "8")
)

assert(flow.identifier == testIdentifier)
assert(flow.destinationIdentifier == testIdentifier)
assert(flow.func eq noOpFlowFunction)
assert(flow.queryContext == testQueryContext)
assert(flow.sqlConf == Map("spark.sql.shuffle.partitions" -> "8"))
assert(flow.comment.contains("my CDC flow"))
assert(flow.origin == QueryOrigin.empty)
assert(flow.changeArgs == testChangeArgs)
}

test("AutoCdcFlow defaults sqlConf to empty and comment to None") {
test("AutoCdcFlow defaults sqlConf to empty") {
// Confirms the case-class default values match the documented contract; downstream
// registration code relies on `sqlConf` being a non-null empty map by default so that
// `defaultSqlConf ++ flowDef.sqlConf` is well-defined in [[GraphRegistrationContext]].
Expand All @@ -119,7 +115,6 @@ class AutoCdcFlowSuite extends QueryTest with SharedSparkSession {
)

assert(flow.sqlConf.isEmpty)
assert(flow.comment.isEmpty)
}

test("AutoCdcFlow.once is always false") {
Expand All @@ -143,7 +138,6 @@ class AutoCdcFlowSuite extends QueryTest with SharedSparkSession {
assert(updated.destinationIdentifier == original.destinationIdentifier)
assert(updated.func eq original.func)
assert(updated.queryContext == original.queryContext)
assert(updated.comment == original.comment)
assert(updated.origin == original.origin)
assert(updated.changeArgs == original.changeArgs)
// The original must not be mutated.
Expand All @@ -165,7 +159,7 @@ class AutoCdcFlowSuite extends QueryTest with SharedSparkSession {
sqlConf = Map.empty
)

/** Builds a [[AutoCdcMergeFlow]] over the given source dataframe + change args. */
/** Builds an [[AutoCdcMergeFlow]] over the given source dataframe + change args. */
private def newAutoCdcMergeFlow(
sourceDf: DataFrame,
keys: Seq[UnqualifiedColumnName] = Seq(UnqualifiedColumnName("id")),
Expand Down Expand Up @@ -445,9 +439,9 @@ class AutoCdcFlowSuite extends QueryTest with SharedSparkSession {
"AutoCdcMergeFlow rejects a source df column whose name equals the reserved CDC " +
"metadata column"
) {
// Locks in the previous engine-level guard (Scd1BatchProcessor.extendMicrobatchRowsWith
// CdcMetadata) at flow-construction time. Any future regression where a user-supplied
// CDC stream carries the reserved metadata column name should fail eagerly here.
// Locks in the previous engine-level guard at flow-construction time. Any future
// regression where a user-supplied CDC stream carries the reserved metadata column name
// should fail eagerly here.
val sourceDf = sourceDfWithExtraColumns(Scd1BatchProcessor.cdcMetadataColName -> StringType)

checkError(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -658,7 +658,7 @@ class ConnectInvalidPipelineSuite extends PipelineTest with SharedSparkSession {
test(
"AutoCDC flow targeting a temporary view fails with AUTOCDC_RELATION_FOR_TEMPORARY_VIEW"
) {
// Temporary views in SDP normally accept either streaming or batch producing flows, but
// Temporary views in SDP normally accept either streaming or batch-producing flows, but
// AutoCDC flows are an explicit exception: SCD reconciliation only runs at the
// streaming-table sink (`Scd1ForeachBatchHandler`), so pointing an AutoCDC flow at a view
// would silently drop reconciliation and expose just the projected CDF to consumers.
Expand Down