diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json index 8c032d31cff61..7da1a53470d08 100644 --- a/common/utils/src/main/resources/error/error-conditions.json +++ b/common/utils/src/main/resources/error/error-conditions.json @@ -2061,6 +2061,12 @@ ], "sqlState" : "42711" }, + "DUPLICATE_SQL_PATH_ENTRY" : { + "message" : [ + "Duplicate SQL path entry . The session SQL path cannot contain the same namespace more than once (including after expanding shortcuts like DEFAULT_PATH or SYSTEM_PATH)." + ], + "sqlState" : "42732" + }, "DUPLICATE_VARIABLE_NAME_INSIDE_DECLARE" : { "message" : [ "Found duplicate variable in the declare variable list. Please, remove one of them." @@ -4475,6 +4481,12 @@ ], "sqlState" : "XXKD0" }, + "INVALID_SQL_PATH_SCHEMA_REFERENCE" : { + "message" : [ + "Invalid schema reference in SET PATH: . Use at least two name parts (catalog.schema); multi-level namespaces are allowed." + ], + "sqlState" : "42601" + }, "INVALID_SQL_SYNTAX" : { "message" : [ "Invalid SQL syntax:" @@ -7884,6 +7896,11 @@ "Cannot have VARIANT type columns in DataFrame which calls set operations (INTERSECT, EXCEPT, etc.), but the type of column is ." ] }, + "SET_PATH_WHEN_DISABLED" : { + "message" : [ + "SET PATH is disabled. Set to true to enable it." + ] + }, "SET_PROPERTIES_AND_DBPROPERTIES" : { "message" : [ "set PROPERTIES and DBPROPERTIES at the same time." diff --git a/common/utils/src/main/resources/error/error-states.json b/common/utils/src/main/resources/error/error-states.json index c2b2bb2ed4638..fcabc5f06c548 100644 --- a/common/utils/src/main/resources/error/error-states.json +++ b/common/utils/src/main/resources/error/error-states.json @@ -3339,7 +3339,7 @@ "description": "A duplicate schema name in a special register was detected.", "origin": "DB2", "standard": "N", - "usedBy": ["DB2"] + "usedBy": ["DB2", "Spark"] }, "42734": { "description": "A duplicate parameter-name, SQL variable name, label, or condition-name was detected.", diff --git a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 index d3fd57d396eaa..91f24b033aa22 100644 --- a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 +++ b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 @@ -448,6 +448,7 @@ setResetStatement | SET TIME ZONE interval #setTimeZone | SET TIME ZONE timezone #setTimeZone | SET TIME ZONE .*? #setTimeZone + | SET PATH EQ pathElement (COMMA pathElement)* #setPath | SET variable assignmentList #setVariable | SET variable LEFT_PAREN multipartIdentifierList RIGHT_PAREN EQ LEFT_PAREN query RIGHT_PAREN #setVariable @@ -459,6 +460,15 @@ setResetStatement | RESET .*? #resetConfiguration ; +pathElement + : DEFAULT_PATH + | SYSTEM_PATH + | PATH + | CURRENT_DATABASE + | CURRENT_SCHEMA + | multipartIdentifier + ; + executeImmediate : EXECUTE IMMEDIATE queryParam=expression (INTO targetVariable=multipartIdentifierList)? executeImmediateUsing? ; diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala index 2377e8affc463..160c4147dae61 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala @@ -34,7 +34,6 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils.{convertSpecialDate, con import org.apache.spark.sql.catalyst.util.SparkDateTimeUtils.{instantToNanosOfDay, truncateTimeToPrecision} import org.apache.spark.sql.catalyst.util.TypeUtils.toSQLExpr import org.apache.spark.sql.connector.catalog.CatalogManager -import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ @@ -156,13 +155,9 @@ case class ReplaceCurrentLike(catalogManager: CatalogManager) extends Rule[Logic def apply(plan: LogicalPlan): LogicalPlan = { import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ lazy val currentNamespace = catalogManager.currentNamespace.quoted - lazy val currentNamespaceSeq = catalogManager.currentNamespace.toSeq lazy val currentCatalog = catalogManager.currentCatalog.name() lazy val currentUser = CurrentUserContext.getCurrentUser - lazy val currentPathStr = { - val catalogPath = (currentCatalog +: currentNamespaceSeq).toSeq - SQLConf.get.resolutionSearchPath(catalogPath).map(_.quoted).mkString(",") - } + lazy val currentPathStr = catalogManager.currentPathString plan.transformAllExpressionsWithPruning(_.containsPattern(CURRENT_LIKE)) { case CurrentDatabase() => diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala index 8663bb65b6a88..43b3999853ed7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala @@ -126,6 +126,44 @@ class CatalogManager( _currentNamespace = Some(namespace) } + import CatalogManager.SessionPathEntry + + private var _sessionPath: Option[Seq[SessionPathEntry]] = None + + /** Returns the raw stored session path entries, or None if no path is set. */ + def sessionPathEntries: Option[Seq[SessionPathEntry]] = synchronized { _sessionPath } + + def setSessionPath(entries: Seq[SessionPathEntry]): Unit = synchronized { + _sessionPath = Some(entries) + } + + def clearSessionPath(): Unit = synchronized { + _sessionPath = None + } + + private[sql] def copySessionPathFrom(other: CatalogManager): Unit = synchronized { + _sessionPath = other.sessionPathEntries + } + + /** + * String form of the current resolution path for CURRENT_PATH(). + * When PATH is enabled and a session path is stored, formats the effective path entries + * with markers expanded. Otherwise falls back to the legacy resolutionSearchPath. + */ + def currentPathString: String = synchronized { + import CatalogV2Implicits._ + val stored = if (conf.pathEnabled) _sessionPath else None + stored match { + case Some(entries) => + val resolved = CatalogManager.resolvePathEntries( + entries, currentCatalog.name(), currentNamespace.toSeq) + resolved.map(_.quoted).mkString(",") + case None => + val catalogPath = (currentCatalog.name() +: currentNamespace).toSeq + conf.resolutionSearchPath(catalogPath).map(_.quoted).mkString(",") + } + } + private var _currentCatalogName: Option[String] = None def currentCatalog: CatalogPlugin = synchronized { @@ -154,6 +192,7 @@ class CatalogManager( catalogs.clear() _currentNamespace = None _currentCatalogName = None + _sessionPath = None v1SessionCatalog.setCurrentDatabase(conf.defaultDatabase) } } @@ -204,4 +243,33 @@ private[sql] object CatalogManager { (nameParts.length == 2 && nameParts.head.equalsIgnoreCase(SESSION_NAMESPACE)) || isFullyQualifiedSystemSessionViewName(nameParts) } + + /** + * A single entry in the session SQL path: either a literal schema + * or the current-schema marker. + */ + sealed trait SessionPathEntry { + /** Resolve to concrete catalog + namespace parts. */ + def resolve( + currentCatalog: String, + currentNamespace: Seq[String]): Seq[String] = this match { + case CurrentSchemaEntry => + if (currentNamespace.isEmpty) Seq(currentCatalog) + else currentCatalog +: currentNamespace + case LiteralPathEntry(parts) => parts + } + } + + /** Marker for CURRENT_SCHEMA / CURRENT_DATABASE: expands dynamically with USE SCHEMA. */ + case object CurrentSchemaEntry extends SessionPathEntry + + /** A fully qualified schema reference (catalog.namespace...). */ + case class LiteralPathEntry(parts: Seq[String]) extends SessionPathEntry + + /** Resolve all entries in a session path to concrete catalog + namespace parts. */ + def resolvePathEntries( + entries: Seq[SessionPathEntry], + currentCatalog: String, + currentNamespace: Seq[String]): Seq[Seq[String]] = + entries.map(_.resolve(currentCatalog, currentNamespace)) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala index 8464a96ef26c6..94c76976f6cd1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala @@ -954,9 +954,9 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat * * @param quotedResolutionPathEntries each string is already a display-ready path entry (typically * `toSQLId` of each segment from `SQLConf.resolutionSearchPath`). They are joined with - * ", " inside square brackets. This differs from [[noSuchTableError]](nameParts: - * Seq[String]), which builds one dotted bracketed path from `nameParts.dropRight(1)` via - * [[org.apache.spark.sql.catalyst.analysis.NoSuchTableException]]. + * ", " inside square brackets. This differs from + * [[org.apache.spark.sql.catalyst.analysis.NoSuchItemExceptionHelper.formatSearchPath]], + * which builds one dotted bracketed path from a single search path. */ def tableOrViewNotFoundWithSearchPath( name: Seq[String], @@ -2458,6 +2458,12 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat "config" -> SQLConf.LEGACY_PATH_OPTION_BEHAVIOR.key)) } + def invalidSqlPathSchemaReferenceError(qualifiedName: String): Throwable = { + new AnalysisException( + errorClass = "INVALID_SQL_PATH_SCHEMA_REFERENCE", + messageParameters = Map("qualifiedName" -> qualifiedName)) + } + def userSpecifiedSchemaUnsupportedError(operation: String): Throwable = { DataTypeErrors.userSpecifiedSchemaUnsupportedError(operation) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index d7612d5d78508..97d1d4f2e9a83 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -2439,6 +2439,16 @@ object SQLConf { .booleanConf .createWithDefault(false) + val PATH_ENABLED = + buildConf("spark.sql.path.enabled") + .version("4.2.0") + .doc("When true, enables the SQL Standard PATH feature: SET PATH, path-based routine " + + "resolution, and CURRENT_PATH(). When false, SET PATH is rejected and resolution uses " + + "the default path only.") + .withBindingPolicy(ConfigBindingPolicy.SESSION) + .booleanConf + .createWithDefault(false) + // Whether to retain group by columns or not in GroupedData.agg. val DATAFRAME_RETAIN_GROUP_COLUMNS = buildConf("spark.sql.retainGroupColumns") .version("1.4.0") @@ -8352,29 +8362,44 @@ class SQLConf extends Serializable with Logging with SqlApiConf { */ def prioritizeSystemCatalog: Boolean = !getConf(SQLConf.PERSISTENT_CATALOG_FIRST) + def pathEnabled: Boolean = getConf(SQLConf.PATH_ENABLED) + /** * Returns the resolution search path for error messages and resolution order. * This is the single source of truth for the search path used for functions, tables, and views. * Uses [[sessionFunctionResolutionOrder]]: "first" (session first), "second" (session second), * "last" (session last). When catalogPath is empty, returns only system namespaces. */ - def resolutionSearchPath(catalogPath: Seq[String]): Seq[Seq[String]] = { + def resolutionSearchPath(catalogPath: Seq[String]): Seq[Seq[String]] = + defaultPathOrder(if (catalogPath.isEmpty) Seq.empty else Seq(catalogPath)) + + /** + * Orders the given catalog path entries by [[sessionFunctionResolutionOrder]], inserting + * system.session and system.builtin. Used by both the legacy single-schema resolution and + * by SET PATH's DEFAULT_PATH / SYSTEM_PATH expansion to keep ordering in sync. + * + * @param catalogEntries persistent catalog path entries (may be empty). + */ + def defaultPathOrder(catalogEntries: Seq[Seq[String]]): Seq[Seq[String]] = { val order = sessionFunctionResolutionOrder val session = Seq("system", "session") val builtin = Seq("system", "builtin") order match { case "first" => - if (catalogPath.isEmpty) Seq(session, builtin) - else Seq(session, builtin, catalogPath) + if (catalogEntries.isEmpty) Seq(session, builtin) + else Seq(session, builtin) ++ catalogEntries case "last" => - if (catalogPath.isEmpty) Seq(builtin, session) - else Seq(builtin, catalogPath, session) + if (catalogEntries.isEmpty) Seq(builtin, session) + else Seq(builtin) ++ catalogEntries ++ Seq(session) case _ => // "second" - if (catalogPath.isEmpty) Seq(builtin, session) - else Seq(builtin, session, catalogPath) + if (catalogEntries.isEmpty) Seq(builtin, session) + else Seq(builtin, session) ++ catalogEntries } } + /** System-only path (builtin + session) ordered by [[sessionFunctionResolutionOrder]]. */ + def systemPathOrder: Seq[Seq[String]] = defaultPathOrder(Seq.empty) + override def legacyParameterSubstitutionConstantsOnly: Boolean = getConf(SQLConf.LEGACY_PARAMETER_SUBSTITUTION_CONSTANTS_ONLY) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index be63063408346..abc62241a0066 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -348,6 +348,27 @@ class SparkSqlAstBuilder extends AstBuilder { ResetCommand(Some(ctx.configKey().getText)) } + /** + * Create a [[SetPathCommand]] to set the session SQL path. + * Example SQL : + * {{{ + * SET PATH = spark_catalog.default, system.builtin; + * SET PATH = DEFAULT_PATH; + * SET PATH = SYSTEM_PATH, spark_catalog.myschema; + * }}} + */ + override def visitSetPath(ctx: SetPathContext): LogicalPlan = withOrigin(ctx) { + val elements = ctx.pathElement().asScala.map { pe => + if (pe.DEFAULT_PATH() != null) PathElement.DefaultPath + else if (pe.SYSTEM_PATH() != null) PathElement.SystemPath + else if (pe.PATH() != null) PathElement.PathRef + else if (pe.CURRENT_DATABASE() != null) PathElement.CurrentDatabase + else if (pe.CURRENT_SCHEMA() != null) PathElement.CurrentSchema + else PathElement.SchemaInPath(visitMultipartIdentifier(pe.multipartIdentifier())) + }.toSeq + SetPathCommand(elements) + } + /** * Create a [[SetCommand]] logical plan to set [[SQLConf.SESSION_LOCAL_TIMEZONE]] * Example SQL : diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetPathCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetPathCommand.scala new file mode 100644 index 0000000000000..70538160eefdb --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetPathCommand.scala @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.command + +import java.util.Locale + +import org.apache.spark.sql.{AnalysisException, Row, SparkSession} +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.connector.catalog.CatalogManager +import org.apache.spark.sql.connector.catalog.CatalogManager.{ + CurrentSchemaEntry, LiteralPathEntry, SessionPathEntry +} +import org.apache.spark.sql.errors.QueryCompilationErrors +import org.apache.spark.sql.internal.SQLConf + +/** + * Path element for SET PATH: either a well-known shortcut or a fully qualified schema reference. + * SchemaInPath requires at least 2 parts (catalog.namespace); multi-level namespaces are allowed. + */ +sealed trait PathElement + +object PathElement { + case object DefaultPath extends PathElement + case object SystemPath extends PathElement + case object PathRef extends PathElement + /** + * Current database/schema (SQL aliases). Stored as system.current_schema; expands when + * building resolution candidates so later USE SCHEMA is reflected. + */ + case object CurrentDatabase extends PathElement + case object CurrentSchema extends PathElement + /** Fully qualified schema reference (catalog.namespace...). Must have at least 2 parts. */ + case class SchemaInPath(parts: Seq[String]) extends PathElement +} + +/** + * Command for SET PATH = pathElement (, pathElement)* + * Expands shortcuts at run time, validates no duplicates, and sets the internal session path. + */ +case class SetPathCommand(elements: Seq[PathElement]) extends LeafRunnableCommand { + + override def output: Seq[Attribute] = Seq.empty + + override def run(sparkSession: SparkSession): Seq[Row] = { + if (!sparkSession.sessionState.conf.pathEnabled) { + throw new AnalysisException( + errorClass = "UNSUPPORTED_FEATURE.SET_PATH_WHEN_DISABLED", + messageParameters = Map("config" -> SQLConf.PATH_ENABLED.key)) + } + val conf = sparkSession.sessionState.conf + val catalogManager = sparkSession.sessionState.catalogManager + val currentCatalog = catalogManager.currentCatalog.name + val currentNamespace = catalogManager.currentNamespace.toSeq + val caseSensitive = conf.caseSensitiveAnalysis + + val expanded = expandPathElements(elements, conf, catalogManager) + val seen = new scala.collection.mutable.HashSet[Seq[String]] + expanded.foreach { entry => + val concrete = entry.resolve(currentCatalog, currentNamespace) + def normalize(s: String): String = if (caseSensitive) s else s.toLowerCase(Locale.ROOT) + val key = concrete.map(normalize) + if (!seen.add(key)) { + throw new AnalysisException( + errorClass = "DUPLICATE_SQL_PATH_ENTRY", + messageParameters = Map("pathEntry" -> + concrete.map(p => if (p.contains(".")) s"`$p`" else p).mkString("."))) + } + } + + if (expanded.isEmpty) { + catalogManager.clearSessionPath() + } else { + catalogManager.setSessionPath(expanded) + } + Seq.empty + } + + private def expandPathElements( + elements: Seq[PathElement], + conf: SQLConf, + catalogManager: CatalogManager): Seq[SessionPathEntry] = { + val currentSchemaSentinel = Seq("__current_schema__") + + def toEntries(parts: Seq[Seq[String]]): Seq[SessionPathEntry] = parts.map { + case p if p == currentSchemaSentinel => CurrentSchemaEntry + case p => LiteralPathEntry(p) + } + + def defaultWithCurrentSchema: Seq[SessionPathEntry] = + toEntries(conf.defaultPathOrder(Seq(currentSchemaSentinel))) + + elements.flatMap { + case PathElement.DefaultPath => + defaultWithCurrentSchema + case PathElement.SystemPath => + toEntries(conf.systemPathOrder) + case PathElement.CurrentDatabase | PathElement.CurrentSchema => + Seq(CurrentSchemaEntry) + case PathElement.PathRef => + catalogManager.sessionPathEntries.getOrElse(defaultWithCurrentSchema) + case PathElement.SchemaInPath(parts) => + if (parts.length < 2) { + throw QueryCompilationErrors.invalidSqlPathSchemaReferenceError(parts.mkString(".")) + } + Seq(LiteralPathEntry(parts)) + } + } + +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala index 08dd212060762..9bd68cbe72a07 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala @@ -160,7 +160,11 @@ abstract class BaseSessionStateBuilder( protected lazy val v2SessionCatalog = new V2SessionCatalog(catalog) - protected lazy val catalogManager = new CatalogManager(v2SessionCatalog, catalog) + protected lazy val catalogManager = { + val cm = new CatalogManager(v2SessionCatalog, catalog) + parentState.foreach(ps => cm.copySessionPathFrom(ps.catalogManager)) + cm + } protected lazy val sharedRelationCache = session.sharedState.relationCache diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ParametersSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ParametersSuite.scala index b048e656334e9..39c38854f3718 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/ParametersSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/ParametersSuite.scala @@ -1998,6 +1998,25 @@ class ParametersSuite extends QueryTest with SharedSparkSession { ) } + test("SET PATH with named parameter in IDENTIFIER (PATH feature enabled)") { + withSQLConf(SQLConf.PATH_ENABLED.key -> "true") { + // current_path() resolves via system.builtin; include it when PATH is not DEFAULT_PATH. + spark.sql("SET PATH = spark_catalog.IDENTIFIER(:ns), system.builtin", Map("ns" -> "default")) + val pathStr = spark.sql("SELECT current_path()").collect().head.getString(0) + assert(pathStr.contains("spark_catalog") && pathStr.contains("default"), + s"SET PATH + IDENTIFIER(:ns); got: $pathStr") + } + } + + test("SET PATH with positional parameter in IDENTIFIER (PATH feature enabled)") { + withSQLConf(SQLConf.PATH_ENABLED.key -> "true") { + spark.sql("SET PATH = spark_catalog.IDENTIFIER(?), system.builtin", Array("default")) + val pathStr = spark.sql("SELECT current_path()").collect().head.getString(0) + assert(pathStr.contains("spark_catalog") && pathStr.contains("default"), + s"SET PATH + IDENTIFIER(?); got: $pathStr") + } + } + test("IDENTIFIER clause with parameter marker - table reference") { // Test IDENTIFIER clause with parameter marker in table references withTable("test_table") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SetPathSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SetPathSuite.scala new file mode 100644 index 0000000000000..d62827dea2371 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/SetPathSuite.scala @@ -0,0 +1,387 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.test.SharedSparkSession + +/** + * Tests for SET PATH command and session path management. + * Covers the feature flag, SET PATH syntax, CURRENT_PATH() reflecting stored path, + * DEFAULT_PATH, SYSTEM_PATH, CURRENT_SCHEMA/CURRENT_DATABASE expansion, + * PATH (append), duplicate detection, and error conditions. + * + * Resolution-level tests (tables/functions resolving via the stored path) + * belong in a separate suite once the resolution engine is wired. + */ +class SetPathSuite extends QueryTest with SharedSparkSession { + + private def withPathEnabled(f: => Unit): Unit = { + withSQLConf(SQLConf.PATH_ENABLED.key -> "true")(f) + } + + private def currentPath(): String = + sql("SELECT current_path()").collect().head.getString(0) + + private def pathEntries(pathStr: String): Seq[String] = + pathStr.split(",").map(_.trim).toSeq + + test("PATH disabled: CURRENT_PATH() returns default path") { + val entries = pathEntries(currentPath()) + assert(entries.contains("spark_catalog.default"), + s"Expected default path to contain spark_catalog.default, got: $entries") + assert(entries.exists(_.contains("builtin")), + s"Expected default path to contain builtin, got: $entries") + } + + test("PATH disabled: SET PATH is rejected") { + checkError( + exception = intercept[AnalysisException] { + sql("SET PATH = spark_catalog.other") + }, + condition = "UNSUPPORTED_FEATURE.SET_PATH_WHEN_DISABLED", + sqlState = "0A000", + parameters = Map("config" -> SQLConf.PATH_ENABLED.key)) + } + + test("PATH enabled but no SET PATH: falls back to legacy resolutionSearchPath") { + withPathEnabled { + val entries = pathEntries(currentPath()) + assert(entries.exists(_.contains("builtin")), + s"Enabled-but-unset should fall back to legacy path with builtin, got: $entries") + assert(entries.exists(_.contains("default")), + s"Enabled-but-unset should include current schema, got: $entries") + } + } + + test("PATH enabled: DEFAULT_PATH + explicit builtin raises duplicate") { + withPathEnabled { + checkError( + exception = intercept[AnalysisException] { + sql("SET PATH = DEFAULT_PATH, system.builtin") + }, + condition = "DUPLICATE_SQL_PATH_ENTRY", + sqlState = Some("42732"), + parameters = Map("pathEntry" -> "system.builtin")) + } + } + + test("PATH enabled: SET PATH and CURRENT_PATH()") { + withPathEnabled { + sql("SET PATH = spark_catalog.default, system.builtin") + val entries = pathEntries(currentPath()) + assert(entries === Seq("spark_catalog.default", "system.builtin"), + s"Expected exact path entries, got: $entries") + } + } + + test("PATH enabled: SET PATH = DEFAULT_PATH restores default") { + withPathEnabled { + sql("SET PATH = spark_catalog.default") + sql("SET PATH = DEFAULT_PATH") + val entries = pathEntries(currentPath()) + assert(entries.contains("spark_catalog.default"), + s"After SET PATH = DEFAULT_PATH expected current schema in path, got: $entries") + assert(entries.contains("system.builtin"), + s"After SET PATH = DEFAULT_PATH expected system.builtin, got: $entries") + assert(entries.contains("system.session"), + s"After SET PATH = DEFAULT_PATH expected system.session, got: $entries") + assert(entries.length === 3, + s"DEFAULT_PATH should expand to 3 entries, got: $entries") + } + } + + test("PATH enabled: DEFAULT_PATH composes with other path elements") { + withPathEnabled { + sql("CREATE SCHEMA IF NOT EXISTS path_extra_test") + try { + sql("SET PATH = DEFAULT_PATH, spark_catalog.path_extra_test") + val entries = pathEntries(currentPath()) + assert(entries.contains("system.builtin"), + s"DEFAULT_PATH should include system.builtin; got: $entries") + assert(entries.contains("system.session"), + s"DEFAULT_PATH should include system.session; got: $entries") + assert(entries.last === "spark_catalog.path_extra_test", + s"Extra schema should be appended after DEFAULT_PATH; got: $entries") + assert(entries.length === 4, + s"DEFAULT_PATH + 1 extra should be 4 entries, got: $entries") + } finally { + sql("DROP SCHEMA IF EXISTS path_extra_test") + } + } + } + + test("PATH enabled: DEFAULT_PATH, DEFAULT_PATH raises duplicate error") { + withPathEnabled { + checkError( + exception = intercept[AnalysisException] { + sql("SET PATH = DEFAULT_PATH, DEFAULT_PATH") + }, + condition = "DUPLICATE_SQL_PATH_ENTRY", + sqlState = Some("42732"), + parameters = Map("pathEntry" -> "system.builtin")) + } + } + + test("programmatic SET of spark.sql.session.path has no effect on CURRENT_PATH()") { + withPathEnabled { + sql("SET PATH = spark_catalog.default, system.builtin") + spark.conf.set("spark.sql.session.path", "garbage") + val entries = pathEntries(currentPath()) + assert(entries === Seq("spark_catalog.default", "system.builtin"), + s"Programmatic SET should not affect path; got: $entries") + spark.conf.unset("spark.sql.session.path") + } + } + + test("PATH enabled: cloned session inherits parent path") { + withPathEnabled { + sql("SET PATH = spark_catalog.default, system.builtin") + val cloned = spark.cloneSession() + val clonedPath = cloned.sql("SELECT current_path()").collect().head.getString(0) + val entries = pathEntries(clonedPath) + assert(entries === Seq("spark_catalog.default", "system.builtin"), + s"Cloned session should inherit parent path; got: $entries") + } + } + + test("PATH enabled: duplicate path entry raises error") { + withPathEnabled { + checkError( + exception = intercept[AnalysisException] { + sql("SET PATH = spark_catalog.default, spark_catalog.default") + }, + condition = "DUPLICATE_SQL_PATH_ENTRY", + sqlState = Some("42732"), + parameters = Map("pathEntry" -> "spark_catalog.default")) + } + } + + test("PATH enabled: SET PATH = PATH, schema appends to path") { + withPathEnabled { + sql("CREATE SCHEMA IF NOT EXISTS path_append_test") + try { + sql("SET PATH = spark_catalog.default, system.builtin") + sql("SET PATH = PATH, spark_catalog.path_append_test") + val entries = pathEntries(currentPath()) + assert(entries === Seq("spark_catalog.default", "system.builtin", + "spark_catalog.path_append_test"), + s"PATH, schema should append; got: $entries") + } finally { + sql("DROP SCHEMA IF EXISTS path_append_test") + } + } + } + + test("PATH enabled: SET PATH = CURRENT_SCHEMA / CURRENT_DATABASE") { + withPathEnabled { + sql("USE spark_catalog.default") + sql("SET PATH = current_schema, system.builtin") + val entries = pathEntries(currentPath()) + assert(entries === Seq("spark_catalog.default", "system.builtin"), + s"current_schema should expand to current schema, got: $entries") + sql("SET PATH = current_database, system.builtin") + val entries2 = pathEntries(currentPath()) + assert(entries2 === Seq("spark_catalog.default", "system.builtin"), + s"current_database should expand to current schema, got: $entries2") + } + } + + test("PATH enabled: cross-alias duplicate detection (current_database, current_schema)") { + withPathEnabled { + sql("USE spark_catalog.default") + checkError( + exception = intercept[AnalysisException] { + sql("SET PATH = current_database, current_schema") + }, + condition = "DUPLICATE_SQL_PATH_ENTRY", + sqlState = Some("42732"), + parameters = Map("pathEntry" -> "spark_catalog.default")) + } + } + + test("PATH enabled: virtual CURRENT_SCHEMA expands to USE schema") { + withPathEnabled { + sql("CREATE SCHEMA IF NOT EXISTS path_virt_schema") + try { + sql("USE spark_catalog.path_virt_schema") + sql("SET PATH = current_schema, system.builtin") + val entries = pathEntries(currentPath()) + assert(entries === Seq("spark_catalog.path_virt_schema", "system.builtin"), + s"CURRENT_SCHEMA in PATH should reflect USE; got: $entries") + } finally { + sql("USE spark_catalog.default") + sql("DROP SCHEMA IF EXISTS path_virt_schema") + } + } + } + + test("PATH enabled: duplicate after expanding CURRENT_SCHEMA") { + withPathEnabled { + sql("USE spark_catalog.default") + checkError( + exception = intercept[AnalysisException] { + sql("SET PATH = spark_catalog.default, current_schema") + }, + condition = "DUPLICATE_SQL_PATH_ENTRY", + sqlState = Some("42732"), + parameters = Map("pathEntry" -> "spark_catalog.default")) + } + } + + test("PATH enabled: duplicate when CURRENT_SCHEMA repeated") { + withPathEnabled { + sql("USE spark_catalog.default") + checkError( + exception = intercept[AnalysisException] { + sql("SET PATH = current_schema, current_schema") + }, + condition = "DUPLICATE_SQL_PATH_ENTRY", + sqlState = Some("42732"), + parameters = Map("pathEntry" -> "spark_catalog.default")) + } + } + + test("PATH enabled: duplicate when SYSTEM_PATH listed twice") { + withPathEnabled { + checkError( + exception = intercept[AnalysisException] { + sql("SET PATH = SYSTEM_PATH, SYSTEM_PATH") + }, + condition = "DUPLICATE_SQL_PATH_ENTRY", + sqlState = Some("42732"), + parameters = Map("pathEntry" -> "system.builtin")) + } + } + + test("PATH enabled: SET PATH = SYSTEM_PATH includes system.builtin and system.session") { + withPathEnabled { + sql("SET PATH = SYSTEM_PATH") + val entries = pathEntries(currentPath()) + assert(entries.contains("system.builtin"), + s"SYSTEM_PATH should include system.builtin; got: $entries") + assert(entries.contains("system.session"), + s"SYSTEM_PATH should include system.session; got: $entries") + } + } + + test("PATH enabled: SET PATH = PATH on unset session includes defaults") { + withPathEnabled { + sql("SET PATH = PATH, spark_catalog.extra") + val entries = pathEntries(currentPath()) + assert(entries.exists(_.contains("builtin")), + s"PATH on unset session should include builtin defaults; got: $entries") + assert(entries.contains("spark_catalog.extra"), + s"PATH on unset session should include appended schema; got: $entries") + } + } + + test("PATH enabled: SET PATH = PATH, schema after DEFAULT_PATH (empty session path)") { + withPathEnabled { + sql("CREATE SCHEMA IF NOT EXISTS path_from_empty") + try { + sql("SET PATH = DEFAULT_PATH") + sql("SET PATH = PATH, spark_catalog.path_from_empty") + val entries = pathEntries(currentPath()) + assert(entries.contains("spark_catalog.path_from_empty"), + s"PATH after cleared path should append schema; got: $entries") + } finally { + sql("DROP SCHEMA IF EXISTS path_from_empty") + } + } + } + + test("PATH enabled: unqualified (1-part) schema reference is rejected") { + withPathEnabled { + checkError( + exception = intercept[AnalysisException] { + sql("SET PATH = myschema") + }, + condition = "INVALID_SQL_PATH_SCHEMA_REFERENCE", + parameters = Map( + "qualifiedName" -> "myschema")) + } + } + + test("PATH enabled: multi-level namespace (3+ parts) is accepted") { + withPathEnabled { + sql("SET PATH = iceberg_cat.db1.db2, spark_catalog.default") + val entries = pathEntries(currentPath()) + assert(entries.head === "iceberg_cat.db1.db2", + s"Multi-level namespace should be accepted; got: $entries") + } + } + + test("PATH enabled: backtick-quoted identifiers with dots round-trip correctly") { + withPathEnabled { + sql("SET PATH = `cat.a`.`sch.b`") + val entries = pathEntries(currentPath()) + assert(entries === Seq("`cat.a`.`sch.b`"), + s"Backtick-quoted identifiers should round-trip; got: $entries") + } + } + + test("PATH enabled: stored path preserves typed case") { + withPathEnabled { + sql("SET PATH = Spark_Catalog.Default, System.Builtin") + val entries = pathEntries(currentPath()) + assert(entries === Seq("Spark_Catalog.Default", "System.Builtin"), + s"Stored path should preserve case; got: $entries") + } + } + + test("PATH enabled: case-insensitive duplicate detection") { + withPathEnabled { + checkError( + exception = intercept[AnalysisException] { + sql("SET PATH = spark_catalog.DEFAULT, spark_catalog.default") + }, + condition = "DUPLICATE_SQL_PATH_ENTRY", + sqlState = Some("42732"), + parameters = Map("pathEntry" -> "spark_catalog.default")) + } + } + + test("PATH enabled: DEFAULT_PATH respects sessionFunctionResolutionOrder = first") { + withSQLConf( + SQLConf.PATH_ENABLED.key -> "true", + SQLConf.SESSION_FUNCTION_RESOLUTION_ORDER.key -> "first") { + sql("SET PATH = DEFAULT_PATH") + val entries = pathEntries(currentPath()) + assert(entries.head.contains("session"), + s"With 'first' order, session should come first; got: $entries") + } + } + + test("PATH enabled: DEFAULT_PATH respects sessionFunctionResolutionOrder = last") { + withSQLConf( + SQLConf.PATH_ENABLED.key -> "true", + SQLConf.SESSION_FUNCTION_RESOLUTION_ORDER.key -> "last") { + sql("SET PATH = DEFAULT_PATH") + val entries = pathEntries(currentPath()) + assert(entries.last.contains("session"), + s"With 'last' order, session should come last; got: $entries") + } + } + + // TODO: cloneSession() constructs a new CatalogManager per forked session and + // explicitly copies only the stored session path via copySessionPathFrom. + // Other CatalogManager state propagation (current catalog/namespace, registered + // catalogs) on clone is currently incidental — audit and pin down the intended + // semantics in a follow-up. +}