Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions common/utils/src/main/resources/error/error-conditions.json
Original file line number Diff line number Diff line change
Expand Up @@ -2061,6 +2061,12 @@
],
"sqlState" : "42711"
},
"DUPLICATE_SQL_PATH_ENTRY" : {
"message" : [
"Duplicate SQL path entry <pathEntry>. The session SQL path cannot contain the same namespace more than once (including after expanding shortcuts like DEFAULT_PATH or SYSTEM_PATH)."
],
"sqlState" : "42732"
},
"DUPLICATE_VARIABLE_NAME_INSIDE_DECLARE" : {
"message" : [
"Found duplicate variable <variableName> in the declare variable list. Please, remove one of them."
Expand Down Expand Up @@ -4475,6 +4481,12 @@
],
"sqlState" : "XXKD0"
},
"INVALID_SQL_PATH_SCHEMA_REFERENCE" : {
"message" : [
"Invalid schema reference in SET PATH: <qualifiedName>. Use at least two name parts (catalog.schema); multi-level namespaces are allowed."
],
"sqlState" : "42601"
},
"INVALID_SQL_SYNTAX" : {
"message" : [
"Invalid SQL syntax:"
Expand Down Expand Up @@ -7884,6 +7896,11 @@
"Cannot have VARIANT type columns in DataFrame which calls set operations (INTERSECT, EXCEPT, etc.), but the type of column <colName> is <dataType>."
]
},
"SET_PATH_WHEN_DISABLED" : {
"message" : [
"SET PATH is disabled. Set <config> to true to enable it."
]
},
"SET_PROPERTIES_AND_DBPROPERTIES" : {
"message" : [
"set PROPERTIES and DBPROPERTIES at the same time."
Expand Down
2 changes: 1 addition & 1 deletion common/utils/src/main/resources/error/error-states.json
Original file line number Diff line number Diff line change
Expand Up @@ -3339,7 +3339,7 @@
"description": "A duplicate schema name in a special register was detected.",
"origin": "DB2",
"standard": "N",
"usedBy": ["DB2"]
"usedBy": ["DB2", "Spark"]
},
"42734": {
"description": "A duplicate parameter-name, SQL variable name, label, or condition-name was detected.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,7 @@ setResetStatement
| SET TIME ZONE interval #setTimeZone
| SET TIME ZONE timezone #setTimeZone
| SET TIME ZONE .*? #setTimeZone
| SET PATH EQ pathElement (COMMA pathElement)* #setPath
| SET variable assignmentList #setVariable
| SET variable LEFT_PAREN multipartIdentifierList RIGHT_PAREN EQ
LEFT_PAREN query RIGHT_PAREN #setVariable
Expand All @@ -459,6 +460,15 @@ setResetStatement
| RESET .*? #resetConfiguration
;

pathElement
: DEFAULT_PATH
| SYSTEM_PATH
| PATH
| CURRENT_DATABASE
| CURRENT_SCHEMA
| multipartIdentifier
;

executeImmediate
: EXECUTE IMMEDIATE queryParam=expression (INTO targetVariable=multipartIdentifierList)? executeImmediateUsing?
;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils.{convertSpecialDate, con
import org.apache.spark.sql.catalyst.util.SparkDateTimeUtils.{instantToNanosOfDay, truncateTimeToPrecision}
import org.apache.spark.sql.catalyst.util.TypeUtils.toSQLExpr
import org.apache.spark.sql.connector.catalog.CatalogManager
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._


Expand Down Expand Up @@ -156,13 +155,9 @@ case class ReplaceCurrentLike(catalogManager: CatalogManager) extends Rule[Logic
def apply(plan: LogicalPlan): LogicalPlan = {
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
lazy val currentNamespace = catalogManager.currentNamespace.quoted
lazy val currentNamespaceSeq = catalogManager.currentNamespace.toSeq
lazy val currentCatalog = catalogManager.currentCatalog.name()
lazy val currentUser = CurrentUserContext.getCurrentUser
lazy val currentPathStr = {
val catalogPath = (currentCatalog +: currentNamespaceSeq).toSeq
SQLConf.get.resolutionSearchPath(catalogPath).map(_.quoted).mkString(",")
}
lazy val currentPathStr = catalogManager.currentPathString

plan.transformAllExpressionsWithPruning(_.containsPattern(CURRENT_LIKE)) {
case CurrentDatabase() =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,44 @@ class CatalogManager(
_currentNamespace = Some(namespace)
}

import CatalogManager.SessionPathEntry

private var _sessionPath: Option[Seq[SessionPathEntry]] = None

/** Returns the raw stored session path entries, or None if no path is set. */
def sessionPathEntries: Option[Seq[SessionPathEntry]] = synchronized { _sessionPath }

def setSessionPath(entries: Seq[SessionPathEntry]): Unit = synchronized {
_sessionPath = Some(entries)
}

def clearSessionPath(): Unit = synchronized {
_sessionPath = None
}

private[sql] def copySessionPathFrom(other: CatalogManager): Unit = synchronized {
_sessionPath = other.sessionPathEntries
}

/**
* String form of the current resolution path for CURRENT_PATH().
* When PATH is enabled and a session path is stored, formats the effective path entries
* with markers expanded. Otherwise falls back to the legacy resolutionSearchPath.
*/
def currentPathString: String = synchronized {
import CatalogV2Implicits._
val stored = if (conf.pathEnabled) _sessionPath else None
stored match {
case Some(entries) =>
val resolved = CatalogManager.resolvePathEntries(
entries, currentCatalog.name(), currentNamespace.toSeq)
resolved.map(_.quoted).mkString(",")
case None =>
val catalogPath = (currentCatalog.name() +: currentNamespace).toSeq
conf.resolutionSearchPath(catalogPath).map(_.quoted).mkString(",")
}
}

private var _currentCatalogName: Option[String] = None

def currentCatalog: CatalogPlugin = synchronized {
Expand Down Expand Up @@ -154,6 +192,7 @@ class CatalogManager(
catalogs.clear()
_currentNamespace = None
_currentCatalogName = None
_sessionPath = None
v1SessionCatalog.setCurrentDatabase(conf.defaultDatabase)
}
}
Expand Down Expand Up @@ -204,4 +243,33 @@ private[sql] object CatalogManager {
(nameParts.length == 2 && nameParts.head.equalsIgnoreCase(SESSION_NAMESPACE)) ||
isFullyQualifiedSystemSessionViewName(nameParts)
}

/**
* A single entry in the session SQL path: either a literal schema
* or the current-schema marker.
*/
sealed trait SessionPathEntry {
/** Resolve to concrete catalog + namespace parts. */
def resolve(
currentCatalog: String,
currentNamespace: Seq[String]): Seq[String] = this match {
case CurrentSchemaEntry =>
if (currentNamespace.isEmpty) Seq(currentCatalog)
else currentCatalog +: currentNamespace
case LiteralPathEntry(parts) => parts
}
}

/** Marker for CURRENT_SCHEMA / CURRENT_DATABASE: expands dynamically with USE SCHEMA. */
case object CurrentSchemaEntry extends SessionPathEntry

/** A fully qualified schema reference (catalog.namespace...). */
case class LiteralPathEntry(parts: Seq[String]) extends SessionPathEntry

/** Resolve all entries in a session path to concrete catalog + namespace parts. */
def resolvePathEntries(
entries: Seq[SessionPathEntry],
currentCatalog: String,
currentNamespace: Seq[String]): Seq[Seq[String]] =
entries.map(_.resolve(currentCatalog, currentNamespace))
}
Original file line number Diff line number Diff line change
Expand Up @@ -954,9 +954,9 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat
*
* @param quotedResolutionPathEntries each string is already a display-ready path entry (typically
* `toSQLId` of each segment from `SQLConf.resolutionSearchPath`). They are joined with
* ", " inside square brackets. This differs from [[noSuchTableError]](nameParts:
* Seq[String]), which builds one dotted bracketed path from `nameParts.dropRight(1)` via
* [[org.apache.spark.sql.catalyst.analysis.NoSuchTableException]].
* ", " inside square brackets. This differs from
* [[org.apache.spark.sql.catalyst.analysis.NoSuchItemExceptionHelper.formatSearchPath]],
* which builds one dotted bracketed path from a single search path.
*/
def tableOrViewNotFoundWithSearchPath(
name: Seq[String],
Expand Down Expand Up @@ -2458,6 +2458,12 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat
"config" -> SQLConf.LEGACY_PATH_OPTION_BEHAVIOR.key))
}

def invalidSqlPathSchemaReferenceError(qualifiedName: String): Throwable = {
new AnalysisException(
errorClass = "INVALID_SQL_PATH_SCHEMA_REFERENCE",
messageParameters = Map("qualifiedName" -> qualifiedName))
}

def userSpecifiedSchemaUnsupportedError(operation: String): Throwable = {
DataTypeErrors.userSpecifiedSchemaUnsupportedError(operation)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2439,6 +2439,16 @@ object SQLConf {
.booleanConf
.createWithDefault(false)

val PATH_ENABLED =
buildConf("spark.sql.path.enabled")
.version("4.2.0")
.doc("When true, enables the SQL Standard PATH feature: SET PATH, path-based routine " +
"resolution, and CURRENT_PATH(). When false, SET PATH is rejected and resolution uses " +
"the default path only.")
Comment thread
srielau marked this conversation as resolved.
.withBindingPolicy(ConfigBindingPolicy.SESSION)
.booleanConf
.createWithDefault(false)

// Whether to retain group by columns or not in GroupedData.agg.
val DATAFRAME_RETAIN_GROUP_COLUMNS = buildConf("spark.sql.retainGroupColumns")
.version("1.4.0")
Expand Down Expand Up @@ -8352,29 +8362,44 @@ class SQLConf extends Serializable with Logging with SqlApiConf {
*/
def prioritizeSystemCatalog: Boolean = !getConf(SQLConf.PERSISTENT_CATALOG_FIRST)

def pathEnabled: Boolean = getConf(SQLConf.PATH_ENABLED)

/**
* Returns the resolution search path for error messages and resolution order.
* This is the single source of truth for the search path used for functions, tables, and views.
* Uses [[sessionFunctionResolutionOrder]]: "first" (session first), "second" (session second),
* "last" (session last). When catalogPath is empty, returns only system namespaces.
*/
def resolutionSearchPath(catalogPath: Seq[String]): Seq[Seq[String]] = {
def resolutionSearchPath(catalogPath: Seq[String]): Seq[Seq[String]] =
defaultPathOrder(if (catalogPath.isEmpty) Seq.empty else Seq(catalogPath))

/**
* Orders the given catalog path entries by [[sessionFunctionResolutionOrder]], inserting
* system.session and system.builtin. Used by both the legacy single-schema resolution and
* by SET PATH's DEFAULT_PATH / SYSTEM_PATH expansion to keep ordering in sync.
*
* @param catalogEntries persistent catalog path entries (may be empty).
*/
def defaultPathOrder(catalogEntries: Seq[Seq[String]]): Seq[Seq[String]] = {
val order = sessionFunctionResolutionOrder
val session = Seq("system", "session")
val builtin = Seq("system", "builtin")
order match {
case "first" =>
if (catalogPath.isEmpty) Seq(session, builtin)
else Seq(session, builtin, catalogPath)
if (catalogEntries.isEmpty) Seq(session, builtin)
else Seq(session, builtin) ++ catalogEntries
case "last" =>
if (catalogPath.isEmpty) Seq(builtin, session)
else Seq(builtin, catalogPath, session)
if (catalogEntries.isEmpty) Seq(builtin, session)
else Seq(builtin) ++ catalogEntries ++ Seq(session)
case _ => // "second"
if (catalogPath.isEmpty) Seq(builtin, session)
else Seq(builtin, session, catalogPath)
if (catalogEntries.isEmpty) Seq(builtin, session)
else Seq(builtin, session) ++ catalogEntries
}
}

/** System-only path (builtin + session) ordered by [[sessionFunctionResolutionOrder]]. */
def systemPathOrder: Seq[Seq[String]] = defaultPathOrder(Seq.empty)

override def legacyParameterSubstitutionConstantsOnly: Boolean =
getConf(SQLConf.LEGACY_PARAMETER_SUBSTITUTION_CONSTANTS_ONLY)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,27 @@ class SparkSqlAstBuilder extends AstBuilder {
ResetCommand(Some(ctx.configKey().getText))
}

/**
* Create a [[SetPathCommand]] to set the session SQL path.
* Example SQL :
* {{{
* SET PATH = spark_catalog.default, system.builtin;
* SET PATH = DEFAULT_PATH;
* SET PATH = SYSTEM_PATH, spark_catalog.myschema;
* }}}
*/
override def visitSetPath(ctx: SetPathContext): LogicalPlan = withOrigin(ctx) {
Comment thread
srielau marked this conversation as resolved.
val elements = ctx.pathElement().asScala.map { pe =>
if (pe.DEFAULT_PATH() != null) PathElement.DefaultPath
else if (pe.SYSTEM_PATH() != null) PathElement.SystemPath
else if (pe.PATH() != null) PathElement.PathRef
else if (pe.CURRENT_DATABASE() != null) PathElement.CurrentDatabase
else if (pe.CURRENT_SCHEMA() != null) PathElement.CurrentSchema
else PathElement.SchemaInPath(visitMultipartIdentifier(pe.multipartIdentifier()))
}.toSeq
SetPathCommand(elements)
}

/**
* Create a [[SetCommand]] logical plan to set [[SQLConf.SESSION_LOCAL_TIMEZONE]]
* Example SQL :
Expand Down
Loading