Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -179,10 +179,21 @@ case class SQLFunction(
props.put(CREATE_TIME, createTimeMs.toString)
props.toMap
}

/**
 * Raw serialized resolution path stored in this function's properties under
 * [[SQLFunction.FUNCTION_RESOLUTION_PATH]], or `None` when no such property exists.
 */
def functionStoredResolutionPath: Option[String] = {
  val pathKey = SQLFunction.FUNCTION_RESOLUTION_PATH
  properties.get(pathKey)
}
}

object SQLFunction {

/**
 * Property key under which the frozen PATH for a SQL function body is persisted when the
 * function is created with [[SQLConf.PATH_ENABLED]]. The value is serialized as a JSON
 * array of path entries (same format as [[CatalogTable.VIEW_RESOLUTION_PATH]]).
 *
 * NOTE(review): unlike the neighbouring `INPUT_PARAM` key, this key is not built from
 * `SQL_FUNCTION_PREFIX` -- confirm that this is intentional.
 */
val FUNCTION_RESOLUTION_PATH: String = "function.resolutionPath"

private val SQL_FUNCTION_PREFIX = "sqlFunction."

private val INPUT_PARAM: String = SQL_FUNCTION_PREFIX + "inputParam"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.catalog

import scala.util.Try

import org.json4s.JsonAST.{JArray, JObject, JString, JValue}
import org.json4s.jackson.JsonMethods.parse

import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._

/**
 * Formatting helpers for the SQL Path stored in view and SQL function
 * metadata. The on-disk property stores path entries as a JSON array
 * of arrays:
 * {{{
 *   [["spark_catalog","default"],["system","builtin"]]
 * }}}
 * `toDescribeJson` converts these to the object form used by
 * `DESCRIBE AS JSON`:
 * {{{
 *   {"catalog_name": "spark_catalog", "namespace": ["default"]}
 * }}}
 * This supports multi-level namespaces.
 */
private[sql] object SqlPathFormat {

  /**
   * Build a JSON value for DESCRIBE AS JSON from a stored resolution
   * path string (JSON array of arrays persisted in the property).
   *
   * Within each entry only string parts are kept; the first one becomes
   * `catalog_name` and the rest become `namespace`. Entries that are not
   * arrays, or that contain no string parts, are skipped.
   *
   * @param storedPathStr the raw property value
   * @return the converted array, or `None` if the input cannot be parsed,
   *         is not a JSON array, or yields no usable entry
   */
  def toDescribeJson(storedPathStr: String): Option[JValue] = {
    Try(parse(storedPathStr)).toOption match {
      case Some(JArray(entries)) =>
        val converted = entries
          .collect { case JArray(parts) => parts.collect { case JString(s) => s } }
          .collect { case catalog :: namespace =>
            JObject(
              "catalog_name" -> JString(catalog),
              "namespace" -> JArray(namespace.map(JString(_))))
          }
        if (converted.nonEmpty) Some(JArray(converted)) else None
      case _ => None
    }
  }

  /**
   * Format a JSON path value (array of objects with catalog_name and
   * namespace) as a human-readable string for DESCRIBE EXTENDED.
   * Example: `` `spark_catalog`.`default`, `system`.`builtin` ``
   *
   * Entries that are not objects, or that have neither a catalog name nor
   * a namespace, are left out so the result never contains dangling
   * `, ` separators. JSON nulls are rendered as empty text instead of
   * failing.
   *
   * @param jValue the JSON value to format
   * @return the formatted path; a non-array input is rendered via its
   *         plain value
   */
  def formatForDisplay(jValue: JValue): Option[String] = {
    // `values` is null for a JSON null, so guard before calling toString.
    def asText(v: JValue): String = Option(v.values).map(_.toString).getOrElse("")

    jValue match {
      case JArray(entries) =>
        val rendered = entries.map {
          case JObject(fields) =>
            val byName = fields.toMap
            val catalog = byName.get("catalog_name").map(asText).getOrElse("")
            val namespace = byName.get("namespace") match {
              case Some(JArray(levels)) => levels.map(asText)
              case _ => Nil
            }
            val nameParts = (catalog +: namespace).filter(_.nonEmpty)
            if (nameParts.nonEmpty) nameParts.quoted else ""
          case _ => ""
        }
        // Drop entries that rendered to nothing before joining.
        Some(rendered.filter(_.nonEmpty).mkString(", "))
      case _ => Some(asText(jValue))
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ trait MetadataMapSupport {
case JLong(value) => Some(new Date(value).toString)
case _ => Some(jValue.values.toString)
}
case "SQL Path" => SqlPathFormat.formatForDisplay(jValue)
case _ => None
}
reformattedValue.map(value => key -> value)
Expand Down Expand Up @@ -610,6 +611,15 @@ case class CatalogTable(
}
}

/**
 * The frozen SQL PATH recorded for this view when it was created with
 * [[SQLConf.PATH_ENABLED]], read from the [[CatalogTable.VIEW_RESOLUTION_PATH]] property.
 * The value is a JSON array of path entries, each entry being an array of identifier parts.
 * Virtual markers (e.g. `system.current_schema`) are materialized and, for persisted views,
 * `system.session` is omitted. Returns `None` when the property is absent.
 */
def viewStoredResolutionPath: Option[String] = {
  val pathKey = CatalogTable.VIEW_RESOLUTION_PATH
  properties.get(pathKey)
}

/** Syntactic sugar to update a field in `storage`. */
def withNewStorage(
locationUri: Option[URI] = storage.locationUri,
Expand Down Expand Up @@ -675,6 +685,13 @@ case class CatalogTable(
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
map += "View Catalog and Namespace" -> JString(viewCatalogAndNamespaceInfos.quoted)
}
if (SQLConf.get.pathEnabled) {
viewStoredResolutionPath.foreach { pathStr =>
SqlPathFormat.toDescribeJson(pathStr).foreach { json =>
map += "SQL Path" -> json
}
}
}
val viewQueryOutputColumns: JValue = Try {
if (viewSchemaMode == SchemaEvolution) {
JArray(schema.map(_.name).map(JString).toList)
Expand Down Expand Up @@ -765,6 +782,9 @@ object CatalogTable {

val VIEW_SCHEMA_MODE = VIEW_PREFIX + "schemaMode"

/**
 * Property key for the frozen, expanded PATH captured at view creation (PATH feature).
 * This is not a SQL config property.
 */
val VIEW_RESOLUTION_PATH = VIEW_PREFIX + "resolutionPath"

val VIEW_STORING_ANALYZED_PLAN = VIEW_PREFIX + "storingAnalyzedPlan"

val PROP_CLUSTERING_COLUMNS: String = "clusteringColumns"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,10 @@ private[sql] object CatalogManager {
isFullyQualifiedSystemSessionViewName(nameParts)
}

/**
 * Checks whether the given SQL path entry is exactly the well-known
 * `system.session` entry (catalog name followed by the session namespace).
 */
def isSystemSessionPathEntry(parts: Seq[String]): Boolean = {
  val systemSessionEntry = Seq(SYSTEM_CATALOG_NAME, SESSION_NAMESPACE)
  parts == systemSessionEntry
}

/**
* A single entry in the session SQL path: either a literal schema
* or the current-schema marker.
Expand Down Expand Up @@ -272,4 +276,40 @@ private[sql] object CatalogManager {
currentCatalog: String,
currentNamespace: Seq[String]): Seq[Seq[String]] =
entries.map(_.resolve(currentCatalog, currentNamespace))

/**
 * Compute the resolved path entries to persist in view or SQL function metadata.
 *
 * Returns an empty sequence when PATH is disabled. Otherwise the stored session path is
 * resolved against the current catalog and namespace; if no session path is stored, the
 * legacy `resolutionSearchPath` is used instead.
 *
 * @param catalogManager source of the current catalog, namespace and session path
 * @param conf           SQL configuration consulted for `pathEnabled` and the legacy path
 * @param stripSession   when true, `system.session` entries are removed (persisted objects
 *                       cannot reference temporary objects)
 */
def pathEntriesForPersistence(
    catalogManager: CatalogManager,
    conf: SQLConf,
    stripSession: Boolean): Seq[Seq[String]] = {
  if (conf.pathEnabled) {
    val catalogName = catalogManager.currentCatalog.name()
    val namespace = catalogManager.currentNamespace.toSeq
    val resolved: Seq[Seq[String]] = catalogManager.sessionPathEntries
      .map(stored => resolvePathEntries(stored, catalogName, namespace))
      .getOrElse(conf.resolutionSearchPath(catalogName +: namespace))
    if (stripSession) resolved.filterNot(isSystemSessionPathEntry) else resolved
  } else {
    Seq.empty
  }
}

/**
 * Serialize resolved path entries for storage in view/function properties.
 * The result is compact JSON: an array with one inner array of strings per path entry.
 */
def serializePathEntries(entries: Seq[Seq[String]]): String = {
  import org.json4s.JsonAST.{JArray, JString}
  import org.json4s.jackson.JsonMethods.compact
  val encodedEntries = entries.toList.map { parts =>
    JArray(parts.toList.map(part => JString(part)))
  }
  compact(JArray(encodedEntries))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.plans.Inner
import org.apache.spark.sql.catalyst.plans.logical.{LateralJoin, LocalRelation, LogicalPlan, OneRowRelation, Project, Range, UnresolvedWith, View}
import org.apache.spark.sql.catalyst.trees.TreePattern.UNRESOLVED_ATTRIBUTE
import org.apache.spark.sql.connector.catalog.CatalogManager
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.command.CreateUserDefinedFunctionCommand._
Expand Down Expand Up @@ -512,10 +513,23 @@ case class CreateSQLFunctionCommand(
}
val tempVars = ViewHelper.collectTemporaryVariables(analyzed)

// Capture the effective resolution path at function creation time so the function
// body resolves with the same path regardless of the caller's session path later.
val expandedPathEntries = CatalogManager.pathEntriesForPersistence(
manager, conf, stripSession = !isTemp)
val resolutionPathProps =
if (expandedPathEntries.nonEmpty) {
Map(SQLFunction.FUNCTION_RESOLUTION_PATH ->
CatalogManager.serializePathEntries(expandedPathEntries))
} else {
Map.empty[String, String]
}

sqlConfigsToProps(conf, SQL_CONFIG_PREFIX) ++
catalogAndNamespaceToProps(
manager.currentCatalog.name,
manager.currentNamespace.toIndexedSeq) ++
referredTempNamesToProps(tempViews, tempFunctions, tempVars)
referredTempNamesToProps(tempViews, tempFunctions, tempVars) ++
resolutionPathProps
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.command

import java.util

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.catalog.{SQLFunction, SqlPathFormat, UserDefinedFunction}
import org.apache.spark.sql.catalyst.expressions.ExpressionInfo

/**
 * Helpers for [[DescribeFunctionCommand]] to retrieve and format
 * the frozen SQL PATH stored in SQL function metadata.
 */
private[command] object DescribeFunctionCommandUtils {

  /**
   * Returns the frozen SQL PATH persisted for a SQL function, formatted
   * for display. Persistent functions: loads [[CatalogFunction]] metadata
   * from the catalog. Temporary SQL UDFs (not in catalog): falls back to
   * parsing the usage JSON blob produced by [[SQLFunction.toExpressionInfo]].
   *
   * @param sparkSession session whose catalog and parser are used for the lookup
   * @param identifier   the function to look up in the catalog
   * @param info         expression info whose usage string is the fallback source
   * @return the display string, or `None` when no path is stored or the
   *         stored value cannot be converted
   */
  private[command] def storedResolutionPathString(
      sparkSession: SparkSession,
      identifier: FunctionIdentifier,
      info: ExpressionInfo): Option[String] = {
    val rawJson = try {
      val meta = sparkSession.sessionState.catalog
        .getFunctionMetadata(identifier)
      if (meta.isUserDefinedFunction) {
        val udf = UserDefinedFunction.fromCatalogFunction(
          meta,
          sparkSession.sessionState.sqlParser)
        // Match on the type instead of casting so that a user-defined function that is
        // not a SQLFunction yields no path rather than a ClassCastException.
        udf match {
          case sqlFunction: SQLFunction => sqlFunction.functionStoredResolutionPath
          case _ => None
        }
      } else {
        None
      }
    } catch {
      // The function is not in the catalog: fall back to the usage string.
      case _: org.apache.spark.sql.catalyst.analysis
            .NoSuchFunctionException |
          _: org.apache.spark.sql.catalyst.analysis
            .NoSuchDatabaseException =>
        extractResolutionPathFromSqlUdfUsage(info.getUsage)
    }
    rawJson.flatMap(formatStoredPath)
  }

  /** Converts the stored JSON path string into the display string, if it is well-formed. */
  private def formatStoredPath(pathStr: String): Option[String] = {
    SqlPathFormat.toDescribeJson(pathStr)
      .flatMap(SqlPathFormat.formatForDisplay)
  }

  /**
   * For temporary SQL UDFs not in the catalog, the resolution path may
   * be embedded in the ExpressionInfo usage JSON blob.
   *
   * @param usage the usage string taken from the expression info; may be null
   * @return the stored path, or `None` if `usage` is null or empty, or if
   *         the blob has no non-empty value for the path key
   * @throws org.apache.spark.SparkException if `usage` cannot be parsed as JSON
   */
  private def extractResolutionPathFromSqlUdfUsage(
      usage: String): Option[String] = {
    Option(usage).filter(_.nonEmpty).flatMap { usageJson =>
      try {
        val map = UserDefinedFunction.mapper.readValue(
          usageJson, classOf[util.HashMap[String, String]])
        Option(map.get(SQLFunction.FUNCTION_RESOLUTION_PATH))
          .filter(_.nonEmpty)
      } catch {
        case e: com.fasterxml.jackson.core.JsonProcessingException =>
          throw new org.apache.spark.SparkException(
            s"Corrupted SQL UDF metadata: expected JSON usage blob " +
              s"but failed to parse: ${e.getMessage}", e)
      }
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.command
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry
import org.apache.spark.sql.catalyst.catalog.{CatalogFunction, FunctionResource}
import org.apache.spark.sql.catalyst.catalog.{CatalogFunction, FunctionResource, SQLFunction}
import org.apache.spark.sql.catalyst.expressions.{Attribute, ExpressionInfo}
import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes
import org.apache.spark.sql.catalyst.util.StringUtils
Expand Down Expand Up @@ -117,15 +117,26 @@ case class DescribeFunctionCommand(
Row(s"Function: $name") :: Row(s"Usage: ${info.getUsage}") :: Nil
}

val sqlPathRows =
if (isExtended &&
sparkSession.sessionState.conf.pathEnabled &&
SQLFunction.isSQLFunction(info.getClassName)) {
DescribeFunctionCommandUtils
.storedResolutionPathString(sparkSession, identifier, info)
.map(s => Seq(Row(s"SQL Path: $s")))
.getOrElse(Nil)
} else {
Nil
}

if (isExtended) {
result :+ Row(s"Extended Usage:${info.getExtended}")
(result ++ sqlPathRows) :+ Row(s"Extended Usage:${info.getExtended}")
} else {
result
}
}
}


/**
* The DDL command that drops a function.
* ifExists: returns an error if the function doesn't exist, unless this is true.
Expand Down
Loading