From 94afb60d762f6ac10a26c8b8a5cc41e2d8b21e72 Mon Sep 17 00:00:00 2001 From: Serge Rielau Date: Wed, 15 Apr 2026 12:09:58 -0700 Subject: [PATCH 01/12] [SPARK-56489][SQL] Add CURRENT_PATH() builtin expression and PATH-related grammar keywords Add the CURRENT_PATH() builtin function that returns the current SQL resolution search path as a comma-separated string of qualified schema names (e.g. "system.builtin,system.session,spark_catalog.default"). Also register the grammar keywords needed by the upcoming SQL PATH feature (CURRENT_PATH, CURRENT_SCHEMA, CURRENT_DATABASE, DEFAULT_PATH, SYSTEM_PATH, PATH). CURRENT_PATH and CURRENT_SCHEMA are reserved in ANSI mode per SQL:2023; the others are non-reserved. In non-ANSI mode, CURRENT_PATH, CURRENT_DATABASE, and CURRENT_SCHEMA always resolve to their respective expressions (not UnresolvedAttribute), matching the behavior of CURRENT_CATALOG. This is part 1 of the SQL PATH feature (SPARK-54810), split out to keep the review scope manageable. --- docs/sql-ref-ansi-compliance.md | 6 +++++ .../spark/sql/catalyst/parser/SqlBaseLexer.g4 | 6 +++++ .../sql/catalyst/parser/SqlBaseParser.g4 | 12 +++++++++- .../catalyst/analysis/FunctionRegistry.scala | 1 + .../spark/sql/catalyst/expressions/misc.scala | 22 +++++++++++++++++++ .../catalyst/optimizer/finishAnalysis.scala | 10 ++++++++- .../sql/catalyst/parser/AstBuilder.scala | 18 +++++++++++---- .../SparkConnectDatabaseMetaDataSuite.scala | 3 ++- .../sql-functions/sql-expression-schema.md | 1 + .../results/keywords-enforced.sql.out | 8 +++++++ .../sql-tests/results/keywords.sql.out | 6 +++++ .../results/nonansi/keywords.sql.out | 6 +++++ .../sql/FunctionQualificationSuite.scala | 6 +++++ .../ThriftServerWithSparkContextSuite.scala | 2 +- 14 files changed, 99 insertions(+), 8 deletions(-) diff --git a/docs/sql-ref-ansi-compliance.md b/docs/sql-ref-ansi-compliance.md index 504267c79fa62..f96c222ecd61a 100644 --- a/docs/sql-ref-ansi-compliance.md +++ b/docs/sql-ref-ansi-compliance.md @@ 
-479,7 +479,10 @@ Below is a list of all the keywords in Spark SQL. |CROSS|reserved|strict-non-reserved|reserved| |CUBE|non-reserved|non-reserved|reserved| |CURRENT|non-reserved|non-reserved|reserved| +|CURRENT_DATABASE|non-reserved|non-reserved|non-reserved| |CURRENT_DATE|reserved|non-reserved|reserved| +|CURRENT_PATH|reserved|non-reserved|reserved| +|CURRENT_SCHEMA|reserved|non-reserved|reserved| |CURRENT_TIME|reserved|non-reserved|reserved| |CURRENT_TIMESTAMP|reserved|non-reserved|reserved| |CURRENT_USER|reserved|non-reserved|reserved| @@ -502,6 +505,7 @@ Below is a list of all the keywords in Spark SQL. |DEFAULT|non-reserved|non-reserved|non-reserved| |DEFINED|non-reserved|non-reserved|non-reserved| |DEFINER|non-reserved|non-reserved|non-reserved| +|DEFAULT_PATH|non-reserved|non-reserved|not a keyword| |DELAY|non-reserved|non-reserved|non-reserved| |DELETE|non-reserved|non-reserved|reserved| |DELIMITED|non-reserved|non-reserved|non-reserved| @@ -671,6 +675,7 @@ Below is a list of all the keywords in Spark SQL. |PARTITION|non-reserved|non-reserved|reserved| |PARTITIONED|non-reserved|non-reserved|non-reserved| |PARTITIONS|non-reserved|non-reserved|non-reserved| +|PATH|non-reserved|non-reserved|not a keyword| |PERCENT|non-reserved|non-reserved|non-reserved| |PIVOT|non-reserved|non-reserved|non-reserved| |PLACING|non-reserved|non-reserved|non-reserved| @@ -754,6 +759,7 @@ Below is a list of all the keywords in Spark SQL. 
|SUBSTR|non-reserved|non-reserved|non-reserved| |SUBSTRING|non-reserved|non-reserved|non-reserved| |SYNC|non-reserved|non-reserved|non-reserved| +|SYSTEM_PATH|non-reserved|non-reserved|not a keyword| |SYSTEM_TIME|non-reserved|non-reserved|non-reserved| |SYSTEM_VERSION|non-reserved|non-reserved|non-reserved| |TABLE|reserved|non-reserved|reserved| diff --git a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4 b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4 index b5e3531dba764..7af34270693d2 100644 --- a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4 +++ b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4 @@ -198,7 +198,10 @@ CREATE: 'CREATE'; CROSS: 'CROSS'; CUBE: 'CUBE'; CURRENT: 'CURRENT'; +CURRENT_DATABASE: 'CURRENT_DATABASE'; CURRENT_DATE: 'CURRENT_DATE'; +CURRENT_PATH: 'CURRENT_PATH'; +CURRENT_SCHEMA: 'CURRENT_SCHEMA'; CURRENT_TIME: 'CURRENT_TIME'; CURRENT_TIMESTAMP: 'CURRENT_TIMESTAMP'; CURRENT_USER: 'CURRENT_USER'; @@ -219,6 +222,7 @@ DEC: 'DEC'; DECIMAL: 'DECIMAL'; DECLARE: 'DECLARE'; DEFAULT: 'DEFAULT'; +DEFAULT_PATH: 'DEFAULT_PATH'; DEFINED: 'DEFINED'; DEFINER: 'DEFINER'; DELAY: 'DELAY'; @@ -389,6 +393,7 @@ OVERWRITE: 'OVERWRITE'; PARTITION: 'PARTITION'; PARTITIONED: 'PARTITIONED'; PARTITIONS: 'PARTITIONS'; +PATH: 'PATH'; PERCENTLIT: 'PERCENT'; PIVOT: 'PIVOT'; PLACING: 'PLACING'; @@ -474,6 +479,7 @@ SUBSTRING: 'SUBSTRING'; SYNC: 'SYNC'; SYSTEM_TIME: 'SYSTEM_TIME'; SYSTEM_VERSION: 'SYSTEM_VERSION'; +SYSTEM_PATH: 'SYSTEM_PATH'; TABLE: 'TABLE'; TABLES: 'TABLES'; TABLESAMPLE: 'TABLESAMPLE'; diff --git a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 index b669b5201cdc1..3d1fbc2fd6688 100644 --- a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 +++ 
b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 @@ -1297,7 +1297,7 @@ datetimeUnit ; primaryExpression - : name=(CURRENT_DATE | CURRENT_TIMESTAMP | CURRENT_USER | USER | SESSION_USER | CURRENT_TIME) #currentLike + : name=(CURRENT_DATE | CURRENT_TIMESTAMP | CURRENT_USER | USER | SESSION_USER | CURRENT_TIME | CURRENT_PATH | CURRENT_DATABASE | CURRENT_SCHEMA) #currentLike | name=(TIMESTAMPADD | DATEADD | DATE_ADD) LEFT_PAREN (unit=datetimeUnit | invalidUnit=stringLit) COMMA unitsAmount=valueExpression COMMA timestamp=valueExpression RIGHT_PAREN #timestampadd | name=(TIMESTAMPDIFF | DATEDIFF | DATE_DIFF | TIMEDIFF) LEFT_PAREN (unit=datetimeUnit | invalidUnit=stringLit) COMMA startTimestamp=valueExpression COMMA endTimestamp=valueExpression RIGHT_PAREN #timestampdiff | CASE whenClause+ (ELSE elseExpression=expression)? END #searchedCase @@ -1961,6 +1961,7 @@ ansiNonReserved | CURSOR | CUBE | CURRENT + | CURRENT_DATABASE | DATA | DATABASE | DATABASES @@ -1977,6 +1978,7 @@ ansiNonReserved | DECIMAL | DECLARE | DEFAULT + | DEFAULT_PATH | DEFINED | DEFINER | DELAY @@ -2112,6 +2114,7 @@ ansiNonReserved | PARTITION | PARTITIONED | PARTITIONS + | PATH | PERCENTLIT | PIVOT | PLACING @@ -2187,6 +2190,7 @@ ansiNonReserved | SUBSTR | SUBSTRING | SYNC + | SYSTEM_PATH | SYSTEM_TIME | SYSTEM_VERSION | TABLES @@ -2342,7 +2346,10 @@ nonReserved | CUBE | CURRENT | CURSOR + | CURRENT_DATABASE | CURRENT_DATE + | CURRENT_PATH + | CURRENT_SCHEMA | CURRENT_TIME | CURRENT_TIMESTAMP | CURRENT_USER @@ -2362,6 +2369,7 @@ nonReserved | DECIMAL | DECLARE | DEFAULT + | DEFAULT_PATH | DEFINED | DEFINER | DELAY @@ -2524,6 +2532,7 @@ nonReserved | PARTITION | PARTITIONED | PARTITIONS + | PATH | PERCENTLIT | PIVOT | PLACING @@ -2604,6 +2613,7 @@ nonReserved | SUBSTR | SUBSTRING | SYNC + | SYSTEM_PATH | SYSTEM_TIME | SYSTEM_VERSION | TABLE diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala index cadb7750c2460..ef7b25208928c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala @@ -854,6 +854,7 @@ object FunctionRegistry { expression[CurrentDatabase]("current_database"), expression[CurrentDatabase]("current_schema", true, Some("3.4.0")), expression[CurrentCatalog]("current_catalog"), + expression[CurrentPath]("current_path", true, Some("4.2.0")), expression[CurrentUser]("current_user"), expression[CurrentUser]("user", true, Some("3.4.0")), expression[CurrentUser]("session_user", true, Some("4.0.0")), diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala index 3507627a58571..571764c3517d6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala @@ -249,6 +249,28 @@ case class CurrentCatalog() final override val nodePatterns: Seq[TreePattern] = Seq(CURRENT_LIKE) } +/** + * Returns the current SQL path as a comma-separated list of qualified schema names + * (catalog.schema). Responsive to USE SCHEMA / USE CATALOG when the PATH feature is enabled. 
+ */ +@ExpressionDescription( + usage = "_FUNC_() - Returns the current SQL path (qualified schema names).", + examples = """ + Examples: + > SELECT _FUNC_(); + system.builtin,system.session,spark_catalog.default + """, + since = "4.2.0", + group = "misc_funcs") +case class CurrentPath() + extends LeafExpression + with DefaultStringProducingExpression + with Unevaluable { + override def nullable: Boolean = false + override def prettyName: String = "current_path" + final override val nodePatterns: Seq[TreePattern] = Seq(CURRENT_LIKE) +} + // scalastyle:off line.size.limit @ExpressionDescription( usage = """_FUNC_() - Returns an universally unique identifier (UUID) string. The value is returned as a canonical UUID 36-character string.""", diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala index c9c26d473b982..0fd7d1f02fa37 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala @@ -34,6 +34,7 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils.{convertSpecialDate, con import org.apache.spark.sql.catalyst.util.SparkDateTimeUtils.{instantToNanosOfDay, truncateTimeToPrecision} import org.apache.spark.sql.catalyst.util.TypeUtils.toSQLExpr import org.apache.spark.sql.connector.catalog.CatalogManager +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ @@ -148,21 +149,28 @@ object ComputeCurrentTime extends Rule[LogicalPlan] { } /** - * Replaces the expression of CurrentDatabase, CurrentCatalog, and CurrentUser + * Replaces the expression of CurrentDatabase, CurrentCatalog, CurrentPath, and CurrentUser * with the current values. 
*/ case class ReplaceCurrentLike(catalogManager: CatalogManager) extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = { import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ lazy val currentNamespace = catalogManager.currentNamespace.quoted + lazy val currentNamespaceSeq = catalogManager.currentNamespace.toSeq lazy val currentCatalog = catalogManager.currentCatalog.name() lazy val currentUser = CurrentUserContext.getCurrentUser + lazy val currentPathStr = { + val catalogPath = (currentCatalog +: currentNamespaceSeq).toSeq + SQLConf.get.resolutionSearchPath(catalogPath).map(_.mkString(".")).mkString(",") + } plan.transformAllExpressionsWithPruning(_.containsPattern(CURRENT_LIKE)) { case CurrentDatabase() => Literal.create(currentNamespace, StringType) case CurrentCatalog() => Literal.create(currentCatalog, StringType) + case CurrentPath() => + Literal.create(currentPathStr, StringType) case CurrentUser() => Literal.create(currentUser, StringType) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index 753ed76fe16b5..bda3e6e75a8dc 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -3434,14 +3434,24 @@ class AstBuilder extends DataTypeAstBuilder CurrentTimestamp() case SqlBaseParser.CURRENT_TIME => CurrentTime() + case SqlBaseParser.CURRENT_PATH => + CurrentPath() + case SqlBaseParser.CURRENT_DATABASE | SqlBaseParser.CURRENT_SCHEMA => + CurrentDatabase() case SqlBaseParser.CURRENT_USER | SqlBaseParser.USER | SqlBaseParser.SESSION_USER => CurrentUser() } } else { - // If the parser is not in ansi mode, we should return `UnresolvedAttribute`, in case there - // are columns named `CURRENT_DATE` or `CURRENT_TIMESTAMP` or `CURRENT_TIME`. 
- // ctx.name is a token, not an identifier context. - UnresolvedAttribute.quoted(ctx.name.getText) + ctx.name.getType match { + case SqlBaseParser.CURRENT_PATH => + CurrentPath() + case SqlBaseParser.CURRENT_DATABASE | SqlBaseParser.CURRENT_SCHEMA => + CurrentDatabase() + case _ => + // If the parser is not in ansi mode, we should return `UnresolvedAttribute`, in case + // there are columns named `CURRENT_DATE` or `CURRENT_TIMESTAMP` or `CURRENT_TIME`. + UnresolvedAttribute.quoted(ctx.name.getText) + } } } diff --git a/sql/connect/client/jdbc/src/test/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectDatabaseMetaDataSuite.scala b/sql/connect/client/jdbc/src/test/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectDatabaseMetaDataSuite.scala index 7a6026977bac1..99c82345561d2 100644 --- a/sql/connect/client/jdbc/src/test/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectDatabaseMetaDataSuite.scala +++ b/sql/connect/client/jdbc/src/test/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectDatabaseMetaDataSuite.scala @@ -209,7 +209,8 @@ class SparkConnectDatabaseMetaDataSuite extends ConnectFunSuite with RemoteSpark withConnection { conn => val metadata = conn.getMetaData // scalastyle:off line.size.limit - assert(metadata.getSQLKeywords === 
"ADD,AFTER,AGGREGATE,ALWAYS,ANALYZE,ANTI,ANY_VALUE,ARCHIVE,ASC,BINDING,BUCKET,BUCKETS,BYTE,CACHE,CASCADE,CATALOG,CATALOGS,CHANGE,CHANGES,CLEAR,CLUSTER,CLUSTERED,CODEGEN,COLLATION,COLLATIONS,COLLECTION,COLUMNS,COMMENT,COMPACT,COMPACTIONS,COMPENSATION,COMPUTE,CONCATENATE,CONTAINS,CONTINUE,COST,DATA,DATABASE,DATABASES,DATEADD,DATEDIFF,DATE_ADD,DATE_DIFF,DAYOFYEAR,DAYS,DBPROPERTIES,DEFINED,DEFINER,DELAY,DELIMITED,DESC,DFS,DIRECTORIES,DIRECTORY,DISTRIBUTE,DIV,DO,ELSEIF,ENFORCED,ESCAPED,EVOLUTION,EXCHANGE,EXCLUDE,EXCLUSIVE,EXIT,EXPLAIN,EXPORT,EXTEND,EXTENDED,FIELDS,FILEFORMAT,FIRST,FLOW,FOLLOWING,FORMAT,FORMATTED,FOUND,FUNCTIONS,GENERATED,GEOGRAPHY,GEOMETRY,HANDLER,HOURS,IDENTIFIED,IDENTIFIER,IF,IGNORE,ILIKE,IMMEDIATE,INCLUDE,INCLUSIVE,INCREMENT,INDEX,INDEXES,INPATH,INPUT,INPUTFORMAT,INVOKER,ITEMS,ITERATE,JSON,KEY,KEYS,LAST,LAZY,LEAVE,LEVEL,LIMIT,LINES,LIST,LOAD,LOCATION,LOCK,LOCKS,LOGICAL,LONG,LOOP,MACRO,MAP,MATCHED,MATERIALIZED,MEASURE,METRICS,MICROSECOND,MICROSECONDS,MILLISECOND,MILLISECONDS,MINUS,MINUTES,MONTHS,MSCK,NAME,NAMESPACE,NAMESPACES,NANOSECOND,NANOSECONDS,NORELY,NULLS,OFFSET,OPTION,OPTIONS,OUTPUTFORMAT,OVERWRITE,PARTITIONED,PARTITIONS,PERCENT,PIVOT,PLACING,PRECEDING,PRINCIPALS,PROCEDURES,PROPERTIES,PURGE,QUARTER,QUERY,RECORDREADER,RECORDWRITER,RECOVER,RECURSION,REDUCE,REFRESH,RELY,RENAME,REPAIR,REPEAT,REPEATABLE,REPLACE,RESET,RESPECT,RESTRICT,ROLE,ROLES,SCHEMA,SCHEMAS,SECONDS,SECURITY,SEMI,SEPARATED,SERDE,SERDEPROPERTIES,SETS,SHORT,SHOW,SINGLE,SKEWED,SORT,SORTED,SOURCE,STATISTICS,STORED,STRATIFY,STREAM,STREAMING,STRING,STRUCT,SUBSTR,SYNC,SYSTEM_TIME,SYSTEM_VERSION,TABLES,TARGET,TBLPROPERTIES,TERMINATED,TIMEDIFF,TIMESTAMPADD,TIMESTAMPDIFF,TIMESTAMP_LTZ,TIMESTAMP_NTZ,TINYINT,TOUCH,TRANSACTION,TRANSACTIONS,TRANSFORM,TRUNCATE,TRY_CAST,TYPE,UNARCHIVE,UNBOUNDED,UNCACHE,UNLOCK,UNPIVOT,UNSET,UNTIL,USE,VAR,VARIABLE,VARIANT,VERSION,VIEW,VIEWS,VOID,WATERMARK,WEEK,WEEKS,WHILE,X,YEARS,ZONE") + // CURRENT_PATH is excluded: getSQLKeywords drops SQL:2003 reserved words (see 
companion). + assert(metadata.getSQLKeywords === "ADD,AFTER,AGGREGATE,ALWAYS,ANALYZE,ANTI,ANY_VALUE,ARCHIVE,ASC,BINDING,BUCKET,BUCKETS,BYTE,CACHE,CASCADE,CATALOG,CATALOGS,CHANGE,CHANGES,CLEAR,CLUSTER,CLUSTERED,CODEGEN,COLLATION,COLLATIONS,COLLECTION,COLUMNS,COMMENT,COMPACT,COMPACTIONS,COMPENSATION,COMPUTE,CONCATENATE,CONTAINS,CONTINUE,COST,CURRENT_DATABASE,CURRENT_SCHEMA,DATA,DATABASE,DATABASES,DATEADD,DATEDIFF,DATE_ADD,DATE_DIFF,DAYOFYEAR,DAYS,DBPROPERTIES,DEFAULT_PATH,DEFINED,DEFINER,DELAY,DELIMITED,DESC,DFS,DIRECTORIES,DIRECTORY,DISTRIBUTE,DIV,DO,ELSEIF,ENFORCED,ESCAPED,EVOLUTION,EXCHANGE,EXCLUDE,EXCLUSIVE,EXIT,EXPLAIN,EXPORT,EXTEND,EXTENDED,FIELDS,FILEFORMAT,FIRST,FLOW,FOLLOWING,FORMAT,FORMATTED,FOUND,FUNCTIONS,GENERATED,GEOGRAPHY,GEOMETRY,HANDLER,HOURS,IDENTIFIED,IDENTIFIER,IF,IGNORE,ILIKE,IMMEDIATE,INCLUDE,INCLUSIVE,INCREMENT,INDEX,INDEXES,INPATH,INPUT,INPUTFORMAT,INVOKER,ITEMS,ITERATE,JSON,KEY,KEYS,LAST,LAZY,LEAVE,LEVEL,LIMIT,LINES,LIST,LOAD,LOCATION,LOCK,LOCKS,LOGICAL,LONG,LOOP,MACRO,MAP,MATCHED,MATERIALIZED,MEASURE,METRICS,MICROSECOND,MICROSECONDS,MILLISECOND,MILLISECONDS,MINUS,MINUTES,MONTHS,MSCK,NAME,NAMESPACE,NAMESPACES,NANOSECOND,NANOSECONDS,NORELY,NULLS,OFFSET,OPTION,OPTIONS,OUTPUTFORMAT,OVERWRITE,PARTITIONED,PARTITIONS,PATH,PERCENT,PIVOT,PLACING,PRECEDING,PRINCIPALS,PROCEDURES,PROPERTIES,PURGE,QUARTER,QUERY,RECORDREADER,RECORDWRITER,RECOVER,RECURSION,REDUCE,REFRESH,RELY,RENAME,REPAIR,REPEAT,REPEATABLE,REPLACE,RESET,RESPECT,RESTRICT,ROLE,ROLES,SCHEMA,SCHEMAS,SECONDS,SECURITY,SEMI,SEPARATED,SERDE,SERDEPROPERTIES,SETS,SHORT,SHOW,SINGLE,SKEWED,SORT,SORTED,SOURCE,STATISTICS,STORED,STRATIFY,STREAM,STREAMING,STRING,STRUCT,SUBSTR,SYNC,SYSTEM_PATH,SYSTEM_TIME,SYSTEM_VERSION,TABLES,TARGET,TBLPROPERTIES,TERMINATED,TIMEDIFF,TIMESTAMPADD,TIMESTAMPDIFF,TIMESTAMP_LTZ,TIMESTAMP_NTZ,TINYINT,TOUCH,TRANSACTION,TRANSACTIONS,TRANSFORM,TRUNCATE,TRY_CAST,TYPE,UNARCHIVE,UNBOUNDED,UNCACHE,UNLOCK,UNPIVOT,UNSET,UNTIL,USE,VAR,VARIABLE,VARIANT,VERSION,VIEW,VIEWS,VOID,WATERMARK,WE
EK,WEEKS,WHILE,X,YEARS,ZONE") // scalastyle:on line.size.limit } } diff --git a/sql/core/src/test/resources/sql-functions/sql-expression-schema.md b/sql/core/src/test/resources/sql-functions/sql-expression-schema.md index f2c72fa18ed6d..14f36cbae055b 100644 --- a/sql/core/src/test/resources/sql-functions/sql-expression-schema.md +++ b/sql/core/src/test/resources/sql-functions/sql-expression-schema.md @@ -107,6 +107,7 @@ | org.apache.spark.sql.catalyst.expressions.CurrentDatabase | current_database | SELECT current_database() | struct | | org.apache.spark.sql.catalyst.expressions.CurrentDatabase | current_schema | SELECT current_schema() | struct | | org.apache.spark.sql.catalyst.expressions.CurrentDate | current_date | SELECT current_date() | struct | +| org.apache.spark.sql.catalyst.expressions.CurrentPath | current_path | SELECT current_path() | struct | | org.apache.spark.sql.catalyst.expressions.CurrentTime | current_time | SELECT current_time() | struct | | org.apache.spark.sql.catalyst.expressions.CurrentTimeZone | current_timezone | SELECT current_timezone() | struct | | org.apache.spark.sql.catalyst.expressions.CurrentTimestamp | current_timestamp | SELECT current_timestamp() | struct | diff --git a/sql/core/src/test/resources/sql-tests/results/keywords-enforced.sql.out b/sql/core/src/test/resources/sql-tests/results/keywords-enforced.sql.out index 6a70ea9bb1927..c941df66b64ba 100644 --- a/sql/core/src/test/resources/sql-tests/results/keywords-enforced.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/keywords-enforced.sql.out @@ -74,7 +74,10 @@ CREATE true CROSS true CUBE false CURRENT false +CURRENT_DATABASE false CURRENT_DATE true +CURRENT_PATH true +CURRENT_SCHEMA true CURRENT_TIME true CURRENT_TIMESTAMP true CURRENT_USER true @@ -95,6 +98,7 @@ DEC false DECIMAL false DECLARE false DEFAULT false +DEFAULT_PATH false DEFINED false DEFINER false DELAY false @@ -266,6 +270,7 @@ OVERWRITE false PARTITION false PARTITIONED false PARTITIONS false 
+PATH false PERCENT false PIVOT false PLACING false @@ -347,6 +352,7 @@ STRUCT false SUBSTR false SUBSTRING false SYNC false +SYSTEM_PATH false SYSTEM_TIME false SYSTEM_VERSION false TABLE true @@ -438,6 +444,8 @@ CONSTRAINT CREATE CROSS CURRENT_DATE +CURRENT_PATH +CURRENT_SCHEMA CURRENT_TIME CURRENT_TIMESTAMP CURRENT_USER diff --git a/sql/core/src/test/resources/sql-tests/results/keywords.sql.out b/sql/core/src/test/resources/sql-tests/results/keywords.sql.out index 59ed400f0b62a..ae13b363d28ec 100644 --- a/sql/core/src/test/resources/sql-tests/results/keywords.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/keywords.sql.out @@ -74,7 +74,10 @@ CREATE false CROSS false CUBE false CURRENT false +CURRENT_DATABASE false CURRENT_DATE false +CURRENT_PATH false +CURRENT_SCHEMA false CURRENT_TIME false CURRENT_TIMESTAMP false CURRENT_USER false @@ -95,6 +98,7 @@ DEC false DECIMAL false DECLARE false DEFAULT false +DEFAULT_PATH false DEFINED false DEFINER false DELAY false @@ -266,6 +270,7 @@ OVERWRITE false PARTITION false PARTITIONED false PARTITIONS false +PATH false PERCENT false PIVOT false PLACING false @@ -347,6 +352,7 @@ STRUCT false SUBSTR false SUBSTRING false SYNC false +SYSTEM_PATH false SYSTEM_TIME false SYSTEM_VERSION false TABLE false diff --git a/sql/core/src/test/resources/sql-tests/results/nonansi/keywords.sql.out b/sql/core/src/test/resources/sql-tests/results/nonansi/keywords.sql.out index 59ed400f0b62a..ae13b363d28ec 100644 --- a/sql/core/src/test/resources/sql-tests/results/nonansi/keywords.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/nonansi/keywords.sql.out @@ -74,7 +74,10 @@ CREATE false CROSS false CUBE false CURRENT false +CURRENT_DATABASE false CURRENT_DATE false +CURRENT_PATH false +CURRENT_SCHEMA false CURRENT_TIME false CURRENT_TIMESTAMP false CURRENT_USER false @@ -95,6 +98,7 @@ DEC false DECIMAL false DECLARE false DEFAULT false +DEFAULT_PATH false DEFINED false DEFINER false DELAY false @@ -266,6 +270,7 @@ 
OVERWRITE false PARTITION false PARTITIONED false PARTITIONS false +PATH false PERCENT false PIVOT false PLACING false @@ -347,6 +352,7 @@ STRUCT false SUBSTR false SUBSTRING false SYNC false +SYSTEM_PATH false SYSTEM_TIME false SYSTEM_VERSION false TABLE false diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FunctionQualificationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FunctionQualificationSuite.scala index ccd67e8bb850c..f2c12404d18dd 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/FunctionQualificationSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/FunctionQualificationSuite.scala @@ -813,6 +813,12 @@ class FunctionQualificationSuite extends QueryTest with SharedSparkSession { sql("DROP TEMPORARY FUNCTION current_user") } + test("SECTION 11e: current_path() is a builtin and returns path string") { + val pathStr = sql("SELECT current_path()").collect().head.getString(0) + assert(pathStr.nonEmpty && pathStr.contains("."), + s"current_path() should return a non-empty path with qualified schemas, got: $pathStr") + } + test("SECTION 12d: Extension - SHOW FUNCTIONS includes extension functions") { val functions = sql("SHOW FUNCTIONS").collect().map(_.getString(0)) assert(functions.contains("test_ext_func"), diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala index 3a64b52993306..a9446750a53a6 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala @@ -214,7 +214,7 @@ trait ThriftServerWithSparkContextSuite extends SharedThriftServer { val sessionHandle = client.openSession(user, "") val infoValue = client.getInfo(sessionHandle, 
GetInfoType.CLI_ODBC_KEYWORDS) // scalastyle:off line.size.limit - assert(infoValue.getStringValue == "ADD,AFTER,AGGREGATE,ALL,ALTER,ALWAYS,ANALYZE,AND,ANTI,ANY,ANY_VALUE,ARCHIVE,ARRAY,AS,ASC,ASENSITIVE,AT,ATOMIC,AUTHORIZATION,BEGIN,BETWEEN,BIGINT,BINARY,BINDING,BOOLEAN,BOTH,BUCKET,BUCKETS,BY,BYTE,CACHE,CALL,CALLED,CASCADE,CASE,CAST,CATALOG,CATALOGS,CHANGE,CHANGES,CHAR,CHARACTER,CHECK,CLEAR,CLOSE,CLUSTER,CLUSTERED,CODEGEN,COLLATE,COLLATION,COLLATIONS,COLLECTION,COLUMN,COLUMNS,COMMENT,COMMIT,COMPACT,COMPACTIONS,COMPENSATION,COMPUTE,CONCATENATE,CONDITION,CONSTRAINT,CONTAINS,CONTINUE,COST,CREATE,CROSS,CUBE,CURRENT,CURRENT_DATE,CURRENT_TIME,CURRENT_TIMESTAMP,CURRENT_USER,CURSOR,DATA,DATABASE,DATABASES,DATE,DATEADD,DATEDIFF,DATE_ADD,DATE_DIFF,DAY,DAYOFYEAR,DAYS,DBPROPERTIES,DEC,DECIMAL,DECLARE,DEFAULT,DEFINED,DEFINER,DELAY,DELETE,DELIMITED,DESC,DESCRIBE,DETERMINISTIC,DFS,DIRECTORIES,DIRECTORY,DISTINCT,DISTRIBUTE,DIV,DO,DOUBLE,DROP,ELSE,ELSEIF,END,ENFORCED,ESCAPE,ESCAPED,EVOLUTION,EXCEPT,EXCHANGE,EXCLUDE,EXCLUSIVE,EXECUTE,EXISTS,EXIT,EXPLAIN,EXPORT,EXTEND,EXTENDED,EXTERNAL,EXTRACT,FALSE,FETCH,FIELDS,FILEFORMAT,FILTER,FIRST,FLOAT,FLOW,FOLLOWING,FOR,FOREIGN,FORMAT,FORMATTED,FOUND,FROM,FULL,FUNCTION,FUNCTIONS,GENERATED,GEOGRAPHY,GEOMETRY,GLOBAL,GRANT,GROUP,GROUPING,HANDLER,HAVING,HOUR,HOURS,IDENTIFIED,IDENTIFIER,IDENTITY,IF,IGNORE,ILIKE,IMMEDIATE,IMPORT,IN,INCLUDE,INCLUSIVE,INCREMENT,INDEX,INDEXES,INNER,INPATH,INPUT,INPUTFORMAT,INSENSITIVE,INSERT,INT,INTEGER,INTERSECT,INTERVAL,INTO,INVOKER,IS,ITEMS,ITERATE,JOIN,JSON,KEY,KEYS,LANGUAGE,LAST,LATERAL,LAZY,LEADING,LEAVE,LEFT,LEVEL,LIKE,LIMIT,LINES,LIST,LOAD,LOCAL,LOCATION,LOCK,LOCKS,LOGICAL,LONG,LOOP,MACRO,MAP,MATCHED,MATERIALIZED,MAX,MEASURE,MERGE,METRICS,MICROSECOND,MICROSECONDS,MILLISECOND,MILLISECONDS,MINUS,MINUTE,MINUTES,MODIFIES,MONTH,MONTHS,MSCK,NAME,NAMESPACE,NAMESPACES,NANOSECOND,NANOSECONDS,NATURAL,NEXT,NO,NONE,NORELY,NOT,NULL,NULLS,NUMERIC,OF,OFFSET,ON,ONLY,OPEN,OPTION,OPTIONS,OR,ORDER,OUT,OUTER,OUTPUTFORMAT,OVER,OVERL
APS,OVERLAY,OVERWRITE,PARTITION,PARTITIONED,PARTITIONS,PERCENT,PIVOT,PLACING,POSITION,PRECEDING,PRIMARY,PRINCIPALS,PROCEDURE,PROCEDURES,PROPERTIES,PURGE,QUARTER,QUERY,RANGE,READ,READS,REAL,RECORDREADER,RECORDWRITER,RECOVER,RECURSION,RECURSIVE,REDUCE,REFERENCES,REFRESH,RELY,RENAME,REPAIR,REPEAT,REPEATABLE,REPLACE,RESET,RESPECT,RESTRICT,RETURN,RETURNS,REVOKE,RIGHT,ROLE,ROLES,ROLLBACK,ROLLUP,ROW,ROWS,SCHEMA,SCHEMAS,SECOND,SECONDS,SECURITY,SELECT,SEMI,SEPARATED,SERDE,SERDEPROPERTIES,SESSION_USER,SET,SETS,SHORT,SHOW,SINGLE,SKEWED,SMALLINT,SOME,SORT,SORTED,SOURCE,SPECIFIC,SQL,SQLEXCEPTION,SQLSTATE,START,STATISTICS,STORED,STRATIFY,STREAM,STREAMING,STRING,STRUCT,SUBSTR,SUBSTRING,SYNC,SYSTEM_TIME,SYSTEM_VERSION,TABLE,TABLES,TABLESAMPLE,TARGET,TBLPROPERTIES,TERMINATED,THEN,TIME,TIMEDIFF,TIMESTAMP,TIMESTAMPADD,TIMESTAMPDIFF,TIMESTAMP_LTZ,TIMESTAMP_NTZ,TINYINT,TO,TOUCH,TRAILING,TRANSACTION,TRANSACTIONS,TRANSFORM,TRIM,TRUE,TRUNCATE,TRY_CAST,TYPE,UNARCHIVE,UNBOUNDED,UNCACHE,UNION,UNIQUE,UNKNOWN,UNLOCK,UNPIVOT,UNSET,UNTIL,UPDATE,USE,USER,USING,VALUE,VALUES,VAR,VARCHAR,VARIABLE,VARIANT,VERSION,VIEW,VIEWS,VOID,WATERMARK,WEEK,WEEKS,WHEN,WHERE,WHILE,WINDOW,WITH,WITHIN,WITHOUT,X,YEAR,YEARS,ZONE") + assert(infoValue.getStringValue == 
"ADD,AFTER,AGGREGATE,ALL,ALTER,ALWAYS,ANALYZE,AND,ANTI,ANY,ANY_VALUE,ARCHIVE,ARRAY,AS,ASC,ASENSITIVE,AT,ATOMIC,AUTHORIZATION,BEGIN,BETWEEN,BIGINT,BINARY,BINDING,BOOLEAN,BOTH,BUCKET,BUCKETS,BY,BYTE,CACHE,CALL,CALLED,CASCADE,CASE,CAST,CATALOG,CATALOGS,CHANGE,CHANGES,CHAR,CHARACTER,CHECK,CLEAR,CLOSE,CLUSTER,CLUSTERED,CODEGEN,COLLATE,COLLATION,COLLATIONS,COLLECTION,COLUMN,COLUMNS,COMMENT,COMMIT,COMPACT,COMPACTIONS,COMPENSATION,COMPUTE,CONCATENATE,CONDITION,CONSTRAINT,CONTAINS,CONTINUE,COST,CREATE,CROSS,CUBE,CURRENT,CURRENT_DATABASE,CURRENT_DATE,CURRENT_PATH,CURRENT_SCHEMA,CURRENT_TIME,CURRENT_TIMESTAMP,CURRENT_USER,CURSOR,DATA,DATABASE,DATABASES,DATE,DATEADD,DATEDIFF,DATE_ADD,DATE_DIFF,DAY,DAYOFYEAR,DAYS,DBPROPERTIES,DEC,DECIMAL,DECLARE,DEFAULT,DEFAULT_PATH,DEFINED,DEFINER,DELAY,DELETE,DELIMITED,DESC,DESCRIBE,DETERMINISTIC,DFS,DIRECTORIES,DIRECTORY,DISTINCT,DISTRIBUTE,DIV,DO,DOUBLE,DROP,ELSE,ELSEIF,END,ENFORCED,ESCAPE,ESCAPED,EVOLUTION,EXCEPT,EXCHANGE,EXCLUDE,EXCLUSIVE,EXECUTE,EXISTS,EXIT,EXPLAIN,EXPORT,EXTEND,EXTENDED,EXTERNAL,EXTRACT,FALSE,FETCH,FIELDS,FILEFORMAT,FILTER,FIRST,FLOAT,FLOW,FOLLOWING,FOR,FOREIGN,FORMAT,FORMATTED,FOUND,FROM,FULL,FUNCTION,FUNCTIONS,GENERATED,GEOGRAPHY,GEOMETRY,GLOBAL,GRANT,GROUP,GROUPING,HANDLER,HAVING,HOUR,HOURS,IDENTIFIED,IDENTIFIER,IDENTITY,IF,IGNORE,ILIKE,IMMEDIATE,IMPORT,IN,INCLUDE,INCLUSIVE,INCREMENT,INDEX,INDEXES,INNER,INPATH,INPUT,INPUTFORMAT,INSENSITIVE,INSERT,INT,INTEGER,INTERSECT,INTERVAL,INTO,INVOKER,IS,ITEMS,ITERATE,JOIN,JSON,KEY,KEYS,LANGUAGE,LAST,LATERAL,LAZY,LEADING,LEAVE,LEFT,LEVEL,LIKE,LIMIT,LINES,LIST,LOAD,LOCAL,LOCATION,LOCK,LOCKS,LOGICAL,LONG,LOOP,MACRO,MAP,MATCHED,MATERIALIZED,MAX,MEASURE,MERGE,METRICS,MICROSECOND,MICROSECONDS,MILLISECOND,MILLISECONDS,MINUS,MINUTE,MINUTES,MODIFIES,MONTH,MONTHS,MSCK,NAME,NAMESPACE,NAMESPACES,NANOSECOND,NANOSECONDS,NATURAL,NEXT,NO,NONE,NORELY,NOT,NULL,NULLS,NUMERIC,OF,OFFSET,ON,ONLY,OPEN,OPTION,OPTIONS,OR,ORDER,OUT,OUTER,OUTPUTFORMAT,OVER,OVERLAPS,OVERLAY,OVERWRITE,PARTITION,PARTITIONED,
PARTITIONS,PATH,PERCENT,PIVOT,PLACING,POSITION,PRECEDING,PRIMARY,PRINCIPALS,PROCEDURE,PROCEDURES,PROPERTIES,PURGE,QUARTER,QUERY,RANGE,READ,READS,REAL,RECORDREADER,RECORDWRITER,RECOVER,RECURSION,RECURSIVE,REDUCE,REFERENCES,REFRESH,RELY,RENAME,REPAIR,REPEAT,REPEATABLE,REPLACE,RESET,RESPECT,RESTRICT,RETURN,RETURNS,REVOKE,RIGHT,ROLE,ROLES,ROLLBACK,ROLLUP,ROW,ROWS,SCHEMA,SCHEMAS,SECOND,SECONDS,SECURITY,SELECT,SEMI,SEPARATED,SERDE,SERDEPROPERTIES,SESSION_USER,SET,SETS,SHORT,SHOW,SINGLE,SKEWED,SMALLINT,SOME,SORT,SORTED,SOURCE,SPECIFIC,SQL,SQLEXCEPTION,SQLSTATE,START,STATISTICS,STORED,STRATIFY,STREAM,STREAMING,STRING,STRUCT,SUBSTR,SUBSTRING,SYNC,SYSTEM_PATH,SYSTEM_TIME,SYSTEM_VERSION,TABLE,TABLES,TABLESAMPLE,TARGET,TBLPROPERTIES,TERMINATED,THEN,TIME,TIMEDIFF,TIMESTAMP,TIMESTAMPADD,TIMESTAMPDIFF,TIMESTAMP_LTZ,TIMESTAMP_NTZ,TINYINT,TO,TOUCH,TRAILING,TRANSACTION,TRANSACTIONS,TRANSFORM,TRIM,TRUE,TRUNCATE,TRY_CAST,TYPE,UNARCHIVE,UNBOUNDED,UNCACHE,UNION,UNIQUE,UNKNOWN,UNLOCK,UNPIVOT,UNSET,UNTIL,UPDATE,USE,USER,USING,VALUE,VALUES,VAR,VARCHAR,VARIABLE,VARIANT,VERSION,VIEW,VIEWS,VOID,WATERMARK,WEEK,WEEKS,WHEN,WHERE,WHILE,WINDOW,WITH,WITHIN,WITHOUT,X,YEAR,YEARS,ZONE") // scalastyle:on line.size.limit } } From 8a5eca83b35c7fa25577defd8053c41b166320f2 Mon Sep 17 00:00:00 2001 From: Serge Rielau Date: Wed, 15 Apr 2026 19:09:26 -0700 Subject: [PATCH 02/12] [SPARK-56501][SQL] Add SET PATH command and SQL PATH session configuration Add the SET PATH command and the SQLConf infrastructure for the SQL standard PATH feature: - spark.sql.path.enabled: master switch for the PATH feature - spark.sql.session.path: internal session path (set only via SET PATH) - SET PATH = schema1, schema2, ...: set the session path with support for shortcuts DEFAULT_PATH, SYSTEM_PATH, CURRENT_SCHEMA, PATH (append) - CURRENT_PATH() now reflects the stored session path when PATH is enabled - Reject direct SET of spark.sql.session.path (must use SET PATH) - Duplicate path entry detection with 
DUPLICATE_SQL_PATH_ENTRY error - Three-part schema references rejected with INVALID_SQL_PATH_SCHEMA_REFERENCE - SET PATH when disabled raises UNSUPPORTED_FEATURE.SET_PATH_WHEN_DISABLED Resolution is not yet wired to use the stored path -- tables and functions still resolve via the legacy single-schema path. The resolution engine rewiring comes in a follow-up PR. This is part 2a of the SQL PATH feature (SPARK-54810). --- .../resources/error/error-conditions.json | 22 ++ .../main/resources/error/error-states.json | 2 +- .../sql/catalyst/parser/SqlBaseParser.g4 | 10 + .../sql/catalyst/optimizer/Optimizer.scala | 2 +- .../catalyst/optimizer/finishAnalysis.scala | 9 +- .../sql/errors/QueryCompilationErrors.scala | 13 +- .../apache/spark/sql/internal/SQLConf.scala | 117 ++++++++- .../spark/sql/execution/SparkSqlParser.scala | 17 +- .../sql/execution/command/SetCommand.scala | 5 + .../execution/command/SetPathCommand.scala | 144 +++++++++++ .../apache/spark/sql/ParametersSuite.scala | 19 ++ .../org/apache/spark/sql/SetPathSuite.scala | 228 ++++++++++++++++++ 12 files changed, 575 insertions(+), 13 deletions(-) create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetPathCommand.scala create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/SetPathSuite.scala diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json index 8c032d31cff61..4b0cd788d0dbf 100644 --- a/common/utils/src/main/resources/error/error-conditions.json +++ b/common/utils/src/main/resources/error/error-conditions.json @@ -2061,6 +2061,12 @@ ], "sqlState" : "42711" }, + "DUPLICATE_SQL_PATH_ENTRY" : { + "message" : [ + "Duplicate SQL path entry . The session SQL path cannot contain the same catalog.schema more than once (including after expanding shortcuts like DEFAULT_PATH or SYSTEM_PATH)." 
+ ], "sqlState" : "42732" + }, "DUPLICATE_VARIABLE_NAME_INSIDE_DECLARE" : { "message" : [ "Found duplicate variable in the declare variable list. Please, remove one of them." @@ -4475,6 +4481,12 @@ ], "sqlState" : "XXKD0" }, + "INVALID_SQL_PATH_SCHEMA_REFERENCE" : { + "message" : [ + "Invalid qualified name in SET PATH: <qualifiedName>. Use at most two name parts (catalog.schema)." + ], + "sqlState" : "42601" + }, "INVALID_SQL_SYNTAX" : { "message" : [ "Invalid SQL syntax:" @@ -7884,6 +7896,16 @@ "Cannot have VARIANT type columns in DataFrame which calls set operations (INTERSECT, EXCEPT, etc.), but the type of column is ." ] }, + "SET_PATH_VIA_SET" : { + "message" : [ + "The session path cannot be set using the SET statement. Use SET PATH = ... instead." + ] + }, + "SET_PATH_WHEN_DISABLED" : { + "message" : [ + "SET PATH is disabled. Set <config> to true to enable it." + ] + }, "SET_PROPERTIES_AND_DBPROPERTIES" : { "message" : [ "set PROPERTIES and DBPROPERTIES at the same time." diff --git a/common/utils/src/main/resources/error/error-states.json b/common/utils/src/main/resources/error/error-states.json index c2b2bb2ed4638..fcabc5f06c548 100644 --- a/common/utils/src/main/resources/error/error-states.json +++ b/common/utils/src/main/resources/error/error-states.json @@ -3339,7 +3339,7 @@ "description": "A duplicate schema name in a special register was detected.", "origin": "DB2", "standard": "N", - "usedBy": ["DB2"] + "usedBy": ["DB2", "Spark"] }, "42734": { "description": "A duplicate parameter-name, SQL variable name, label, or condition-name was detected.", diff --git a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 index 3d1fbc2fd6688..2777b0d89515d 100644 --- a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 +++ b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 @@ -448,6 +448,7 @@ setResetStatement | SET TIME
ZONE interval #setTimeZone | SET TIME ZONE timezone #setTimeZone | SET TIME ZONE .*? #setTimeZone + | SET PATH EQ pathElement (COMMA pathElement)* #setPath | SET variable assignmentList #setVariable | SET variable LEFT_PAREN multipartIdentifierList RIGHT_PAREN EQ LEFT_PAREN query RIGHT_PAREN #setVariable @@ -459,6 +460,15 @@ setResetStatement | RESET .*? #resetConfiguration ; +pathElement + : DEFAULT_PATH + | SYSTEM_PATH + | PATH + | CURRENT_DATABASE + | CURRENT_SCHEMA + | multipartIdentifier + ; + executeImmediate : EXECUTE IMMEDIATE queryParam=expression (INTO targetVariable=multipartIdentifierList)? executeImmediateUsing? ; diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 8434816a9ca89..367474311fd1e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -335,7 +335,7 @@ abstract class Optimizer(catalogManager: CatalogManager) InsertMapSortInGroupingExpressions, InsertMapSortInRepartitionExpressions, ComputeCurrentTime, - ReplaceCurrentLike(catalogManager), + ReplaceCurrentLike(catalogManager, conf), SpecialDatetimeValues, RewriteAsOfJoin, EvalInlineTables, diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala index 0fd7d1f02fa37..24f7b1598a7f2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala @@ -152,17 +152,16 @@ object ComputeCurrentTime extends Rule[LogicalPlan] { * Replaces the expression of CurrentDatabase, CurrentCatalog, CurrentPath, and CurrentUser * with the current values. 
*/ -case class ReplaceCurrentLike(catalogManager: CatalogManager) extends Rule[LogicalPlan] { +case class ReplaceCurrentLike( + catalogManager: CatalogManager, + sqlConf: SQLConf) extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = { import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ lazy val currentNamespace = catalogManager.currentNamespace.quoted lazy val currentNamespaceSeq = catalogManager.currentNamespace.toSeq lazy val currentCatalog = catalogManager.currentCatalog.name() lazy val currentUser = CurrentUserContext.getCurrentUser - lazy val currentPathStr = { - val catalogPath = (currentCatalog +: currentNamespaceSeq).toSeq - SQLConf.get.resolutionSearchPath(catalogPath).map(_.mkString(".")).mkString(",") - } + lazy val currentPathStr = sqlConf.currentPathString(currentCatalog, currentNamespaceSeq) plan.transformAllExpressionsWithPruning(_.containsPattern(CURRENT_LIKE)) { case CurrentDatabase() => diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala index 8464a96ef26c6..5e3261ce35959 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala @@ -953,10 +953,9 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat * Raises TABLE_OR_VIEW_NOT_FOUND with a formatted list of resolution path entries. * * @param quotedResolutionPathEntries each string is already a display-ready path entry (typically - * `toSQLId` of each segment from `SQLConf.resolutionSearchPath`). They are joined with - * ", " inside square brackets. This differs from [[noSuchTableError]](nameParts: - * Seq[String]), which builds one dotted bracketed path from `nameParts.dropRight(1)` via - * [[org.apache.spark.sql.catalyst.analysis.NoSuchTableException]]. 
+ * `toSQLId` of each path entry from `SQLConf.sqlResolutionPathEntries`). They are joined + * with ", " inside square brackets, same as + * [[org.apache.spark.sql.catalyst.analysis.NoSuchItemExceptionHelper.formatSearchPath]]. */ def tableOrViewNotFoundWithSearchPath( name: Seq[String], @@ -2458,6 +2457,12 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat "config" -> SQLConf.LEGACY_PATH_OPTION_BEHAVIOR.key)) } + def invalidSqlPathSchemaReferenceError(qualifiedName: String): Throwable = { + new AnalysisException( + errorClass = "INVALID_SQL_PATH_SCHEMA_REFERENCE", + messageParameters = Map("qualifiedName" -> qualifiedName)) + } + def userSpecifiedSchemaUnsupportedError(operation: String): Throwable = { DataTypeErrors.userSpecifiedSchemaUnsupportedError(operation) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index d7612d5d78508..ae5dfe7835b7b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -45,7 +45,10 @@ import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode import org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator import org.apache.spark.sql.catalyst.plans.logical.HintErrorHandler import org.apache.spark.sql.catalyst.util.DateTimeUtils -import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME +import org.apache.spark.sql.connector.catalog.CatalogManager.{ + SESSION_CATALOG_NAME, + SYSTEM_CATALOG_NAME +} import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors} import org.apache.spark.sql.types.{AtomicType, TimestampNTZType, TimestampType} import org.apache.spark.storage.{StorageLevel, StorageLevelMapper} @@ -140,6 +143,71 @@ object SQLConf { } } + /** + * Second segment of the virtual path entry `system.current_schema` in 
`spark.sql.session.path`. + * CURRENT_SCHEMA and CURRENT_DATABASE in SET PATH both normalize to this sentinel (SQL aliases). + * It is stored literally and expanded to the session catalog + namespace when building routine + * resolution candidates and CURRENT_PATH(). + */ + private[sql] val SESSION_PATH_VIRTUAL_CURRENT_SCHEMA: String = "current_schema" + + /** True if this path entry is the virtual current-schema slot (`system.current_schema`). */ + private[sql] def isVirtualCurrentSchemaPathEntry(entry: Seq[String]): Boolean = + entry.length == 2 && + entry.head.equalsIgnoreCase(SYSTEM_CATALOG_NAME) && + entry(1).equalsIgnoreCase(SESSION_PATH_VIRTUAL_CURRENT_SCHEMA) + + /** Materialize virtual current-schema entry for duplicate checks at SET PATH time. */ + private[sql] def concreteSessionPathEntry( + entry: Seq[String], + currentCatalog: String, + currentNamespace: Seq[String]): Seq[String] = { + if (isVirtualCurrentSchemaPathEntry(entry)) { + if (currentNamespace.isEmpty) Seq(currentCatalog) + else currentCatalog +: currentNamespace + } else { + entry + } + } + + /** Expand markers in stored session path using the current catalog and namespace. */ + private[sql] def expandSessionPathMarkers( + entries: Seq[Seq[String]], + currentCatalog: String, + currentNamespace: Seq[String]): Seq[Seq[String]] = + entries.map(concreteSessionPathEntry(_, currentCatalog, currentNamespace)) + + /** + * Separator between path entries in serialized session path + * (catalog.namespace,catalog.namespace). + */ + private[sql] val SESSION_PATH_ENTRY_SEPARATOR = "," + + /** + * Parses a session path string into a list of path entries. + * Format: "catalog1.namespace1,catalog2.namespace2" + * (comma-separated, each entry catalog.namespace). 
+ */ + private[sql] def parseSessionPath(pathStr: String): Seq[Seq[String]] = { + if (pathStr == null || pathStr.trim.isEmpty) return Seq.empty + pathStr.split(SESSION_PATH_ENTRY_SEPARATOR).map { entry => + val trimmed = entry.trim + val dot = trimmed.indexOf('.') + if (dot <= 0 || dot == trimmed.length - 1) { + Seq(trimmed) + } else { + Seq(trimmed.substring(0, dot).trim, trimmed.substring(dot + 1).trim) + } + }.toSeq.filter(_.nonEmpty) + } + + /** + * Formats path entries to session path string. + * Each entry is catalog.namespace; entries separated by comma. + */ + private[sql] def formatSessionPath(pathEntries: Seq[Seq[String]]): String = + pathEntries.map(_.mkString(".")).mkString(SESSION_PATH_ENTRY_SEPARATOR) + /** * Default config. Only used when there is no active SparkSession for the thread. * See [[get]] for more information. @@ -2439,6 +2507,26 @@ object SQLConf { .booleanConf .createWithDefault(false) + val PATH_ENABLED = + buildConf("spark.sql.path.enabled") + .version("4.2.0") + .doc("When true, enables the SQL Standard PATH feature: SET PATH, path-based routine " + + "resolution, and CURRENT_PATH(). When false, SET PATH has no effect and resolution uses " + + "the default path only.") + .withBindingPolicy(ConfigBindingPolicy.SESSION) + .booleanConf + .createWithDefault(false) + + val SESSION_PATH = + buildConf("spark.sql.session.path") + .internal() + .version("4.2.0") + .doc("Session search path for routine resolution (catalog-qualified schema list). " + + "Only settable via SET PATH statement; direct SET of this config is ignored.") + .withBindingPolicy(ConfigBindingPolicy.SESSION) + .stringConf + .createOptional + // Whether to retain group by columns or not in GroupedData.agg. 
val DATAFRAME_RETAIN_GROUP_COLUMNS = buildConf("spark.sql.retainGroupColumns") .version("1.4.0") @@ -8352,6 +8440,33 @@ class SQLConf extends Serializable with Logging with SqlApiConf { */ def prioritizeSystemCatalog: Boolean = !getConf(SQLConf.PERSISTENT_CATALOG_FIRST) + def pathEnabled: Boolean = getConf(SQLConf.PATH_ENABLED) + + def sessionPath: Option[String] = getConf(SQLConf.SESSION_PATH) + + /** + * Returns the session path as path entries when PATH is enabled and set; None otherwise. + */ + def effectivePathEntries: Option[Seq[Seq[String]]] = + if (pathEnabled) sessionPath.map(SQLConf.parseSessionPath).filter(_.nonEmpty) + else None + + /** + * String form of the current resolution path for CURRENT_PATH(). + * When PATH is enabled and a session path is stored, formats the effective path entries + * with markers expanded. Otherwise falls back to the legacy resolutionSearchPath. + */ + def currentPathString(currentCatalog: String, currentNamespace: Seq[String]): String = { + val entries = effectivePathEntries match { + case Some(stored) => + SQLConf.expandSessionPathMarkers(stored, currentCatalog, currentNamespace) + case None => + val catalogPath = (currentCatalog +: currentNamespace).toSeq + resolutionSearchPath(catalogPath) + } + SQLConf.formatSessionPath(entries) + } + /** * Returns the resolution search path for error messages and resolution order. * This is the single source of truth for the search path used for functions, tables, and views. 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index be63063408346..0b2feb0b488e9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -357,6 +357,19 @@ class SparkSqlAstBuilder extends AstBuilder { * SET TIME ZONE INTERVAL 10 HOURS; * }}} */ + override def visitSetPath(ctx: SetPathContext): LogicalPlan = withOrigin(ctx) { + val elements = (0 until ctx.pathElement().size()).map { i => + val pe = ctx.pathElement(i) + if (pe.DEFAULT_PATH() != null) PathElement.DefaultPath + else if (pe.SYSTEM_PATH() != null) PathElement.SystemPath + else if (pe.PATH() != null) PathElement.PathRef + else if (pe.CURRENT_DATABASE() != null) PathElement.CurrentDatabase + else if (pe.CURRENT_SCHEMA() != null) PathElement.CurrentSchema + else PathElement.SchemaInPath(visitMultipartIdentifier(pe.multipartIdentifier())) + } + SetPathCommand(elements.toSeq) + } + override def visitSetTimeZone(ctx: SetTimeZoneContext): LogicalPlan = withOrigin(ctx) { val key = SQLConf.SESSION_LOCAL_TIMEZONE.key if (ctx.interval != null) { @@ -1429,7 +1442,9 @@ class SparkSqlAstBuilder extends AstBuilder { val tableName = ctx.identifierReference.getText.split("\\.").lastOption.getOrElse("table") throw QueryCompilationErrors.describeJsonNotExtendedError(tableName) } - val relation = createUnresolvedTableOrView(ctx.identifierReference, "DESCRIBE TABLE") + val relation = createUnresolvedTableOrView( + ctx.identifierReference, + "DESCRIBE TABLE") if (ctx.describeColName != null) { if (ctx.partitionSpec != null) { throw QueryParsingErrors.descColumnForPartitionUnsupportedError(ctx) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala index 
e248f0eea96de..54328d16a0baf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala @@ -119,6 +119,11 @@ case class SetCommand(kv: Option[(String, Option[String])]) messageParameters = Map("variableName" -> toSQLId(varName))) } } + if (key == SQLConf.SESSION_PATH.key) { + throw new AnalysisException( + errorClass = "UNSUPPORTED_FEATURE.SET_PATH_VIA_SET", + messageParameters = Map.empty) + } if (sparkSession.conf.get(CATALOG_IMPLEMENTATION.key).equals("hive") && key.startsWith("hive.")) { logWarning(log"'SET ${MDC(KEY, key)}=${MDC(VALUE, value)}' might not work, since Spark " + diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetPathCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetPathCommand.scala new file mode 100644 index 0000000000000..fd2c2368b69cb --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetPathCommand.scala @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.command + +import java.util.Locale + +import org.apache.spark.sql.{AnalysisException, Row, SparkSession} +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.connector.catalog.CatalogManager +import org.apache.spark.sql.errors.QueryCompilationErrors +import org.apache.spark.sql.internal.SQLConf + +/** + * Path element for SET PATH: either a well-known shortcut or a schema (optionally qualified). + * For SchemaInPath(parts), qualification with current catalog or SYSTEM is done at run time. + */ +sealed trait PathElement + +object PathElement { + case object DefaultPath extends PathElement + case object SystemPath extends PathElement + case object PathRef extends PathElement + /** + * Current database/schema (SQL aliases). Stored as system.current_schema; expands when + * building resolution candidates so later USE SCHEMA is reflected. + */ + case object CurrentDatabase extends PathElement + case object CurrentSchema extends PathElement + /** Schema name parts (1 = unqualified namespace, 2+ = catalog.namespace...). Qualified at run. */ + case class SchemaInPath(parts: Seq[String]) extends PathElement +} + +/** + * Command for SET PATH = pathElement (, pathElement)* + * Expands shortcuts at run time, validates no duplicates, and sets the internal session path. 
+ */ +case class SetPathCommand(elements: Seq[PathElement]) extends LeafRunnableCommand { + + override def output: Seq[Attribute] = Seq.empty + + override def run(sparkSession: SparkSession): Seq[Row] = { + if (!sparkSession.sessionState.conf.pathEnabled) { + throw new AnalysisException( + errorClass = "UNSUPPORTED_FEATURE.SET_PATH_WHEN_DISABLED", + messageParameters = Map("config" -> SQLConf.PATH_ENABLED.key)) + } + val conf = sparkSession.sessionState.conf + val catalogManager = sparkSession.sessionState.catalogManager + val currentCatalog = catalogManager.currentCatalog.name + val currentNamespace = catalogManager.currentNamespace.toSeq + val caseSensitive = conf.caseSensitiveAnalysis + + val expanded = expandPathElements(elements, conf, currentCatalog, currentNamespace) + val seen = new scala.collection.mutable.HashSet[(String, String)] + val deduped = expanded.flatMap { entry => + val concrete = + SQLConf.concreteSessionPathEntry(entry, currentCatalog, currentNamespace) + def normalize(s: String): String = if (caseSensitive) s else s.toLowerCase(Locale.ROOT) + val key = (normalize(concrete.head), + normalize(concrete.lift(1).getOrElse(""))) + if (seen.contains(key)) { + throw new AnalysisException( + errorClass = "DUPLICATE_SQL_PATH_ENTRY", + messageParameters = Map("pathEntry" -> concrete.mkString("."))) + } + seen.add(key) + Some(entry) + } + + if (deduped.isEmpty) { + conf.unsetConf(SQLConf.SESSION_PATH) + } else { + conf.setConfString(SQLConf.SESSION_PATH.key, SQLConf.formatSessionPath(deduped)) + } + Seq.empty + } + + private def expandPathElements( + elements: Seq[PathElement], + conf: SQLConf, + currentCatalog: String, + currentNamespace: Seq[String]): Seq[Seq[String]] = { + val systemCatalog = CatalogManager.SYSTEM_CATALOG_NAME + val builtin = CatalogManager.BUILTIN_NAMESPACE + val session = CatalogManager.SESSION_NAMESPACE + + elements.flatMap { + case PathElement.DefaultPath => + // Default path = session order (first/second/last). 
Clear path; use at resolution time. + Seq.empty + case PathElement.SystemPath => + Seq(Seq(systemCatalog, builtin), Seq(systemCatalog, session)) + case PathElement.CurrentDatabase | PathElement.CurrentSchema => + Seq(Seq(systemCatalog, SQLConf.SESSION_PATH_VIRTUAL_CURRENT_SCHEMA)) + case PathElement.PathRef => + conf.sessionPath match { + case Some(s) => SQLConf.parseSessionPath(s) + case None => Seq.empty + } + case PathElement.SchemaInPath(parts) => + qualifySchemaParts(parts, systemCatalog, currentCatalog) + } + } + + /** Qualify schema parts at SET time: well-known -> SYSTEM; else current catalog + namespace. */ + private def qualifySchemaParts( + parts: Seq[String], + systemCatalog: String, + currentCatalog: String): Seq[Seq[String]] = { + val wellKnown = Set( + CatalogManager.BUILTIN_NAMESPACE.toLowerCase(Locale.ROOT), + CatalogManager.SESSION_NAMESPACE.toLowerCase(Locale.ROOT)) + if (parts.isEmpty) return Seq.empty + if (parts.length > 2) { + throw QueryCompilationErrors.invalidSqlPathSchemaReferenceError(parts.mkString(".")) + } + if (parts.size == 1) { + val ns = parts.head + if (wellKnown.contains(ns.toLowerCase(Locale.ROOT))) { + Seq(Seq(systemCatalog, ns)) + } else { + Seq(Seq(currentCatalog, ns)) + } + } else { + Seq(Seq(parts.head, parts(1))) + } + } + +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ParametersSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ParametersSuite.scala index b048e656334e9..41e636830ed5c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/ParametersSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/ParametersSuite.scala @@ -1998,6 +1998,25 @@ class ParametersSuite extends QueryTest with SharedSparkSession { ) } + test("SET PATH with named parameter in IDENTIFIER (PATH feature enabled)") { + withSQLConf(SQLConf.PATH_ENABLED.key -> "true") { + // current_path() resolves via system.builtin; include it when PATH is not DEFAULT_PATH. 
+ spark.sql("SET PATH = spark_catalog.IDENTIFIER(:ns), system.builtin", Map("ns" -> "default")) + val pathStr = spark.sql("SELECT current_path()").collect().head.getString(0) + assert(pathStr.contains("spark_catalog.default"), + s"SET PATH + IDENTIFIER(:ns); got: $pathStr") + } + } + + test("SET PATH with positional parameter in IDENTIFIER (PATH feature enabled)") { + withSQLConf(SQLConf.PATH_ENABLED.key -> "true") { + spark.sql("SET PATH = spark_catalog.IDENTIFIER(?), system.builtin", Array("default")) + val pathStr = spark.sql("SELECT current_path()").collect().head.getString(0) + assert(pathStr.contains("spark_catalog.default"), + s"SET PATH + IDENTIFIER(?); got: $pathStr") + } + } + test("IDENTIFIER clause with parameter marker - table reference") { // Test IDENTIFIER clause with parameter marker in table references withTable("test_table") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SetPathSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SetPathSuite.scala new file mode 100644 index 0000000000000..b6b2ecdb80b76 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/SetPathSuite.scala @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql + +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.test.SharedSparkSession + +/** + * Tests for SET PATH command and session path management. + * Covers the feature flag, SET PATH syntax, CURRENT_PATH() reflecting stored path, + * DEFAULT_PATH, SYSTEM_PATH, CURRENT_SCHEMA/CURRENT_DATABASE expansion, + * PATH (append), duplicate detection, and error conditions. + * + * Resolution-level tests (tables/functions resolving via the stored path) + * belong in a separate suite once the resolution engine is wired. + */ +class SetPathSuite extends QueryTest with SharedSparkSession { + + private def withPathEnabled(f: => Unit): Unit = { + withSQLConf(SQLConf.PATH_ENABLED.key -> "true")(f) + } + + test("PATH disabled: CURRENT_PATH() returns default path") { + val pathStr = sql("SELECT current_path()").collect().head.getString(0) + assert(pathStr.contains("spark_catalog") && pathStr.contains("default"), + s"Expected default path to contain spark_catalog.default, got: $pathStr") + } + + test("PATH disabled: SET PATH is rejected") { + checkError( + exception = intercept[AnalysisException] { + sql("SET PATH = spark_catalog.other") + }, + condition = "UNSUPPORTED_FEATURE.SET_PATH_WHEN_DISABLED", + sqlState = "0A000", + parameters = Map("config" -> SQLConf.PATH_ENABLED.key)) + } + + test("PATH enabled: SET PATH and CURRENT_PATH()") { + withPathEnabled { + sql("SET PATH = spark_catalog.default, system.builtin") + val pathStr = sql("SELECT current_path()").collect().head.getString(0) + assert(pathStr.contains("spark_catalog.default"), + s"Expected path to contain spark_catalog.default, got: $pathStr") + } + } + + test("PATH enabled: SET PATH = DEFAULT_PATH restores default") { + withPathEnabled { + sql("SET PATH = spark_catalog.default") + sql("SET PATH = DEFAULT_PATH") + val pathStr = sql("SELECT current_path()").collect().head.getString(0) + assert(pathStr.contains("spark_catalog") && pathStr.contains("default"), + 
s"After SET PATH = DEFAULT_PATH expected default path, got: $pathStr") + } + } + + test("PATH enabled: CURRENT_PATH() with DEFAULT_PATH contains current schema") { + withPathEnabled { + sql("SET PATH = DEFAULT_PATH") + val pathStr = sql("SELECT current_path()").collect().head.getString(0) + assert(pathStr.contains("default"), + s"With DEFAULT_PATH, path should contain default schema, got: $pathStr") + } + } + + test("direct SET of session path config is rejected") { + val err = intercept[AnalysisException] { + sql("SET spark.sql.session.path = 'spark_catalog.default'") + } + assert(err.getMessage.contains("SET PATH"), + s"Expected SET_PATH_VIA_SET error, got: ${err.getMessage}") + } + + test("PATH enabled: duplicate path entry raises error") { + withPathEnabled { + checkError( + exception = intercept[AnalysisException] { + sql("SET PATH = spark_catalog.default, spark_catalog.default") + }, + condition = "DUPLICATE_SQL_PATH_ENTRY", + sqlState = Some("42732"), + parameters = Map("pathEntry" -> "spark_catalog.default")) + } + } + + test("PATH enabled: SET PATH = PATH, schema appends to path") { + withPathEnabled { + sql("CREATE SCHEMA IF NOT EXISTS path_append_test") + try { + sql("SET PATH = spark_catalog.default, system.builtin") + sql("SET PATH = PATH, spark_catalog.path_append_test") + val pathStr = sql("SELECT current_path()").collect().head.getString(0) + assert(pathStr.contains("path_append_test"), + s"PATH, schema should append; got: $pathStr") + } finally { + sql("DROP SCHEMA IF EXISTS path_append_test") + } + } + } + + test("PATH enabled: SET PATH = CURRENT_SCHEMA / CURRENT_DATABASE") { + withPathEnabled { + sql("USE spark_catalog.default") + sql("SET PATH = current_schema, system.builtin") + val pathStr = sql("SELECT current_path()").collect().head.getString(0) + assert(pathStr.contains("spark_catalog") && pathStr.contains("default"), + s"current_schema should expand to current schema, got: $pathStr") + sql("SET PATH = current_database, system.builtin") + 
val pathStr2 = sql("SELECT current_path()").collect().head.getString(0) + assert(pathStr2.contains("spark_catalog") && pathStr2.contains("default"), + s"current_database should expand to current schema, got: $pathStr2") + } + } + + test("PATH enabled: virtual CURRENT_SCHEMA expands to USE schema") { + withPathEnabled { + sql("CREATE SCHEMA IF NOT EXISTS path_virt_schema") + try { + sql("USE spark_catalog.path_virt_schema") + sql("SET PATH = current_schema, system.builtin") + val pathStr = sql("SELECT current_path()").collect().head.getString(0) + assert(pathStr.contains("path_virt_schema"), + s"CURRENT_SCHEMA in PATH should reflect USE; got: $pathStr") + } finally { + sql("USE spark_catalog.default") + sql("DROP SCHEMA IF EXISTS path_virt_schema") + } + } + } + + test("PATH enabled: duplicate after expanding CURRENT_SCHEMA") { + withPathEnabled { + sql("USE spark_catalog.default") + checkError( + exception = intercept[AnalysisException] { + sql("SET PATH = spark_catalog.default, current_schema") + }, + condition = "DUPLICATE_SQL_PATH_ENTRY", + sqlState = Some("42732"), + parameters = Map("pathEntry" -> "spark_catalog.default")) + } + } + + test("PATH enabled: duplicate when CURRENT_SCHEMA repeated") { + withPathEnabled { + sql("USE spark_catalog.default") + checkError( + exception = intercept[AnalysisException] { + sql("SET PATH = current_schema, current_schema") + }, + condition = "DUPLICATE_SQL_PATH_ENTRY", + sqlState = Some("42732"), + parameters = Map("pathEntry" -> "spark_catalog.default")) + } + } + + test("PATH enabled: duplicate when SYSTEM_PATH listed twice") { + withPathEnabled { + checkError( + exception = intercept[AnalysisException] { + sql("SET PATH = SYSTEM_PATH, SYSTEM_PATH") + }, + condition = "DUPLICATE_SQL_PATH_ENTRY", + sqlState = Some("42732"), + parameters = Map("pathEntry" -> "system.builtin")) + } + } + + test("PATH enabled: SET PATH = SYSTEM_PATH includes system.builtin and system.session") { + withPathEnabled { + sql("SET PATH = 
SYSTEM_PATH") + val pathStr = sql("SELECT current_path()").collect().head.getString(0) + assert(pathStr.contains("system.builtin") && pathStr.contains("system.session"), + s"SYSTEM_PATH should expand to builtin and session; got: $pathStr") + } + } + + test("PATH enabled: SET PATH = PATH, schema after DEFAULT_PATH (empty session path)") { + withPathEnabled { + sql("CREATE SCHEMA IF NOT EXISTS path_from_empty") + try { + sql("SET PATH = DEFAULT_PATH, system.builtin") + sql("SET PATH = PATH, spark_catalog.path_from_empty") + val pathStr = sql("SELECT current_path()").collect().head.getString(0) + assert(pathStr.contains("path_from_empty"), + s"PATH after cleared path should append schema; got: $pathStr") + } finally { + sql("DROP SCHEMA IF EXISTS path_from_empty") + } + } + } + + test("PATH enabled: three-part schema reference is rejected") { + withPathEnabled { + checkError( + exception = intercept[AnalysisException] { + sql("SET PATH = a.b.c") + }, + condition = "INVALID_SQL_PATH_SCHEMA_REFERENCE", + parameters = Map("qualifiedName" -> "a.b.c")) + } + } + + test("PATH enabled: stored path preserves typed case, resolution is case-insensitive") { + withPathEnabled { + sql("SET PATH = Spark_Catalog.Default, System.Builtin") + val pathStr = sql("SELECT current_path()").collect().head.getString(0) + assert(pathStr.contains("Spark_Catalog.Default"), + s"Stored path should preserve case; got: $pathStr") + } + } +} From 1aae6e74b2c573466a8fe2556f5cf1e6fa5188e7 Mon Sep 17 00:00:00 2001 From: Serge Rielau Date: Fri, 17 Apr 2026 08:19:26 -0700 Subject: [PATCH 03/12] [SPARK-56501][SQL] Address review: multi-level namespaces, serialization, DEFAULT_PATH MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Address cloud-fan's review on PR #55364: - Disallow 1-part schema refs (SET-time capture trap); accept multi-level namespaces (≥2 parts) as-is — qualifySchemaParts collapses to arity check - Fix lossy serialization: use \u001E/\u001F control 
separators so multi-level namespaces and backtick-quoted identifiers with ./, round-trip correctly - Restrict DEFAULT_PATH to sole-element form (reject when combined) - SYSTEM_PATH now respects spark.sql.function.resolutionOrder - Rename deduped → foreach validation; duplicate key handles Seq[String] - Use idiomatic ctx.pathElement().asScala.map; revert unrelated formatting - Fix QueryCompilationErrors doc (sqlResolutionPathEntries → resolutionSearchPath) - Fix SQLConf doc ("session catalog" → "current catalog", "to" → "into") - Strengthen tests: exact path assertions, cross-alias dup, multi-level ns, DEFAULT_PATH combined rejection, backtick round-trip, 1-part rejection --- .../resources/error/error-conditions.json | 7 +- .../sql/errors/QueryCompilationErrors.scala | 7 +- .../apache/spark/sql/internal/SQLConf.scala | 36 +++-- .../spark/sql/execution/SparkSqlParser.scala | 11 +- .../execution/command/SetPathCommand.scala | 62 ++++----- .../org/apache/spark/sql/SetPathSuite.scala | 125 ++++++++++++------ 6 files changed, 141 insertions(+), 107 deletions(-) diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json index 4b0cd788d0dbf..e595fe82ba44f 100644 --- a/common/utils/src/main/resources/error/error-conditions.json +++ b/common/utils/src/main/resources/error/error-conditions.json @@ -4483,7 +4483,7 @@ }, "INVALID_SQL_PATH_SCHEMA_REFERENCE" : { "message" : [ - "Invalid qualified name in SET PATH: . Use at most two name parts (catalog.schema)." + "Invalid schema reference in SET PATH: . Use at least two name parts (catalog.schema); multi-level namespaces are allowed." ], "sqlState" : "42601" }, @@ -7694,6 +7694,11 @@ "DESC TABLE COLUMN AS JSON not supported for individual columns." ] }, + "DEFAULT_PATH_COMBINED" : { + "message" : [ + "DEFAULT_PATH must be the sole element in SET PATH. It cannot be combined with other path elements." 
+ ] + }, "DESC_TABLE_COLUMN_PARTITION" : { "message" : [ "DESC TABLE COLUMN for a specific partition." diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala index 5e3261ce35959..94c76976f6cd1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala @@ -953,9 +953,10 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat * Raises TABLE_OR_VIEW_NOT_FOUND with a formatted list of resolution path entries. * * @param quotedResolutionPathEntries each string is already a display-ready path entry (typically - * `toSQLId` of each path entry from `SQLConf.sqlResolutionPathEntries`). They are joined - * with ", " inside square brackets, same as - * [[org.apache.spark.sql.catalyst.analysis.NoSuchItemExceptionHelper.formatSearchPath]]. + * `toSQLId` of each segment from `SQLConf.resolutionSearchPath`). They are joined with + * ", " inside square brackets. This differs from + * [[org.apache.spark.sql.catalyst.analysis.NoSuchItemExceptionHelper.formatSearchPath]], + * which builds one dotted bracketed path from a single search path. */ def tableOrViewNotFoundWithSearchPath( name: Seq[String], diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 749449a56994f..2c55445e88bf0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -146,7 +146,7 @@ object SQLConf { /** * Second segment of the virtual path entry `system.current_schema` in `spark.sql.session.path`. * CURRENT_SCHEMA and CURRENT_DATABASE in SET PATH both normalize to this sentinel (SQL aliases). 
- * It is stored literally and expanded to the session catalog + namespace when building routine + * It is stored literally and expanded to the current catalog + namespace when building routine * resolution candidates and CURRENT_PATH(). */ private[sql] val SESSION_PATH_VIRTUAL_CURRENT_SCHEMA: String = "current_schema" @@ -177,36 +177,32 @@ object SQLConf { currentNamespace: Seq[String]): Seq[Seq[String]] = entries.map(concreteSessionPathEntry(_, currentCatalog, currentNamespace)) - /** - * Separator between path entries in serialized session path - * (catalog.namespace,catalog.namespace). - */ - private[sql] val SESSION_PATH_ENTRY_SEPARATOR = "," + /** Separator between path entries (Record Separator). */ + private[sql] val SESSION_PATH_ENTRY_SEPARATOR = "\u001E" + + /** Separator between identifier parts within a single entry (Unit Separator). */ + private[sql] val SESSION_PATH_PART_SEPARATOR = "\u001F" /** * Parses a session path string into a list of path entries. - * Format: "catalog1.namespace1,catalog2.namespace2" - * (comma-separated, each entry catalog.namespace). + * Uses \u001E between entries and \u001F between parts within each entry, + * so identifiers containing `.` or `,` round-trip correctly and multi-level + * namespaces are preserved. */ private[sql] def parseSessionPath(pathStr: String): Seq[Seq[String]] = { if (pathStr == null || pathStr.trim.isEmpty) return Seq.empty - pathStr.split(SESSION_PATH_ENTRY_SEPARATOR).map { entry => - val trimmed = entry.trim - val dot = trimmed.indexOf('.') - if (dot <= 0 || dot == trimmed.length - 1) { - Seq(trimmed) - } else { - Seq(trimmed.substring(0, dot).trim, trimmed.substring(dot + 1).trim) - } - }.toSeq.filter(_.nonEmpty) + pathStr.split(SESSION_PATH_ENTRY_SEPARATOR, -1).map { entry => + entry.split(SESSION_PATH_PART_SEPARATOR, -1).toSeq + }.toSeq.filter(_.exists(_.nonEmpty)) } /** - * Formats path entries to session path string. - * Each entry is catalog.namespace; entries separated by comma. 
+ * Formats path entries into a session path string. + * Uses \u001E between entries and \u001F between parts within each entry. */ private[sql] def formatSessionPath(pathEntries: Seq[Seq[String]]): String = - pathEntries.map(_.mkString(".")).mkString(SESSION_PATH_ENTRY_SEPARATOR) + pathEntries.map(_.mkString(SESSION_PATH_PART_SEPARATOR)) + .mkString(SESSION_PATH_ENTRY_SEPARATOR) /** * Default config. Only used when there is no active SparkSession for the thread. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index 0b2feb0b488e9..68e055fe2500f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -358,16 +358,15 @@ class SparkSqlAstBuilder extends AstBuilder { * }}} */ override def visitSetPath(ctx: SetPathContext): LogicalPlan = withOrigin(ctx) { - val elements = (0 until ctx.pathElement().size()).map { i => - val pe = ctx.pathElement(i) + val elements = ctx.pathElement().asScala.map { pe => if (pe.DEFAULT_PATH() != null) PathElement.DefaultPath else if (pe.SYSTEM_PATH() != null) PathElement.SystemPath else if (pe.PATH() != null) PathElement.PathRef else if (pe.CURRENT_DATABASE() != null) PathElement.CurrentDatabase else if (pe.CURRENT_SCHEMA() != null) PathElement.CurrentSchema else PathElement.SchemaInPath(visitMultipartIdentifier(pe.multipartIdentifier())) - } - SetPathCommand(elements.toSeq) + }.toSeq + SetPathCommand(elements) } override def visitSetTimeZone(ctx: SetTimeZoneContext): LogicalPlan = withOrigin(ctx) { @@ -1442,9 +1441,7 @@ class SparkSqlAstBuilder extends AstBuilder { val tableName = ctx.identifierReference.getText.split("\\.").lastOption.getOrElse("table") throw QueryCompilationErrors.describeJsonNotExtendedError(tableName) } - val relation = createUnresolvedTableOrView( - ctx.identifierReference, - "DESCRIBE 
TABLE") + val relation = createUnresolvedTableOrView(ctx.identifierReference, "DESCRIBE TABLE") if (ctx.describeColName != null) { if (ctx.partitionSpec != null) { throw QueryParsingErrors.descColumnForPartitionUnsupportedError(ctx) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetPathCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetPathCommand.scala index fd2c2368b69cb..254cb8a7ecd36 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetPathCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetPathCommand.scala @@ -26,8 +26,8 @@ import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.internal.SQLConf /** - * Path element for SET PATH: either a well-known shortcut or a schema (optionally qualified). - * For SchemaInPath(parts), qualification with current catalog or SYSTEM is done at run time. + * Path element for SET PATH: either a well-known shortcut or a fully qualified schema reference. + * SchemaInPath requires at least 2 parts (catalog.namespace); multi-level namespaces are allowed. */ sealed trait PathElement @@ -41,7 +41,7 @@ object PathElement { */ case object CurrentDatabase extends PathElement case object CurrentSchema extends PathElement - /** Schema name parts (1 = unqualified namespace, 2+ = catalog.namespace...). Qualified at run. */ + /** Fully qualified schema reference (catalog.namespace...). Must have at least 2 parts. 
*/ case class SchemaInPath(parts: Seq[String]) extends PathElement } @@ -66,26 +66,23 @@ case class SetPathCommand(elements: Seq[PathElement]) extends LeafRunnableComman val caseSensitive = conf.caseSensitiveAnalysis val expanded = expandPathElements(elements, conf, currentCatalog, currentNamespace) - val seen = new scala.collection.mutable.HashSet[(String, String)] - val deduped = expanded.flatMap { entry => + val seen = new scala.collection.mutable.HashSet[Seq[String]] + expanded.foreach { entry => val concrete = SQLConf.concreteSessionPathEntry(entry, currentCatalog, currentNamespace) def normalize(s: String): String = if (caseSensitive) s else s.toLowerCase(Locale.ROOT) - val key = (normalize(concrete.head), - normalize(concrete.lift(1).getOrElse(""))) - if (seen.contains(key)) { + val key = concrete.map(normalize) + if (!seen.add(key)) { throw new AnalysisException( errorClass = "DUPLICATE_SQL_PATH_ENTRY", messageParameters = Map("pathEntry" -> concrete.mkString("."))) } - seen.add(key) - Some(entry) } - if (deduped.isEmpty) { + if (expanded.isEmpty) { conf.unsetConf(SQLConf.SESSION_PATH) } else { - conf.setConfString(SQLConf.SESSION_PATH.key, SQLConf.formatSessionPath(deduped)) + conf.setConfString(SQLConf.SESSION_PATH.key, SQLConf.formatSessionPath(expanded)) } Seq.empty } @@ -96,15 +93,23 @@ case class SetPathCommand(elements: Seq[PathElement]) extends LeafRunnableComman currentCatalog: String, currentNamespace: Seq[String]): Seq[Seq[String]] = { val systemCatalog = CatalogManager.SYSTEM_CATALOG_NAME - val builtin = CatalogManager.BUILTIN_NAMESPACE - val session = CatalogManager.SESSION_NAMESPACE + val builtin = Seq(systemCatalog, CatalogManager.BUILTIN_NAMESPACE) + val session = Seq(systemCatalog, CatalogManager.SESSION_NAMESPACE) elements.flatMap { case PathElement.DefaultPath => - // Default path = session order (first/second/last). Clear path; use at resolution time. 
+ if (elements.size > 1) { + throw new AnalysisException( + errorClass = "UNSUPPORTED_FEATURE.DEFAULT_PATH_COMBINED", + messageParameters = Map.empty) + } Seq.empty case PathElement.SystemPath => - Seq(Seq(systemCatalog, builtin), Seq(systemCatalog, session)) + if (conf.sessionFunctionResolutionOrder == "first") { + Seq(session, builtin) + } else { + Seq(builtin, session) + } case PathElement.CurrentDatabase | PathElement.CurrentSchema => Seq(Seq(systemCatalog, SQLConf.SESSION_PATH_VIRTUAL_CURRENT_SCHEMA)) case PathElement.PathRef => @@ -113,32 +118,15 @@ case class SetPathCommand(elements: Seq[PathElement]) extends LeafRunnableComman case None => Seq.empty } case PathElement.SchemaInPath(parts) => - qualifySchemaParts(parts, systemCatalog, currentCatalog) + qualifySchemaParts(parts) } } - /** Qualify schema parts at SET time: well-known -> SYSTEM; else current catalog + namespace. */ - private def qualifySchemaParts( - parts: Seq[String], - systemCatalog: String, - currentCatalog: String): Seq[Seq[String]] = { - val wellKnown = Set( - CatalogManager.BUILTIN_NAMESPACE.toLowerCase(Locale.ROOT), - CatalogManager.SESSION_NAMESPACE.toLowerCase(Locale.ROOT)) - if (parts.isEmpty) return Seq.empty - if (parts.length > 2) { + private def qualifySchemaParts(parts: Seq[String]): Seq[Seq[String]] = { + if (parts.length < 2) { throw QueryCompilationErrors.invalidSqlPathSchemaReferenceError(parts.mkString(".")) } - if (parts.size == 1) { - val ns = parts.head - if (wellKnown.contains(ns.toLowerCase(Locale.ROOT))) { - Seq(Seq(systemCatalog, ns)) - } else { - Seq(Seq(currentCatalog, ns)) - } - } else { - Seq(Seq(parts.head, parts(1))) - } + Seq(parts) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SetPathSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SetPathSuite.scala index d80c1ca8c8186..34ddde5710393 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SetPathSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SetPathSuite.scala @@ 
-35,10 +35,18 @@ class SetPathSuite extends QueryTest with SharedSparkSession { withSQLConf(SQLConf.PATH_ENABLED.key -> "true")(f) } + private def currentPath(): String = + sql("SELECT current_path()").collect().head.getString(0) + + private def pathEntries(pathStr: String): Seq[String] = + pathStr.split(",").map(_.trim).toSeq + test("PATH disabled: CURRENT_PATH() returns default path") { - val pathStr = sql("SELECT current_path()").collect().head.getString(0) - assert(pathStr.contains("spark_catalog") && pathStr.contains("default"), - s"Expected default path to contain spark_catalog.default, got: $pathStr") + val entries = pathEntries(currentPath()) + assert(entries.contains("spark_catalog.default"), + s"Expected default path to contain spark_catalog.default, got: $entries") + assert(entries.exists(_.contains("builtin")), + s"Expected default path to contain builtin, got: $entries") } test("PATH disabled: SET PATH is rejected") { @@ -54,9 +62,9 @@ class SetPathSuite extends QueryTest with SharedSparkSession { test("PATH enabled: SET PATH and CURRENT_PATH()") { withPathEnabled { sql("SET PATH = spark_catalog.default, system.builtin") - val pathStr = sql("SELECT current_path()").collect().head.getString(0) - assert(pathStr.contains("spark_catalog") && pathStr.contains("default"), - s"Expected path to contain spark_catalog and default, got: $pathStr") + val entries = pathEntries(currentPath()) + assert(entries === Seq("spark_catalog.default", "system.builtin"), + s"Expected exact path entries, got: $entries") } } @@ -64,18 +72,22 @@ class SetPathSuite extends QueryTest with SharedSparkSession { withPathEnabled { sql("SET PATH = spark_catalog.default") sql("SET PATH = DEFAULT_PATH") - val pathStr = sql("SELECT current_path()").collect().head.getString(0) - assert(pathStr.contains("spark_catalog") && pathStr.contains("default"), - s"After SET PATH = DEFAULT_PATH expected default path, got: $pathStr") + val entries = pathEntries(currentPath()) + 
assert(entries.contains("spark_catalog.default"), + s"After SET PATH = DEFAULT_PATH expected default path, got: $entries") + assert(entries.exists(_.contains("builtin")), + s"After SET PATH = DEFAULT_PATH expected builtin in path, got: $entries") } } - test("PATH enabled: CURRENT_PATH() with DEFAULT_PATH contains current schema") { + test("PATH enabled: DEFAULT_PATH combined with other elements is rejected") { withPathEnabled { - sql("SET PATH = DEFAULT_PATH") - val pathStr = sql("SELECT current_path()").collect().head.getString(0) - assert(pathStr.contains("default"), - s"With DEFAULT_PATH, path should contain default schema, got: $pathStr") + checkError( + exception = intercept[AnalysisException] { + sql("SET PATH = spark_catalog.foo, DEFAULT_PATH") + }, + condition = "UNSUPPORTED_FEATURE.DEFAULT_PATH_COMBINED", + sqlState = Some("0A000")) } } @@ -105,9 +117,10 @@ class SetPathSuite extends QueryTest with SharedSparkSession { try { sql("SET PATH = spark_catalog.default, system.builtin") sql("SET PATH = PATH, spark_catalog.path_append_test") - val pathStr = sql("SELECT current_path()").collect().head.getString(0) - assert(pathStr.contains("path_append_test"), - s"PATH, schema should append; got: $pathStr") + val entries = pathEntries(currentPath()) + assert(entries === Seq("spark_catalog.default", "system.builtin", + "spark_catalog.path_append_test"), + s"PATH, schema should append; got: $entries") } finally { sql("DROP SCHEMA IF EXISTS path_append_test") } @@ -118,13 +131,26 @@ class SetPathSuite extends QueryTest with SharedSparkSession { withPathEnabled { sql("USE spark_catalog.default") sql("SET PATH = current_schema, system.builtin") - val pathStr = sql("SELECT current_path()").collect().head.getString(0) - assert(pathStr.contains("spark_catalog") && pathStr.contains("default"), - s"current_schema should expand to current schema, got: $pathStr") + val entries = pathEntries(currentPath()) + assert(entries === Seq("spark_catalog.default", "system.builtin"), + 
s"current_schema should expand to current schema, got: $entries") sql("SET PATH = current_database, system.builtin") - val pathStr2 = sql("SELECT current_path()").collect().head.getString(0) - assert(pathStr2.contains("spark_catalog") && pathStr2.contains("default"), - s"current_database should expand to current schema, got: $pathStr2") + val entries2 = pathEntries(currentPath()) + assert(entries2 === Seq("spark_catalog.default", "system.builtin"), + s"current_database should expand to current schema, got: $entries2") + } + } + + test("PATH enabled: cross-alias duplicate detection (current_database, current_schema)") { + withPathEnabled { + sql("USE spark_catalog.default") + checkError( + exception = intercept[AnalysisException] { + sql("SET PATH = current_database, current_schema") + }, + condition = "DUPLICATE_SQL_PATH_ENTRY", + sqlState = Some("42732"), + parameters = Map("pathEntry" -> "spark_catalog.default")) } } @@ -134,9 +160,9 @@ class SetPathSuite extends QueryTest with SharedSparkSession { try { sql("USE spark_catalog.path_virt_schema") sql("SET PATH = current_schema, system.builtin") - val pathStr = sql("SELECT current_path()").collect().head.getString(0) - assert(pathStr.contains("path_virt_schema"), - s"CURRENT_SCHEMA in PATH should reflect USE; got: $pathStr") + val entries = pathEntries(currentPath()) + assert(entries === Seq("spark_catalog.path_virt_schema", "system.builtin"), + s"CURRENT_SCHEMA in PATH should reflect USE; got: $entries") } finally { sql("USE spark_catalog.default") sql("DROP SCHEMA IF EXISTS path_virt_schema") @@ -185,9 +211,11 @@ class SetPathSuite extends QueryTest with SharedSparkSession { test("PATH enabled: SET PATH = SYSTEM_PATH includes system.builtin and system.session") { withPathEnabled { sql("SET PATH = SYSTEM_PATH") - val pathStr = sql("SELECT current_path()").collect().head.getString(0) - assert(pathStr.contains("builtin") && pathStr.contains("session"), - s"SYSTEM_PATH should expand to builtin and session; got: 
$pathStr") + val entries = pathEntries(currentPath()) + assert(entries.contains("system.builtin"), + s"SYSTEM_PATH should include system.builtin; got: $entries") + assert(entries.contains("system.session"), + s"SYSTEM_PATH should include system.session; got: $entries") } } @@ -195,34 +223,53 @@ class SetPathSuite extends QueryTest with SharedSparkSession { withPathEnabled { sql("CREATE SCHEMA IF NOT EXISTS path_from_empty") try { - sql("SET PATH = DEFAULT_PATH, system.builtin") + sql("SET PATH = DEFAULT_PATH") sql("SET PATH = PATH, spark_catalog.path_from_empty") - val pathStr = sql("SELECT current_path()").collect().head.getString(0) - assert(pathStr.contains("path_from_empty"), - s"PATH after cleared path should append schema; got: $pathStr") + val entries = pathEntries(currentPath()) + assert(entries.contains("spark_catalog.path_from_empty"), + s"PATH after cleared path should append schema; got: $entries") } finally { sql("DROP SCHEMA IF EXISTS path_from_empty") } } } - test("PATH enabled: three-part schema reference is rejected") { + test("PATH enabled: unqualified (1-part) schema reference is rejected") { withPathEnabled { checkError( exception = intercept[AnalysisException] { - sql("SET PATH = a.b.c") + sql("SET PATH = myschema") }, condition = "INVALID_SQL_PATH_SCHEMA_REFERENCE", - parameters = Map("qualifiedName" -> "a.b.c")) + parameters = Map( + "qualifiedName" -> "myschema")) + } + } + + test("PATH enabled: multi-level namespace (3+ parts) is accepted") { + withPathEnabled { + sql("SET PATH = iceberg_cat.db1.db2, spark_catalog.default") + val entries = pathEntries(currentPath()) + assert(entries.head === "iceberg_cat.db1.db2", + s"Multi-level namespace should be accepted; got: $entries") + } + } + + test("PATH enabled: backtick-quoted identifiers with dots round-trip correctly") { + withPathEnabled { + sql("SET PATH = `cat.a`.`sch.b`") + val entries = pathEntries(currentPath()) + assert(entries === Seq("`cat.a`.`sch.b`"), + s"Backtick-quoted identifiers 
should round-trip; got: $entries") } } test("PATH enabled: stored path preserves typed case, resolution is case-insensitive") { withPathEnabled { sql("SET PATH = Spark_Catalog.Default, System.Builtin") - val pathStr = sql("SELECT current_path()").collect().head.getString(0) - assert(pathStr.contains("Spark_Catalog") && pathStr.contains("Default"), - s"Stored path should preserve case; got: $pathStr") + val entries = pathEntries(currentPath()) + assert(entries === Seq("Spark_Catalog.Default", "System.Builtin"), + s"Stored path should preserve case; got: $entries") } } } From 19f4c2af82290127ab09b027d18bbd12e12f90e9 Mon Sep 17 00:00:00 2001 From: Serge Rielau Date: Fri, 17 Apr 2026 08:36:16 -0700 Subject: [PATCH 04/12] [SPARK-56501][SQL] Move session PATH from SQLConf to CatalogManager Move the SQL PATH session state from a serialized string config (spark.sql.session.path) to a structured field on CatalogManager, alongside currentCatalog/currentNamespace. - Add _sessionPath: Option[Seq[Seq[String]]] to CatalogManager with synchronized getter/setter/clear/reset/copyFrom/currentPathString - Wire session path cloning in BaseSessionStateBuilder - SetPathCommand now writes CatalogManager directly - ReplaceCurrentLike reads catalogManager.currentPathString - Remove SESSION_PATH config, sessionPath, effectivePathEntries, parseSessionPath, formatSessionPath, separators from SQLConf - Remove SetCommand guard and SET_PATH_VIA_SET error condition - Add tests: programmatic SET has no effect, cloned session inherits path --- .../resources/error/error-conditions.json | 5 -- .../catalyst/optimizer/finishAnalysis.scala | 2 +- .../connector/catalog/CatalogManager.scala | 37 +++++++++++ .../apache/spark/sql/internal/SQLConf.scala | 65 +------------------ .../sql/execution/command/SetCommand.scala | 5 -- .../execution/command/SetPathCommand.scala | 13 ++-- .../internal/BaseSessionStateBuilder.scala | 6 +- .../org/apache/spark/sql/SetPathSuite.scala | 24 +++++-- 8 files changed, 69 
insertions(+), 88 deletions(-) diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json index e595fe82ba44f..5832d70ac1874 100644 --- a/common/utils/src/main/resources/error/error-conditions.json +++ b/common/utils/src/main/resources/error/error-conditions.json @@ -7901,11 +7901,6 @@ "Cannot have VARIANT type columns in DataFrame which calls set operations (INTERSECT, EXCEPT, etc.), but the type of column is ." ] }, - "SET_PATH_VIA_SET" : { - "message" : [ - "The session path cannot be set using the SET statement. Use SET PATH = ... instead." - ] - }, "SET_PATH_WHEN_DISABLED" : { "message" : [ "SET PATH is disabled. Set to true to enable it." diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala index 24f7b1598a7f2..c219abc388bd2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala @@ -161,7 +161,7 @@ case class ReplaceCurrentLike( lazy val currentNamespaceSeq = catalogManager.currentNamespace.toSeq lazy val currentCatalog = catalogManager.currentCatalog.name() lazy val currentUser = CurrentUserContext.getCurrentUser - lazy val currentPathStr = sqlConf.currentPathString(currentCatalog, currentNamespaceSeq) + lazy val currentPathStr = catalogManager.currentPathString plan.transformAllExpressionsWithPruning(_.containsPattern(CURRENT_LIKE)) { case CurrentDatabase() => diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala index 8663bb65b6a88..2f1d89775a207 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala +++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala @@ -126,6 +126,42 @@ class CatalogManager( _currentNamespace = Some(namespace) } + private var _sessionPath: Option[Seq[Seq[String]]] = None + + /** Returns the raw stored session path entries, or None if no path is set. */ + def sessionPathEntries: Option[Seq[Seq[String]]] = synchronized { _sessionPath } + + def setSessionPath(entries: Seq[Seq[String]]): Unit = synchronized { + _sessionPath = Some(entries) + } + + def clearSessionPath(): Unit = synchronized { + _sessionPath = None + } + + private[sql] def copySessionPathFrom(other: CatalogManager): Unit = synchronized { + _sessionPath = other.sessionPathEntries + } + + /** + * String form of the current resolution path for CURRENT_PATH(). + * When PATH is enabled and a session path is stored, formats the effective path entries + * with markers expanded. Otherwise falls back to the legacy resolutionSearchPath. + */ + def currentPathString: String = { + import CatalogV2Implicits._ + val entries = if (conf.pathEnabled) _sessionPath else None + entries match { + case Some(stored) => + val expanded = SQLConf.expandSessionPathMarkers( + stored, currentCatalog.name(), currentNamespace.toSeq) + expanded.map(_.quoted).mkString(",") + case None => + val catalogPath = (currentCatalog.name() +: currentNamespace).toSeq + conf.resolutionSearchPath(catalogPath).map(_.quoted).mkString(",") + } + } + private var _currentCatalogName: Option[String] = None def currentCatalog: CatalogPlugin = synchronized { @@ -154,6 +190,7 @@ class CatalogManager( catalogs.clear() _currentNamespace = None _currentCatalogName = None + _sessionPath = None v1SessionCatalog.setCurrentDatabase(conf.defaultDatabase) } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 2c55445e88bf0..fbbe76e86a1f5 100644 --- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -144,7 +144,7 @@ object SQLConf { } /** - * Second segment of the virtual path entry `system.current_schema` in `spark.sql.session.path`. + * Second segment of the virtual path entry `system.current_schema` in the session path. * CURRENT_SCHEMA and CURRENT_DATABASE in SET PATH both normalize to this sentinel (SQL aliases). * It is stored literally and expanded to the current catalog + namespace when building routine * resolution candidates and CURRENT_PATH(). @@ -177,33 +177,6 @@ object SQLConf { currentNamespace: Seq[String]): Seq[Seq[String]] = entries.map(concreteSessionPathEntry(_, currentCatalog, currentNamespace)) - /** Separator between path entries (Record Separator). */ - private[sql] val SESSION_PATH_ENTRY_SEPARATOR = "\u001E" - - /** Separator between identifier parts within a single entry (Unit Separator). */ - private[sql] val SESSION_PATH_PART_SEPARATOR = "\u001F" - - /** - * Parses a session path string into a list of path entries. - * Uses \u001E between entries and \u001F between parts within each entry, - * so identifiers containing `.` or `,` round-trip correctly and multi-level - * namespaces are preserved. - */ - private[sql] def parseSessionPath(pathStr: String): Seq[Seq[String]] = { - if (pathStr == null || pathStr.trim.isEmpty) return Seq.empty - pathStr.split(SESSION_PATH_ENTRY_SEPARATOR, -1).map { entry => - entry.split(SESSION_PATH_PART_SEPARATOR, -1).toSeq - }.toSeq.filter(_.exists(_.nonEmpty)) - } - - /** - * Formats path entries into a session path string. - * Uses \u001E between entries and \u001F between parts within each entry. - */ - private[sql] def formatSessionPath(pathEntries: Seq[Seq[String]]): String = - pathEntries.map(_.mkString(SESSION_PATH_PART_SEPARATOR)) - .mkString(SESSION_PATH_ENTRY_SEPARATOR) - /** * Default config. 
Only used when there is no active SparkSession for the thread. * See [[get]] for more information. @@ -2513,16 +2486,6 @@ object SQLConf { .booleanConf .createWithDefault(false) - val SESSION_PATH = - buildConf("spark.sql.session.path") - .internal() - .version("4.2.0") - .doc("Session search path for routine resolution (catalog-qualified schema list). " + - "Only settable via SET PATH statement; direct SET of this config is ignored.") - .withBindingPolicy(ConfigBindingPolicy.SESSION) - .stringConf - .createOptional - // Whether to retain group by columns or not in GroupedData.agg. val DATAFRAME_RETAIN_GROUP_COLUMNS = buildConf("spark.sql.retainGroupColumns") .version("1.4.0") @@ -8438,32 +8401,6 @@ class SQLConf extends Serializable with Logging with SqlApiConf { def pathEnabled: Boolean = getConf(SQLConf.PATH_ENABLED) - def sessionPath: Option[String] = getConf(SQLConf.SESSION_PATH) - - /** - * Returns the session path as path entries when PATH is enabled and set; None otherwise. - */ - def effectivePathEntries: Option[Seq[Seq[String]]] = - if (pathEnabled) sessionPath.map(SQLConf.parseSessionPath).filter(_.nonEmpty) - else None - - /** - * String form of the current resolution path for CURRENT_PATH(). - * When PATH is enabled and a session path is stored, formats the effective path entries - * with markers expanded. Otherwise falls back to the legacy resolutionSearchPath. - */ - def currentPathString(currentCatalog: String, currentNamespace: Seq[String]): String = { - import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ - val entries = effectivePathEntries match { - case Some(stored) => - SQLConf.expandSessionPathMarkers(stored, currentCatalog, currentNamespace) - case None => - val catalogPath = (currentCatalog +: currentNamespace).toSeq - resolutionSearchPath(catalogPath) - } - entries.map(_.quoted).mkString(",") - } - /** * Returns the resolution search path for error messages and resolution order. 
* This is the single source of truth for the search path used for functions, tables, and views. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala index 54328d16a0baf..e248f0eea96de 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala @@ -119,11 +119,6 @@ case class SetCommand(kv: Option[(String, Option[String])]) messageParameters = Map("variableName" -> toSQLId(varName))) } } - if (key == SQLConf.SESSION_PATH.key) { - throw new AnalysisException( - errorClass = "UNSUPPORTED_FEATURE.SET_PATH_VIA_SET", - messageParameters = Map.empty) - } if (sparkSession.conf.get(CATALOG_IMPLEMENTATION.key).equals("hive") && key.startsWith("hive.")) { logWarning(log"'SET ${MDC(KEY, key)}=${MDC(VALUE, value)}' might not work, since Spark " + diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetPathCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetPathCommand.scala index 254cb8a7ecd36..e2a12e76a5f21 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetPathCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetPathCommand.scala @@ -65,7 +65,8 @@ case class SetPathCommand(elements: Seq[PathElement]) extends LeafRunnableComman val currentNamespace = catalogManager.currentNamespace.toSeq val caseSensitive = conf.caseSensitiveAnalysis - val expanded = expandPathElements(elements, conf, currentCatalog, currentNamespace) + val expanded = expandPathElements(elements, conf, catalogManager, currentCatalog, + currentNamespace) val seen = new scala.collection.mutable.HashSet[Seq[String]] expanded.foreach { entry => val concrete = @@ -80,9 +81,9 @@ case class SetPathCommand(elements: Seq[PathElement]) extends LeafRunnableComman } if 
(expanded.isEmpty) { - conf.unsetConf(SQLConf.SESSION_PATH) + catalogManager.clearSessionPath() } else { - conf.setConfString(SQLConf.SESSION_PATH.key, SQLConf.formatSessionPath(expanded)) + catalogManager.setSessionPath(expanded) } Seq.empty } @@ -90,6 +91,7 @@ case class SetPathCommand(elements: Seq[PathElement]) extends LeafRunnableComman private def expandPathElements( elements: Seq[PathElement], conf: SQLConf, + catalogManager: CatalogManager, currentCatalog: String, currentNamespace: Seq[String]): Seq[Seq[String]] = { val systemCatalog = CatalogManager.SYSTEM_CATALOG_NAME @@ -113,10 +115,7 @@ case class SetPathCommand(elements: Seq[PathElement]) extends LeafRunnableComman case PathElement.CurrentDatabase | PathElement.CurrentSchema => Seq(Seq(systemCatalog, SQLConf.SESSION_PATH_VIRTUAL_CURRENT_SCHEMA)) case PathElement.PathRef => - conf.sessionPath match { - case Some(s) => SQLConf.parseSessionPath(s) - case None => Seq.empty - } + catalogManager.sessionPathEntries.getOrElse(Seq.empty) case PathElement.SchemaInPath(parts) => qualifySchemaParts(parts) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala index 08dd212060762..9bd68cbe72a07 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala @@ -160,7 +160,11 @@ abstract class BaseSessionStateBuilder( protected lazy val v2SessionCatalog = new V2SessionCatalog(catalog) - protected lazy val catalogManager = new CatalogManager(v2SessionCatalog, catalog) + protected lazy val catalogManager = { + val cm = new CatalogManager(v2SessionCatalog, catalog) + parentState.foreach(ps => cm.copySessionPathFrom(ps.catalogManager)) + cm + } protected lazy val sharedRelationCache = session.sharedState.relationCache diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/SetPathSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SetPathSuite.scala index 34ddde5710393..d5b4d1536e88d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SetPathSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SetPathSuite.scala @@ -91,12 +91,26 @@ class SetPathSuite extends QueryTest with SharedSparkSession { } } - test("direct SET of session path config is rejected") { - val err = intercept[AnalysisException] { - sql("SET spark.sql.session.path = 'spark_catalog.default'") + test("programmatic SET of spark.sql.session.path has no effect on CURRENT_PATH()") { + withPathEnabled { + sql("SET PATH = spark_catalog.default, system.builtin") + spark.conf.set("spark.sql.session.path", "garbage") + val entries = pathEntries(currentPath()) + assert(entries === Seq("spark_catalog.default", "system.builtin"), + s"Programmatic SET should not affect path; got: $entries") + spark.conf.unset("spark.sql.session.path") + } + } + + test("PATH enabled: cloned session inherits parent path") { + withPathEnabled { + sql("SET PATH = spark_catalog.default, system.builtin") + val cloned = spark.cloneSession() + val clonedPath = cloned.sql("SELECT current_path()").collect().head.getString(0) + val entries = pathEntries(clonedPath) + assert(entries === Seq("spark_catalog.default", "system.builtin"), + s"Cloned session should inherit parent path; got: $entries") } - assert(err.getMessage.contains("SET PATH"), - s"Expected SET_PATH_VIA_SET error, got: ${err.getMessage}") } test("PATH enabled: duplicate path entry raises error") { From 74cb6c6d00473934c32c8931161d104036b7f686 Mon Sep 17 00:00:00 2001 From: Serge Rielau Date: Fri, 17 Apr 2026 09:00:11 -0700 Subject: [PATCH 05/12] [SPARK-56501][SQL] DEFAULT_PATH expands to concrete entries Make DEFAULT_PATH expand to the same entries as the legacy resolutionSearchPath (builtin, session, current_schema) ordered by spark.sql.function.resolutionOrder, using the 
virtual system.current_schema marker so USE SCHEMA is still reflected. This allows DEFAULT_PATH to compose with other path elements: SET PATH = DEFAULT_PATH, spark_catalog.extra Previously DEFAULT_PATH was restricted to sole-element use. --- .../resources/error/error-conditions.json | 5 --- .../execution/command/SetPathCommand.scala | 10 ++--- .../org/apache/spark/sql/SetPathSuite.scala | 39 +++++++++++++++---- 3 files changed, 37 insertions(+), 17 deletions(-) diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json index 5832d70ac1874..bf976ebf6ab19 100644 --- a/common/utils/src/main/resources/error/error-conditions.json +++ b/common/utils/src/main/resources/error/error-conditions.json @@ -7694,11 +7694,6 @@ "DESC TABLE COLUMN AS JSON not supported for individual columns." ] }, - "DEFAULT_PATH_COMBINED" : { - "message" : [ - "DEFAULT_PATH must be the sole element in SET PATH. It cannot be combined with other path elements." - ] - }, "DESC_TABLE_COLUMN_PARTITION" : { "message" : [ "DESC TABLE COLUMN for a specific partition." 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetPathCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetPathCommand.scala index e2a12e76a5f21..b001a0bcb0c35 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetPathCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetPathCommand.scala @@ -100,12 +100,12 @@ case class SetPathCommand(elements: Seq[PathElement]) extends LeafRunnableComman elements.flatMap { case PathElement.DefaultPath => - if (elements.size > 1) { - throw new AnalysisException( - errorClass = "UNSUPPORTED_FEATURE.DEFAULT_PATH_COMBINED", - messageParameters = Map.empty) + val currentSchema = Seq(systemCatalog, SQLConf.SESSION_PATH_VIRTUAL_CURRENT_SCHEMA) + conf.sessionFunctionResolutionOrder match { + case "first" => Seq(session, builtin, currentSchema) + case "last" => Seq(builtin, currentSchema, session) + case _ => Seq(builtin, session, currentSchema) } - Seq.empty case PathElement.SystemPath => if (conf.sessionFunctionResolutionOrder == "first") { Seq(session, builtin) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SetPathSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SetPathSuite.scala index d5b4d1536e88d..8206b46cbb30a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SetPathSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SetPathSuite.scala @@ -74,20 +74,45 @@ class SetPathSuite extends QueryTest with SharedSparkSession { sql("SET PATH = DEFAULT_PATH") val entries = pathEntries(currentPath()) assert(entries.contains("spark_catalog.default"), - s"After SET PATH = DEFAULT_PATH expected default path, got: $entries") - assert(entries.exists(_.contains("builtin")), - s"After SET PATH = DEFAULT_PATH expected builtin in path, got: $entries") + s"After SET PATH = DEFAULT_PATH expected current schema in path, got: $entries") + assert(entries.contains("system.builtin"), + s"After SET PATH = 
DEFAULT_PATH expected system.builtin, got: $entries") + assert(entries.contains("system.session"), + s"After SET PATH = DEFAULT_PATH expected system.session, got: $entries") + assert(entries.length === 3, + s"DEFAULT_PATH should expand to 3 entries, got: $entries") } } - test("PATH enabled: DEFAULT_PATH combined with other elements is rejected") { + test("PATH enabled: DEFAULT_PATH composes with other path elements") { + withPathEnabled { + sql("CREATE SCHEMA IF NOT EXISTS path_extra_test") + try { + sql("SET PATH = DEFAULT_PATH, spark_catalog.path_extra_test") + val entries = pathEntries(currentPath()) + assert(entries.contains("system.builtin"), + s"DEFAULT_PATH should include system.builtin; got: $entries") + assert(entries.contains("system.session"), + s"DEFAULT_PATH should include system.session; got: $entries") + assert(entries.last === "spark_catalog.path_extra_test", + s"Extra schema should be appended after DEFAULT_PATH; got: $entries") + assert(entries.length === 4, + s"DEFAULT_PATH + 1 extra should be 4 entries, got: $entries") + } finally { + sql("DROP SCHEMA IF EXISTS path_extra_test") + } + } + } + + test("PATH enabled: DEFAULT_PATH, DEFAULT_PATH raises duplicate error") { withPathEnabled { checkError( exception = intercept[AnalysisException] { - sql("SET PATH = spark_catalog.foo, DEFAULT_PATH") + sql("SET PATH = DEFAULT_PATH, DEFAULT_PATH") }, - condition = "UNSUPPORTED_FEATURE.DEFAULT_PATH_COMBINED", - sqlState = Some("0A000")) + condition = "DUPLICATE_SQL_PATH_ENTRY", + sqlState = Some("42732"), + parameters = Map("pathEntry" -> "system.builtin")) } } From 26a05acf17537cd2576d6b322995796212922fb5 Mon Sep 17 00:00:00 2001 From: Serge Rielau Date: Fri, 17 Apr 2026 15:13:34 -0700 Subject: [PATCH 06/12] [SPARK-56501][SQL] Address review round 2: remove unused params, move path helpers to CatalogManager - Remove unused sqlConf parameter from ReplaceCurrentLike (and conf arg in Optimizer.scala); currentPathString is on CatalogManager now. 
- Remove unused currentNamespaceSeq in ReplaceCurrentLike. - Synchronize _sessionPath read in CatalogManager.currentPathString. - Move SESSION_PATH_VIRTUAL_CURRENT_SCHEMA, isVirtualCurrentSchemaPathEntry, concreteSessionPathEntry, expandSessionPathMarkers from SQLConf to CatalogManager companion (they are session-path helpers, not config). - Fix DUPLICATE_SQL_PATH_ENTRY message for multi-level namespaces. --- .../resources/error/error-conditions.json | 2 +- .../sql/catalyst/optimizer/Optimizer.scala | 2 +- .../catalyst/optimizer/finishAnalysis.scala | 6 +-- .../connector/catalog/CatalogManager.scala | 39 ++++++++++++++++++- .../apache/spark/sql/internal/SQLConf.scala | 39 +------------------ .../execution/command/SetPathCommand.scala | 6 +-- 6 files changed, 44 insertions(+), 50 deletions(-) diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json index bf976ebf6ab19..7da1a53470d08 100644 --- a/common/utils/src/main/resources/error/error-conditions.json +++ b/common/utils/src/main/resources/error/error-conditions.json @@ -2063,7 +2063,7 @@ }, "DUPLICATE_SQL_PATH_ENTRY" : { "message" : [ - "Duplicate SQL path entry <pathEntry>. The session SQL path cannot contain the same catalog.schema more than once (including after expanding shortcuts like DEFAULT_PATH or SYSTEM_PATH)." + "Duplicate SQL path entry <pathEntry>. The session SQL path cannot contain the same namespace more than once (including after expanding shortcuts like DEFAULT_PATH or SYSTEM_PATH)."
], "sqlState" : "42732" }, diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 367474311fd1e..8434816a9ca89 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -335,7 +335,7 @@ abstract class Optimizer(catalogManager: CatalogManager) InsertMapSortInGroupingExpressions, InsertMapSortInRepartitionExpressions, ComputeCurrentTime, - ReplaceCurrentLike(catalogManager, conf), + ReplaceCurrentLike(catalogManager), SpecialDatetimeValues, RewriteAsOfJoin, EvalInlineTables, diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala index c219abc388bd2..160c4147dae61 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala @@ -34,7 +34,6 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils.{convertSpecialDate, con import org.apache.spark.sql.catalyst.util.SparkDateTimeUtils.{instantToNanosOfDay, truncateTimeToPrecision} import org.apache.spark.sql.catalyst.util.TypeUtils.toSQLExpr import org.apache.spark.sql.connector.catalog.CatalogManager -import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ @@ -152,13 +151,10 @@ object ComputeCurrentTime extends Rule[LogicalPlan] { * Replaces the expression of CurrentDatabase, CurrentCatalog, CurrentPath, and CurrentUser * with the current values. 
*/ -case class ReplaceCurrentLike( - catalogManager: CatalogManager, - sqlConf: SQLConf) extends Rule[LogicalPlan] { +case class ReplaceCurrentLike(catalogManager: CatalogManager) extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = { import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ lazy val currentNamespace = catalogManager.currentNamespace.quoted - lazy val currentNamespaceSeq = catalogManager.currentNamespace.toSeq lazy val currentCatalog = catalogManager.currentCatalog.name() lazy val currentUser = CurrentUserContext.getCurrentUser lazy val currentPathStr = catalogManager.currentPathString diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala index 2f1d89775a207..7a6da2f87807d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala @@ -148,12 +148,12 @@ class CatalogManager( * When PATH is enabled and a session path is stored, formats the effective path entries * with markers expanded. Otherwise falls back to the legacy resolutionSearchPath. */ - def currentPathString: String = { + def currentPathString: String = synchronized { import CatalogV2Implicits._ val entries = if (conf.pathEnabled) _sessionPath else None entries match { case Some(stored) => - val expanded = SQLConf.expandSessionPathMarkers( + val expanded = CatalogManager.expandSessionPathMarkers( stored, currentCatalog.name(), currentNamespace.toSeq) expanded.map(_.quoted).mkString(",") case None => @@ -241,4 +241,39 @@ private[sql] object CatalogManager { (nameParts.length == 2 && nameParts.head.equalsIgnoreCase(SESSION_NAMESPACE)) || isFullyQualifiedSystemSessionViewName(nameParts) } + + /** True if a SQL path entry denotes `system.session` (case-insensitive). 
*/ + def isSystemSessionPathEntry(parts: Seq[String]): Boolean = + parts.length == 2 && + parts.head.equalsIgnoreCase(SYSTEM_CATALOG_NAME) && + parts(1).equalsIgnoreCase(SESSION_NAMESPACE) + + /** Virtual marker for CURRENT_SCHEMA / CURRENT_DATABASE in SET PATH. */ + val SESSION_PATH_VIRTUAL_CURRENT_SCHEMA: String = "current_schema" + + /** True if this path entry is the virtual current-schema slot (`system.current_schema`). */ + def isVirtualCurrentSchemaPathEntry(entry: Seq[String]): Boolean = + entry.length == 2 && + entry.head.equalsIgnoreCase(SYSTEM_CATALOG_NAME) && + entry(1).equalsIgnoreCase(SESSION_PATH_VIRTUAL_CURRENT_SCHEMA) + + /** Materialize virtual current-schema entry for duplicate checks at SET PATH time. */ + def concreteSessionPathEntry( + entry: Seq[String], + currentCatalog: String, + currentNamespace: Seq[String]): Seq[String] = { + if (isVirtualCurrentSchemaPathEntry(entry)) { + if (currentNamespace.isEmpty) Seq(currentCatalog) + else currentCatalog +: currentNamespace + } else { + entry + } + } + + /** Expand markers in stored session path using the current catalog and namespace. 
*/ + def expandSessionPathMarkers( + entries: Seq[Seq[String]], + currentCatalog: String, + currentNamespace: Seq[String]): Seq[Seq[String]] = + entries.map(concreteSessionPathEntry(_, currentCatalog, currentNamespace)) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index fbbe76e86a1f5..d0781b632a3c1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -45,10 +45,7 @@ import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode import org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator import org.apache.spark.sql.catalyst.plans.logical.HintErrorHandler import org.apache.spark.sql.catalyst.util.DateTimeUtils -import org.apache.spark.sql.connector.catalog.CatalogManager.{ - SESSION_CATALOG_NAME, - SYSTEM_CATALOG_NAME -} +import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors} import org.apache.spark.sql.types.{AtomicType, TimestampNTZType, TimestampType} import org.apache.spark.storage.{StorageLevel, StorageLevelMapper} @@ -143,40 +140,6 @@ object SQLConf { } } - /** - * Second segment of the virtual path entry `system.current_schema` in the session path. - * CURRENT_SCHEMA and CURRENT_DATABASE in SET PATH both normalize to this sentinel (SQL aliases). - * It is stored literally and expanded to the current catalog + namespace when building routine - * resolution candidates and CURRENT_PATH(). - */ - private[sql] val SESSION_PATH_VIRTUAL_CURRENT_SCHEMA: String = "current_schema" - - /** True if this path entry is the virtual current-schema slot (`system.current_schema`). 
*/ - private[sql] def isVirtualCurrentSchemaPathEntry(entry: Seq[String]): Boolean = - entry.length == 2 && - entry.head.equalsIgnoreCase(SYSTEM_CATALOG_NAME) && - entry(1).equalsIgnoreCase(SESSION_PATH_VIRTUAL_CURRENT_SCHEMA) - - /** Materialize virtual current-schema entry for duplicate checks at SET PATH time. */ - private[sql] def concreteSessionPathEntry( - entry: Seq[String], - currentCatalog: String, - currentNamespace: Seq[String]): Seq[String] = { - if (isVirtualCurrentSchemaPathEntry(entry)) { - if (currentNamespace.isEmpty) Seq(currentCatalog) - else currentCatalog +: currentNamespace - } else { - entry - } - } - - /** Expand markers in stored session path using the current catalog and namespace. */ - private[sql] def expandSessionPathMarkers( - entries: Seq[Seq[String]], - currentCatalog: String, - currentNamespace: Seq[String]): Seq[Seq[String]] = - entries.map(concreteSessionPathEntry(_, currentCatalog, currentNamespace)) - /** * Default config. Only used when there is no active SparkSession for the thread. * See [[get]] for more information. 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetPathCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetPathCommand.scala index b001a0bcb0c35..d74ca76b41c20 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetPathCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetPathCommand.scala @@ -70,7 +70,7 @@ case class SetPathCommand(elements: Seq[PathElement]) extends LeafRunnableComman val seen = new scala.collection.mutable.HashSet[Seq[String]] expanded.foreach { entry => val concrete = - SQLConf.concreteSessionPathEntry(entry, currentCatalog, currentNamespace) + CatalogManager.concreteSessionPathEntry(entry, currentCatalog, currentNamespace) def normalize(s: String): String = if (caseSensitive) s else s.toLowerCase(Locale.ROOT) val key = concrete.map(normalize) if (!seen.add(key)) { @@ -100,7 +100,7 @@ case class SetPathCommand(elements: Seq[PathElement]) extends LeafRunnableComman elements.flatMap { case PathElement.DefaultPath => - val currentSchema = Seq(systemCatalog, SQLConf.SESSION_PATH_VIRTUAL_CURRENT_SCHEMA) + val currentSchema = Seq(systemCatalog, CatalogManager.SESSION_PATH_VIRTUAL_CURRENT_SCHEMA) conf.sessionFunctionResolutionOrder match { case "first" => Seq(session, builtin, currentSchema) case "last" => Seq(builtin, currentSchema, session) @@ -113,7 +113,7 @@ case class SetPathCommand(elements: Seq[PathElement]) extends LeafRunnableComman Seq(builtin, session) } case PathElement.CurrentDatabase | PathElement.CurrentSchema => - Seq(Seq(systemCatalog, SQLConf.SESSION_PATH_VIRTUAL_CURRENT_SCHEMA)) + Seq(Seq(systemCatalog, CatalogManager.SESSION_PATH_VIRTUAL_CURRENT_SCHEMA)) case PathElement.PathRef => catalogManager.sessionPathEntries.getOrElse(Seq.empty) case PathElement.SchemaInPath(parts) => From dbdeb5231fa217b286296c7aac4c222756cda63e Mon Sep 17 00:00:00 2001 From: Serge Rielau Date: Fri, 17 Apr 2026 16:29:00 -0700 Subject: [PATCH 
07/12] [SPARK-56501][SQL] Address review: add enabled-but-unset fallback and DEFAULT_PATH+builtin dup tests --- .../org/apache/spark/sql/SetPathSuite.scala | 22 +++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SetPathSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SetPathSuite.scala index 8206b46cbb30a..02eedc94fdb66 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SetPathSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SetPathSuite.scala @@ -59,6 +59,28 @@ class SetPathSuite extends QueryTest with SharedSparkSession { parameters = Map("config" -> SQLConf.PATH_ENABLED.key)) } + test("PATH enabled but no SET PATH: falls back to legacy resolutionSearchPath") { + withPathEnabled { + val entries = pathEntries(currentPath()) + assert(entries.exists(_.contains("builtin")), + s"Enabled-but-unset should fall back to legacy path with builtin, got: $entries") + assert(entries.exists(_.contains("default")), + s"Enabled-but-unset should include current schema, got: $entries") + } + } + + test("PATH enabled: DEFAULT_PATH + explicit builtin raises duplicate") { + withPathEnabled { + checkError( + exception = intercept[AnalysisException] { + sql("SET PATH = DEFAULT_PATH, system.builtin") + }, + condition = "DUPLICATE_SQL_PATH_ENTRY", + sqlState = Some("42732"), + parameters = Map("pathEntry" -> "system.builtin")) + } + } + test("PATH enabled: SET PATH and CURRENT_PATH()") { withPathEnabled { sql("SET PATH = spark_catalog.default, system.builtin") From 82cfeb35e6cc30ba5a18b151cb6768c31bb7fd70 Mon Sep 17 00:00:00 2001 From: Serge Rielau Date: Mon, 20 Apr 2026 08:10:32 -0700 Subject: [PATCH 08/12] [SPARK-56501][SQL] Address review round 3: shared ordering helper, doc fixes, new tests - Fix visitSetPath Scaladoc placement (was stealing visitSetTimeZone doc) - Remove isSystemSessionPathEntry from this PR (no call sites; will be added in the persist-path PR that needs it) - Quote parts 
containing dots in DUPLICATE_SQL_PATH_ENTRY error message - Extract defaultPathOrder/systemPathOrder on SQLConf to eliminate ordering logic duplication between SetPathCommand and resolutionSearchPath - Add tests for sessionFunctionResolutionOrder = "first" and "last" - Note that cloneSession shares CatalogManager (same as USE CATALOG/SCHEMA) --- .../connector/catalog/CatalogManager.scala | 6 ----- .../apache/spark/sql/internal/SQLConf.scala | 27 ++++++++++++++----- .../spark/sql/execution/SparkSqlParser.scala | 17 +++++++++--- .../execution/command/SetPathCommand.scala | 17 +++--------- .../org/apache/spark/sql/SetPathSuite.scala | 26 ++++++++++++++++++ 5 files changed, 63 insertions(+), 30 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala index 7a6da2f87807d..7873056669d3a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala @@ -242,12 +242,6 @@ private[sql] object CatalogManager { isFullyQualifiedSystemSessionViewName(nameParts) } - /** True if a SQL path entry denotes `system.session` (case-insensitive). */ - def isSystemSessionPathEntry(parts: Seq[String]): Boolean = - parts.length == 2 && - parts.head.equalsIgnoreCase(SYSTEM_CATALOG_NAME) && - parts(1).equalsIgnoreCase(SESSION_NAMESPACE) - /** Virtual marker for CURRENT_SCHEMA / CURRENT_DATABASE in SET PATH. 
*/ val SESSION_PATH_VIRTUAL_CURRENT_SCHEMA: String = "current_schema" diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index d0781b632a3c1..baca68b82260d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -8370,23 +8370,36 @@ class SQLConf extends Serializable with Logging with SqlApiConf { * Uses [[sessionFunctionResolutionOrder]]: "first" (session first), "second" (session second), * "last" (session last). When catalogPath is empty, returns only system namespaces. */ - def resolutionSearchPath(catalogPath: Seq[String]): Seq[Seq[String]] = { + def resolutionSearchPath(catalogPath: Seq[String]): Seq[Seq[String]] = + defaultPathOrder(Seq(catalogPath)) + + /** + * Orders the given catalog path entries by [[sessionFunctionResolutionOrder]], inserting + * system.session and system.builtin. Used by both the legacy single-schema resolution and + * by SET PATH's DEFAULT_PATH / SYSTEM_PATH expansion to keep ordering in sync. + * + * @param catalogEntries persistent catalog path entries (may be empty). 
+ */ + def defaultPathOrder(catalogEntries: Seq[Seq[String]]): Seq[Seq[String]] = { val order = sessionFunctionResolutionOrder val session = Seq("system", "session") val builtin = Seq("system", "builtin") order match { case "first" => - if (catalogPath.isEmpty) Seq(session, builtin) - else Seq(session, builtin, catalogPath) + if (catalogEntries.isEmpty) Seq(session, builtin) + else Seq(session, builtin) ++ catalogEntries case "last" => - if (catalogPath.isEmpty) Seq(builtin, session) - else Seq(builtin, catalogPath, session) + if (catalogEntries.isEmpty) Seq(builtin, session) + else Seq(builtin) ++ catalogEntries ++ Seq(session) case _ => // "second" - if (catalogPath.isEmpty) Seq(builtin, session) - else Seq(builtin, session, catalogPath) + if (catalogEntries.isEmpty) Seq(builtin, session) + else Seq(builtin, session) ++ catalogEntries } } + /** System-only path (builtin + session) ordered by [[sessionFunctionResolutionOrder]]. */ + def systemPathOrder: Seq[Seq[String]] = defaultPathOrder(Seq.empty) + override def legacyParameterSubstitutionConstantsOnly: Boolean = getConf(SQLConf.LEGACY_PARAMETER_SUBSTITUTION_CONSTANTS_ONLY) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index 68e055fe2500f..abc62241a0066 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -349,12 +349,12 @@ class SparkSqlAstBuilder extends AstBuilder { } /** - * Create a [[SetCommand]] logical plan to set [[SQLConf.SESSION_LOCAL_TIMEZONE]] + * Create a [[SetPathCommand]] to set the session SQL path. 
* Example SQL : * {{{ - * SET TIME ZONE LOCAL; - * SET TIME ZONE 'Asia/Shanghai'; - * SET TIME ZONE INTERVAL 10 HOURS; + * SET PATH = spark_catalog.default, system.builtin; + * SET PATH = DEFAULT_PATH; + * SET PATH = SYSTEM_PATH, spark_catalog.myschema; * }}} */ override def visitSetPath(ctx: SetPathContext): LogicalPlan = withOrigin(ctx) { @@ -369,6 +369,15 @@ class SparkSqlAstBuilder extends AstBuilder { SetPathCommand(elements) } + /** + * Create a [[SetCommand]] logical plan to set [[SQLConf.SESSION_LOCAL_TIMEZONE]] + * Example SQL : + * {{{ + * SET TIME ZONE LOCAL; + * SET TIME ZONE 'Asia/Shanghai'; + * SET TIME ZONE INTERVAL 10 HOURS; + * }}} + */ override def visitSetTimeZone(ctx: SetTimeZoneContext): LogicalPlan = withOrigin(ctx) { val key = SQLConf.SESSION_LOCAL_TIMEZONE.key if (ctx.interval != null) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetPathCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetPathCommand.scala index d74ca76b41c20..a3430d01da5b0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetPathCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetPathCommand.scala @@ -76,7 +76,8 @@ case class SetPathCommand(elements: Seq[PathElement]) extends LeafRunnableComman if (!seen.add(key)) { throw new AnalysisException( errorClass = "DUPLICATE_SQL_PATH_ENTRY", - messageParameters = Map("pathEntry" -> concrete.mkString("."))) + messageParameters = Map("pathEntry" -> + concrete.map(p => if (p.contains(".")) s"`$p`" else p).mkString("."))) } } @@ -95,23 +96,13 @@ case class SetPathCommand(elements: Seq[PathElement]) extends LeafRunnableComman currentCatalog: String, currentNamespace: Seq[String]): Seq[Seq[String]] = { val systemCatalog = CatalogManager.SYSTEM_CATALOG_NAME - val builtin = Seq(systemCatalog, CatalogManager.BUILTIN_NAMESPACE) - val session = Seq(systemCatalog, CatalogManager.SESSION_NAMESPACE) elements.flatMap { case 
PathElement.DefaultPath => val currentSchema = Seq(systemCatalog, CatalogManager.SESSION_PATH_VIRTUAL_CURRENT_SCHEMA) - conf.sessionFunctionResolutionOrder match { - case "first" => Seq(session, builtin, currentSchema) - case "last" => Seq(builtin, currentSchema, session) - case _ => Seq(builtin, session, currentSchema) - } + conf.defaultPathOrder(Seq(currentSchema)) case PathElement.SystemPath => - if (conf.sessionFunctionResolutionOrder == "first") { - Seq(session, builtin) - } else { - Seq(builtin, session) - } + conf.systemPathOrder case PathElement.CurrentDatabase | PathElement.CurrentSchema => Seq(Seq(systemCatalog, CatalogManager.SESSION_PATH_VIRTUAL_CURRENT_SCHEMA)) case PathElement.PathRef => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SetPathSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SetPathSuite.scala index 02eedc94fdb66..078d0802a3636 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SetPathSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SetPathSuite.scala @@ -333,4 +333,30 @@ class SetPathSuite extends QueryTest with SharedSparkSession { s"Stored path should preserve case; got: $entries") } } + + test("PATH enabled: DEFAULT_PATH respects sessionFunctionResolutionOrder = first") { + withSQLConf( + SQLConf.PATH_ENABLED.key -> "true", + SQLConf.SESSION_FUNCTION_RESOLUTION_ORDER.key -> "first") { + sql("SET PATH = DEFAULT_PATH") + val entries = pathEntries(currentPath()) + assert(entries.head.contains("session"), + s"With 'first' order, session should come first; got: $entries") + } + } + + test("PATH enabled: DEFAULT_PATH respects sessionFunctionResolutionOrder = last") { + withSQLConf( + SQLConf.PATH_ENABLED.key -> "true", + SQLConf.SESSION_FUNCTION_RESOLUTION_ORDER.key -> "last") { + sql("SET PATH = DEFAULT_PATH") + val entries = pathEntries(currentPath()) + assert(entries.last.contains("session"), + s"With 'last' order, session should come last; got: $entries") + } + } + + // Note: cloneSession() 
shares the parent's CatalogManager, so SET PATH on either session + // affects both. This matches the behavior of USE CATALOG / USE SCHEMA on cloned sessions. + // Independence would require cloneSession to create a separate CatalogManager. } From d6a288cac053e74559fbfef739b363fe90c7e678 Mon Sep 17 00:00:00 2001 From: Serge Rielau Date: Tue, 21 Apr 2026 08:36:22 -0400 Subject: [PATCH 09/12] Trigger CI From 6239db069aa6fa6485029d549692f91c2fa0100b Mon Sep 17 00:00:00 2001 From: Serge Rielau Date: Tue, 21 Apr 2026 13:07:02 -0400 Subject: [PATCH 10/12] [SPARK-56501][SQL] Address review round 4: PathRef fallback, resolutionSearchPath guard, doc/grammar fixes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - PathRef on unset session now expands to DEFAULT_PATH entries (prevents clobbering defaults when composing PATH with other elements) - resolutionSearchPath(Seq.empty) now correctly returns system-only path instead of trailing empty-Seq entry - Fix doc: "has no effect" → "is rejected" for PATH_ENABLED - Fix grammar: align #setPath label column - Add test: SET PATH = PATH on unset session includes builtin defaults --- .../apache/spark/sql/catalyst/parser/SqlBaseParser.g4 | 2 +- .../scala/org/apache/spark/sql/internal/SQLConf.scala | 4 ++-- .../spark/sql/execution/command/SetPathCommand.scala | 6 +++++- .../scala/org/apache/spark/sql/SetPathSuite.scala | 11 +++++++++++ 4 files changed, 19 insertions(+), 4 deletions(-) diff --git a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 index 5820ec11b89b8..91f24b033aa22 100644 --- a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 +++ b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 @@ -448,7 +448,7 @@ setResetStatement | SET TIME ZONE interval #setTimeZone | SET TIME ZONE timezone #setTimeZone | SET TIME ZONE .*? 
#setTimeZone - | SET PATH EQ pathElement (COMMA pathElement)* #setPath + | SET PATH EQ pathElement (COMMA pathElement)* #setPath | SET variable assignmentList #setVariable | SET variable LEFT_PAREN multipartIdentifierList RIGHT_PAREN EQ LEFT_PAREN query RIGHT_PAREN #setVariable diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index baca68b82260d..97d1d4f2e9a83 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -2443,7 +2443,7 @@ object SQLConf { buildConf("spark.sql.path.enabled") .version("4.2.0") .doc("When true, enables the SQL Standard PATH feature: SET PATH, path-based routine " + - "resolution, and CURRENT_PATH(). When false, SET PATH has no effect and resolution uses " + + "resolution, and CURRENT_PATH(). When false, SET PATH is rejected and resolution uses " + "the default path only.") .withBindingPolicy(ConfigBindingPolicy.SESSION) .booleanConf @@ -8371,7 +8371,7 @@ class SQLConf extends Serializable with Logging with SqlApiConf { * "last" (session last). When catalogPath is empty, returns only system namespaces. 
*/ def resolutionSearchPath(catalogPath: Seq[String]): Seq[Seq[String]] = - defaultPathOrder(Seq(catalogPath)) + defaultPathOrder(if (catalogPath.isEmpty) Seq.empty else Seq(catalogPath)) /** * Orders the given catalog path entries by [[sessionFunctionResolutionOrder]], inserting diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetPathCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetPathCommand.scala index a3430d01da5b0..d58dbe4b916a0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetPathCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetPathCommand.scala @@ -106,7 +106,11 @@ case class SetPathCommand(elements: Seq[PathElement]) extends LeafRunnableComman case PathElement.CurrentDatabase | PathElement.CurrentSchema => Seq(Seq(systemCatalog, CatalogManager.SESSION_PATH_VIRTUAL_CURRENT_SCHEMA)) case PathElement.PathRef => - catalogManager.sessionPathEntries.getOrElse(Seq.empty) + catalogManager.sessionPathEntries.getOrElse { + val currentSchema = + Seq(systemCatalog, CatalogManager.SESSION_PATH_VIRTUAL_CURRENT_SCHEMA) + conf.defaultPathOrder(Seq(currentSchema)) + } case PathElement.SchemaInPath(parts) => qualifySchemaParts(parts) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SetPathSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SetPathSuite.scala index 078d0802a3636..6b2de59fd5b00 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SetPathSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SetPathSuite.scala @@ -280,6 +280,17 @@ class SetPathSuite extends QueryTest with SharedSparkSession { } } + test("PATH enabled: SET PATH = PATH on unset session includes defaults") { + withPathEnabled { + sql("SET PATH = PATH, spark_catalog.extra") + val entries = pathEntries(currentPath()) + assert(entries.exists(_.contains("builtin")), + s"PATH on unset session should include builtin defaults; got: $entries") + 
assert(entries.contains("spark_catalog.extra"), + s"PATH on unset session should include appended schema; got: $entries") + } + } + test("PATH enabled: SET PATH = PATH, schema after DEFAULT_PATH (empty session path)") { withPathEnabled { sql("CREATE SCHEMA IF NOT EXISTS path_from_empty") From cbd3156ab49dbe9e4fafa1312734843ec95e3968 Mon Sep 17 00:00:00 2001 From: Serge Rielau Date: Tue, 21 Apr 2026 13:47:41 -0400 Subject: [PATCH 11/12] [SPARK-56501][SQL] Replace string sentinel with SessionPathEntry ADT Replace the string-based `system.current_schema` sentinel with a typed sealed trait `SessionPathEntry` on CatalogManager: - CurrentSchemaEntry: marker that expands dynamically with USE SCHEMA - LiteralPathEntry(parts): a fully qualified schema reference This eliminates SESSION_PATH_VIRTUAL_CURRENT_SCHEMA, isVirtualCurrentSchemaPathEntry, concreteSessionPathEntry, and expandSessionPathMarkers. Consumers now pattern-match on the ADT. The session path type changes from Option[Seq[Seq[String]]] to Option[Seq[SessionPathEntry]]. resolvePathEntries replaces expandSessionPathMarkers for converting to concrete parts. 
--- .../connector/catalog/CatalogManager.scala | 66 ++++++++++--------- .../execution/command/SetPathCommand.scala | 48 +++++++------- 2 files changed, 57 insertions(+), 57 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala index 7873056669d3a..43b3999853ed7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala @@ -126,12 +126,14 @@ class CatalogManager( _currentNamespace = Some(namespace) } - private var _sessionPath: Option[Seq[Seq[String]]] = None + import CatalogManager.SessionPathEntry + + private var _sessionPath: Option[Seq[SessionPathEntry]] = None /** Returns the raw stored session path entries, or None if no path is set. */ - def sessionPathEntries: Option[Seq[Seq[String]]] = synchronized { _sessionPath } + def sessionPathEntries: Option[Seq[SessionPathEntry]] = synchronized { _sessionPath } - def setSessionPath(entries: Seq[Seq[String]]): Unit = synchronized { + def setSessionPath(entries: Seq[SessionPathEntry]): Unit = synchronized { _sessionPath = Some(entries) } @@ -150,12 +152,12 @@ class CatalogManager( */ def currentPathString: String = synchronized { import CatalogV2Implicits._ - val entries = if (conf.pathEnabled) _sessionPath else None - entries match { - case Some(stored) => - val expanded = CatalogManager.expandSessionPathMarkers( - stored, currentCatalog.name(), currentNamespace.toSeq) - expanded.map(_.quoted).mkString(",") + val stored = if (conf.pathEnabled) _sessionPath else None + stored match { + case Some(entries) => + val resolved = CatalogManager.resolvePathEntries( + entries, currentCatalog.name(), currentNamespace.toSeq) + resolved.map(_.quoted).mkString(",") case None => val catalogPath = (currentCatalog.name() +: currentNamespace).toSeq 
conf.resolutionSearchPath(catalogPath).map(_.quoted).mkString(",") @@ -242,32 +244,32 @@ private[sql] object CatalogManager { isFullyQualifiedSystemSessionViewName(nameParts) } - /** Virtual marker for CURRENT_SCHEMA / CURRENT_DATABASE in SET PATH. */ - val SESSION_PATH_VIRTUAL_CURRENT_SCHEMA: String = "current_schema" - - /** True if this path entry is the virtual current-schema slot (`system.current_schema`). */ - def isVirtualCurrentSchemaPathEntry(entry: Seq[String]): Boolean = - entry.length == 2 && - entry.head.equalsIgnoreCase(SYSTEM_CATALOG_NAME) && - entry(1).equalsIgnoreCase(SESSION_PATH_VIRTUAL_CURRENT_SCHEMA) - - /** Materialize virtual current-schema entry for duplicate checks at SET PATH time. */ - def concreteSessionPathEntry( - entry: Seq[String], - currentCatalog: String, - currentNamespace: Seq[String]): Seq[String] = { - if (isVirtualCurrentSchemaPathEntry(entry)) { - if (currentNamespace.isEmpty) Seq(currentCatalog) - else currentCatalog +: currentNamespace - } else { - entry + /** + * A single entry in the session SQL path: either a literal schema + * or the current-schema marker. + */ + sealed trait SessionPathEntry { + /** Resolve to concrete catalog + namespace parts. */ + def resolve( + currentCatalog: String, + currentNamespace: Seq[String]): Seq[String] = this match { + case CurrentSchemaEntry => + if (currentNamespace.isEmpty) Seq(currentCatalog) + else currentCatalog +: currentNamespace + case LiteralPathEntry(parts) => parts } } - /** Expand markers in stored session path using the current catalog and namespace. */ - def expandSessionPathMarkers( - entries: Seq[Seq[String]], + /** Marker for CURRENT_SCHEMA / CURRENT_DATABASE: expands dynamically with USE SCHEMA. */ + case object CurrentSchemaEntry extends SessionPathEntry + + /** A fully qualified schema reference (catalog.namespace...). 
*/ + case class LiteralPathEntry(parts: Seq[String]) extends SessionPathEntry + + /** Resolve all entries in a session path to concrete catalog + namespace parts. */ + def resolvePathEntries( + entries: Seq[SessionPathEntry], currentCatalog: String, currentNamespace: Seq[String]): Seq[Seq[String]] = - entries.map(concreteSessionPathEntry(_, currentCatalog, currentNamespace)) + entries.map(_.resolve(currentCatalog, currentNamespace)) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetPathCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetPathCommand.scala index d58dbe4b916a0..70538160eefdb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetPathCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetPathCommand.scala @@ -22,6 +22,9 @@ import java.util.Locale import org.apache.spark.sql.{AnalysisException, Row, SparkSession} import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.connector.catalog.CatalogManager +import org.apache.spark.sql.connector.catalog.CatalogManager.{ + CurrentSchemaEntry, LiteralPathEntry, SessionPathEntry +} import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.internal.SQLConf @@ -65,12 +68,10 @@ case class SetPathCommand(elements: Seq[PathElement]) extends LeafRunnableComman val currentNamespace = catalogManager.currentNamespace.toSeq val caseSensitive = conf.caseSensitiveAnalysis - val expanded = expandPathElements(elements, conf, catalogManager, currentCatalog, - currentNamespace) + val expanded = expandPathElements(elements, conf, catalogManager) val seen = new scala.collection.mutable.HashSet[Seq[String]] expanded.foreach { entry => - val concrete = - CatalogManager.concreteSessionPathEntry(entry, currentCatalog, currentNamespace) + val concrete = entry.resolve(currentCatalog, currentNamespace) def normalize(s: String): String = if (caseSensitive) s else 
s.toLowerCase(Locale.ROOT) val key = concrete.map(normalize) if (!seen.add(key)) { @@ -92,35 +93,32 @@ case class SetPathCommand(elements: Seq[PathElement]) extends LeafRunnableComman private def expandPathElements( elements: Seq[PathElement], conf: SQLConf, - catalogManager: CatalogManager, - currentCatalog: String, - currentNamespace: Seq[String]): Seq[Seq[String]] = { - val systemCatalog = CatalogManager.SYSTEM_CATALOG_NAME + catalogManager: CatalogManager): Seq[SessionPathEntry] = { + val currentSchemaSentinel = Seq("__current_schema__") + + def toEntries(parts: Seq[Seq[String]]): Seq[SessionPathEntry] = parts.map { + case p if p == currentSchemaSentinel => CurrentSchemaEntry + case p => LiteralPathEntry(p) + } + + def defaultWithCurrentSchema: Seq[SessionPathEntry] = + toEntries(conf.defaultPathOrder(Seq(currentSchemaSentinel))) elements.flatMap { case PathElement.DefaultPath => - val currentSchema = Seq(systemCatalog, CatalogManager.SESSION_PATH_VIRTUAL_CURRENT_SCHEMA) - conf.defaultPathOrder(Seq(currentSchema)) + defaultWithCurrentSchema case PathElement.SystemPath => - conf.systemPathOrder + toEntries(conf.systemPathOrder) case PathElement.CurrentDatabase | PathElement.CurrentSchema => - Seq(Seq(systemCatalog, CatalogManager.SESSION_PATH_VIRTUAL_CURRENT_SCHEMA)) + Seq(CurrentSchemaEntry) case PathElement.PathRef => - catalogManager.sessionPathEntries.getOrElse { - val currentSchema = - Seq(systemCatalog, CatalogManager.SESSION_PATH_VIRTUAL_CURRENT_SCHEMA) - conf.defaultPathOrder(Seq(currentSchema)) - } + catalogManager.sessionPathEntries.getOrElse(defaultWithCurrentSchema) case PathElement.SchemaInPath(parts) => - qualifySchemaParts(parts) - } - } - - private def qualifySchemaParts(parts: Seq[String]): Seq[Seq[String]] = { - if (parts.length < 2) { - throw QueryCompilationErrors.invalidSqlPathSchemaReferenceError(parts.mkString(".")) + if (parts.length < 2) { + throw QueryCompilationErrors.invalidSqlPathSchemaReferenceError(parts.mkString(".")) + } + 
Seq(LiteralPathEntry(parts)) } - Seq(parts) } } From e60d345690b7affff1ce92a80240c2c9228b5bb5 Mon Sep 17 00:00:00 2001 From: Serge Rielau Date: Tue, 21 Apr 2026 14:42:16 -0400 Subject: [PATCH 12/12] [SPARK-56501][SQL] Address review: clone TODO, test naming, case-insensitive dup test --- .../org/apache/spark/sql/SetPathSuite.scala | 22 +++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SetPathSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SetPathSuite.scala index 6b2de59fd5b00..d62827dea2371 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SetPathSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SetPathSuite.scala @@ -336,7 +336,7 @@ class SetPathSuite extends QueryTest with SharedSparkSession { } } - test("PATH enabled: stored path preserves typed case, resolution is case-insensitive") { + test("PATH enabled: stored path preserves typed case") { withPathEnabled { sql("SET PATH = Spark_Catalog.Default, System.Builtin") val entries = pathEntries(currentPath()) @@ -345,6 +345,18 @@ class SetPathSuite extends QueryTest with SharedSparkSession { } } + test("PATH enabled: case-insensitive duplicate detection") { + withPathEnabled { + checkError( + exception = intercept[AnalysisException] { + sql("SET PATH = spark_catalog.DEFAULT, spark_catalog.default") + }, + condition = "DUPLICATE_SQL_PATH_ENTRY", + sqlState = Some("42732"), + parameters = Map("pathEntry" -> "spark_catalog.default")) + } + } + test("PATH enabled: DEFAULT_PATH respects sessionFunctionResolutionOrder = first") { withSQLConf( SQLConf.PATH_ENABLED.key -> "true", @@ -367,7 +379,9 @@ class SetPathSuite extends QueryTest with SharedSparkSession { } } - // Note: cloneSession() shares the parent's CatalogManager, so SET PATH on either session - // affects both. This matches the behavior of USE CATALOG / USE SCHEMA on cloned sessions. 
- // Independence would require cloneSession to create a separate CatalogManager. + // TODO: cloneSession() constructs a new CatalogManager per forked session and + // explicitly copies only the stored session path via copySessionPathFrom. + // Other CatalogManager state propagation (current catalog/namespace, registered + // catalogs) on clone is currently incidental — audit and pin down the intended + // semantics in a follow-up. }