From 28ce116e2cdd35333aae6f58ed579d18d1989597 Mon Sep 17 00:00:00 2001 From: Terry Kim Date: Mon, 22 Nov 2021 18:13:35 -0800 Subject: [PATCH 1/3] initial commit --- .../spark/sql/catalyst/parser/AstBuilder.scala | 4 ++-- .../sql/catalyst/plans/logical/v2Commands.scala | 4 +++- .../spark/sql/catalyst/parser/DDLParserSuite.scala | 9 +++++---- .../org/apache/spark/sql/execution/command/ddl.scala | 5 +++-- .../datasources/v2/DataSourceV2Strategy.scala | 3 ++- .../spark/sql/connector/DataSourceV2SQLSuite.scala | 12 ++++++++++++ 6 files changed, 27 insertions(+), 10 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index 754f92b8e05d9..6eeec87a5b3b0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -32,7 +32,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.{FunctionIdentifier, SQLConfHelper, TableIdentifier} import org.apache.spark.sql.catalyst.analysis._ -import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat} +import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogUtils} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate.{First, Last} import org.apache.spark.sql.catalyst.parser.SqlBaseParser._ @@ -3132,7 +3132,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg withOrigin(ctx) { SetNamespaceLocation( UnresolvedNamespace(visitMultipartIdentifier(ctx.multipartIdentifier)), - visitLocationSpec(ctx.locationSpec)) + CatalogUtils.stringToURI(visitLocationSpec(ctx.locationSpec))) } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala index 4ed5d87aaf102..59bd7733b5d95 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.catalyst.plans.logical +import java.net.URI + import org.apache.spark.sql.catalyst.analysis.{AnalysisContext, FieldName, NamedRelation, PartitionSpec, UnresolvedException} import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.catalog.FunctionResource @@ -349,7 +351,7 @@ case class SetNamespaceProperties( */ case class SetNamespaceLocation( namespace: LogicalPlan, - location: String) extends UnaryCommand { + location: URI) extends UnaryCommand { override def child: LogicalPlan = namespace override protected def withNewChildInternal(newChild: LogicalPlan): SetNamespaceLocation = copy(namespace = newChild) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala index 62b611a1285b4..7228bca1c0e42 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala @@ -23,7 +23,7 @@ import java.util.Locale import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.analysis._ -import org.apache.spark.sql.catalyst.catalog.BucketSpec +import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogUtils} import org.apache.spark.sql.catalyst.expressions.{EqualTo, Hex, Literal} import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition.{after, first} @@ -1833,20 +1833,21 @@ class DDLParserSuite extends AnalysisTest { } test("set namespace location") { + val loc = CatalogUtils.stringToURI("/home/user/db") comparePlans( parsePlan("ALTER DATABASE a.b.c SET LOCATION '/home/user/db'"), SetNamespaceLocation( - UnresolvedNamespace(Seq("a", "b", "c")), "/home/user/db")) + UnresolvedNamespace(Seq("a", "b", "c")), loc)) comparePlans( parsePlan("ALTER SCHEMA a.b.c SET LOCATION '/home/user/db'"), SetNamespaceLocation( - UnresolvedNamespace(Seq("a", "b", "c")), "/home/user/db")) + UnresolvedNamespace(Seq("a", "b", "c")), loc)) comparePlans( parsePlan("ALTER NAMESPACE a.b.c SET LOCATION '/home/user/db'"), SetNamespaceLocation( - UnresolvedNamespace(Seq("a", "b", "c")), "/home/user/db")) + UnresolvedNamespace(Seq("a", "b", "c")), loc)) } test("analyze table statistics") { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala index 295838eda5a72..2c57db24b7cff 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.execution.command +import java.net.URI import java.util.Locale import java.util.concurrent.TimeUnit._ @@ -147,13 +148,13 @@ case class AlterDatabasePropertiesCommand( * ALTER (DATABASE|SCHEMA) database_name SET LOCATION path * }}} */ -case class AlterDatabaseSetLocationCommand(databaseName: String, location: String) +case class AlterDatabaseSetLocationCommand(databaseName: String, location: URI) extends LeafRunnableCommand { override def run(sparkSession: SparkSession): Seq[Row] = { val catalog = sparkSession.sessionState.catalog val oldDb = catalog.getDatabaseMetadata(databaseName) - catalog.alterDatabase(oldDb.copy(locationUri = CatalogUtils.stringToURI(location))) + catalog.alterDatabase(oldDb.copy(locationUri = location)) Seq.empty[Row] } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index cbfeaa4f5d651..299aff4ec14cd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -22,6 +22,7 @@ import scala.collection.mutable import org.apache.spark.sql.{SparkSession, Strategy} import org.apache.spark.sql.catalyst.analysis.{ResolvedDBObjectName, ResolvedNamespace, ResolvedPartitionSpec, ResolvedTable} +import org.apache.spark.sql.catalyst.catalog.CatalogUtils import org.apache.spark.sql.catalyst.expressions import org.apache.spark.sql.catalyst.expressions.{And, Attribute, DynamicPruning, EmptyRow, Expression, Literal, NamedExpression, PredicateHelper, SubqueryExpression} import org.apache.spark.sql.catalyst.planning.PhysicalOperation @@ -314,7 +315,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat AlterNamespaceSetPropertiesExec( catalog.asNamespaceCatalog, ns, - Map(SupportsNamespaces.PROP_LOCATION -> location)) :: Nil + Map(SupportsNamespaces.PROP_LOCATION -> CatalogUtils.URIToString(location))) :: Nil case CommentOnNamespace(ResolvedNamespace(catalog, ns), comment) => AlterNamespaceSetPropertiesExec( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala index 6cbbf680bc9ca..de4dc897980de 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala @@ -1301,6 +1301,18 @@ class DataSourceV2SQLSuite } } + test("SPARK-37444: ALTER NAMESPACE .. SET LOCATION using v2 catalog with empty location") { + val ns = "testcat.ns1.ns2" + withNamespace(ns) { + sql(s"CREATE NAMESPACE IF NOT EXISTS $ns COMMENT " + + "'test namespace' LOCATION '/tmp/ns_test_1'") + val e = intercept[IllegalArgumentException] { + sql(s"ALTER DATABASE $ns SET LOCATION ''") + } + assert(e.getMessage.contains("Can not create a Path from an empty string")) + } + } + private def testShowNamespaces( sqlText: String, expected: Seq[String]): Unit = { From 8c0a8c937607c68b6e0019259794da4495b1001c Mon Sep 17 00:00:00 2001 From: Terry Kim Date: Tue, 23 Nov 2021 21:50:43 -0800 Subject: [PATCH 2/3] address pr comment --- .../catalog/ExternalCatalogUtils.scala | 19 +++++++++++++++++++ .../sql/catalyst/catalog/SessionCatalog.scala | 11 ++--------- .../sql/catalyst/parser/AstBuilder.scala | 4 ++-- .../catalyst/plans/logical/v2Commands.scala | 4 +--- .../sql/catalyst/parser/DDLParserSuite.scala | 9 ++++----- .../spark/sql/execution/command/ddl.scala | 5 ++--- .../datasources/v2/DataSourceV2Strategy.scala | 9 ++++++--- .../sql/connector/DataSourceV2SQLSuite.scala | 2 +- 8 files changed, 37 insertions(+), 26 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala index a88b509a9615a..4b0e676d720cc 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.catalog import java.net.URI +import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.hadoop.util.Shell @@ -258,6 +259,24 @@ object CatalogUtils { new Path(str).toUri } + def makeQualifiedNamespacePath( + locationUri: URI, + warehousePath: String, + hadoopConf: Configuration): URI = { + if (locationUri.isAbsolute) { + locationUri + } else { + val fullPath = new Path(warehousePath, CatalogUtils.URIToString(locationUri)) + makeQualifiedPath(fullPath.toUri, hadoopConf) + } + } + + def makeQualifiedPath(path: URI, hadoopConf: Configuration): URI = { + val hadoopPath = new Path(path) + val fs = hadoopPath.getFileSystem(hadoopConf) + fs.makeQualified(hadoopPath).toUri + } + private def normalizeColumnName( tableName: String, tableCols: Seq[String], diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index f529b13ff5adc..610a683e86246 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -210,9 +210,7 @@ class SessionCatalog( * FileSystem is changed. */ private def makeQualifiedPath(path: URI): URI = { - val hadoopPath = new Path(path) - val fs = hadoopPath.getFileSystem(hadoopConf) - fs.makeQualified(hadoopPath).toUri + CatalogUtils.makeQualifiedPath(path, hadoopConf) } private def requireDbExists(db: String): Unit = { @@ -254,12 +252,7 @@ class SessionCatalog( } private def makeQualifiedDBPath(locationUri: URI): URI = { - if (locationUri.isAbsolute) { - locationUri - } else { - val fullPath = new Path(conf.warehousePath, CatalogUtils.URIToString(locationUri)) - makeQualifiedPath(fullPath.toUri) - } + CatalogUtils.makeQualifiedNamespacePath(locationUri, conf.warehousePath, hadoopConf) } def dropDatabase(db: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit = { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index 6eeec87a5b3b0..754f92b8e05d9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -32,7 +32,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.{FunctionIdentifier, SQLConfHelper, TableIdentifier} import org.apache.spark.sql.catalyst.analysis._ -import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogUtils} +import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate.{First, Last} import org.apache.spark.sql.catalyst.parser.SqlBaseParser._ @@ -3132,7 +3132,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg withOrigin(ctx) { SetNamespaceLocation( UnresolvedNamespace(visitMultipartIdentifier(ctx.multipartIdentifier)), - CatalogUtils.stringToURI(visitLocationSpec(ctx.locationSpec))) + visitLocationSpec(ctx.locationSpec)) } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala index 59bd7733b5d95..4ed5d87aaf102 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala @@ -17,8 +17,6 @@ package org.apache.spark.sql.catalyst.plans.logical -import java.net.URI - import org.apache.spark.sql.catalyst.analysis.{AnalysisContext, FieldName, NamedRelation, PartitionSpec, UnresolvedException} import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.catalog.FunctionResource @@ -351,7 +349,7 @@ case class SetNamespaceProperties( */ case class SetNamespaceLocation( namespace: LogicalPlan, - location: URI) extends UnaryCommand { + location: String) extends UnaryCommand { override def child: LogicalPlan = namespace override protected def withNewChildInternal(newChild: LogicalPlan): SetNamespaceLocation = copy(namespace = newChild) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala index 7228bca1c0e42..62b611a1285b4 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala @@ -23,7 +23,7 @@ import java.util.Locale import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.analysis._ -import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogUtils} +import org.apache.spark.sql.catalyst.catalog.BucketSpec import org.apache.spark.sql.catalyst.expressions.{EqualTo, Hex, Literal} import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition.{after, first} @@ -1833,21 +1833,20 @@ class DDLParserSuite extends AnalysisTest { } test("set namespace location") { - val loc = CatalogUtils.stringToURI("/home/user/db") comparePlans( parsePlan("ALTER DATABASE a.b.c SET LOCATION '/home/user/db'"), SetNamespaceLocation( - UnresolvedNamespace(Seq("a", "b", "c")), loc)) + UnresolvedNamespace(Seq("a", "b", "c")), "/home/user/db")) comparePlans( parsePlan("ALTER SCHEMA a.b.c SET LOCATION '/home/user/db'"), SetNamespaceLocation( - UnresolvedNamespace(Seq("a", "b", "c")), loc)) + UnresolvedNamespace(Seq("a", "b", "c")), "/home/user/db")) comparePlans( parsePlan("ALTER NAMESPACE a.b.c SET LOCATION '/home/user/db'"), SetNamespaceLocation( - UnresolvedNamespace(Seq("a", "b", "c")), loc)) + UnresolvedNamespace(Seq("a", "b", "c")), "/home/user/db")) } test("analyze table statistics") { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala index 2c57db24b7cff..295838eda5a72 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala @@ -17,7 +17,6 @@ package org.apache.spark.sql.execution.command -import java.net.URI import java.util.Locale import java.util.concurrent.TimeUnit._ @@ -148,13 +147,13 @@ case class AlterDatabasePropertiesCommand( * ALTER (DATABASE|SCHEMA) database_name SET LOCATION path * }}} */ -case class AlterDatabaseSetLocationCommand(databaseName: String, location: URI) +case class AlterDatabaseSetLocationCommand(databaseName: String, location: String) extends LeafRunnableCommand { override def run(sparkSession: SparkSession): Seq[Row] = { val catalog = sparkSession.sessionState.catalog val oldDb = catalog.getDatabaseMetadata(databaseName) - catalog.alterDatabase(oldDb.copy(locationUri = location)) + catalog.alterDatabase(oldDb.copy(locationUri = CatalogUtils.stringToURI(location))) Seq.empty[Row] } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index 299aff4ec14cd..507ea078dc235 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -21,9 +21,9 @@ import scala.collection.JavaConverters._ import scala.collection.mutable import org.apache.spark.sql.{SparkSession, Strategy} +import org.apache.spark.sql.catalyst.{expressions, SQLConfHelper} import org.apache.spark.sql.catalyst.analysis.{ResolvedDBObjectName, ResolvedNamespace, ResolvedPartitionSpec, ResolvedTable} import org.apache.spark.sql.catalyst.catalog.CatalogUtils -import org.apache.spark.sql.catalyst.expressions import org.apache.spark.sql.catalyst.expressions.{And, Attribute, DynamicPruning, EmptyRow, Expression, Literal, NamedExpression, PredicateHelper, SubqueryExpression} import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.catalyst.plans.logical._ @@ -45,7 +45,8 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.apache.spark.storage.StorageLevel import org.apache.spark.unsafe.types.UTF8String -class DataSourceV2Strategy(session: SparkSession) extends Strategy with PredicateHelper { +class DataSourceV2Strategy(session: SparkSession) extends Strategy + with PredicateHelper with SQLConfHelper { import DataSourceV2Implicits._ import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ @@ -312,10 +313,12 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat AlterNamespaceSetPropertiesExec(catalog.asNamespaceCatalog, ns, properties) :: Nil case SetNamespaceLocation(ResolvedNamespace(catalog, ns), location) => + val nsPath = CatalogUtils.makeQualifiedNamespacePath( + CatalogUtils.stringToURI(location), conf.warehousePath, session.sharedState.hadoopConf) AlterNamespaceSetPropertiesExec( catalog.asNamespaceCatalog, ns, - Map(SupportsNamespaces.PROP_LOCATION -> CatalogUtils.URIToString(location))) :: Nil + Map(SupportsNamespaces.PROP_LOCATION -> CatalogUtils.URIToString(nsPath))) :: Nil case CommentOnNamespace(ResolvedNamespace(catalog, ns), comment) => AlterNamespaceSetPropertiesExec( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala index de4dc897980de..c6c05f109c299 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala @@ -1294,7 +1294,7 @@ class DataSourceV2SQLSuite assert(descriptionDf.collect() === Seq( Row("Namespace Name", "ns2"), Row(SupportsNamespaces.PROP_COMMENT.capitalize, "test namespace"), - Row(SupportsNamespaces.PROP_LOCATION.capitalize, "/tmp/ns_test_2"), + Row(SupportsNamespaces.PROP_LOCATION.capitalize, "file:/tmp/ns_test_2"), Row(SupportsNamespaces.PROP_OWNER.capitalize, defaultUser), Row("Properties", "")) ) From 068473fcd31879e08a87de8028663f8891d60920 Mon Sep 17 00:00:00 2001 From: Terry Kim Date: Wed, 24 Nov 2021 00:45:13 -0800 Subject: [PATCH 3/3] address PR comment --- .../execution/datasources/v2/DataSourceV2Strategy.scala | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index 507ea078dc235..9bf273ce16c5b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -21,9 +21,9 @@ import scala.collection.JavaConverters._ import scala.collection.mutable import org.apache.spark.sql.{SparkSession, Strategy} -import org.apache.spark.sql.catalyst.{expressions, SQLConfHelper} import org.apache.spark.sql.catalyst.analysis.{ResolvedDBObjectName, ResolvedNamespace, ResolvedPartitionSpec, ResolvedTable} import org.apache.spark.sql.catalyst.catalog.CatalogUtils +import org.apache.spark.sql.catalyst.expressions import org.apache.spark.sql.catalyst.expressions.{And, Attribute, DynamicPruning, EmptyRow, Expression, Literal, NamedExpression, PredicateHelper, SubqueryExpression} import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.catalyst.plans.logical._ @@ -39,14 +39,14 @@ import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors import org.apache.spark.sql.execution.{FilterExec, LeafExecNode, LocalTableScanExec, ProjectExec, RowDataSourceScanExec, SparkPlan} import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, PushableColumn, PushableColumnBase} import org.apache.spark.sql.execution.streaming.continuous.{WriteToContinuousDataSource, WriteToContinuousDataSourceExec} +import org.apache.spark.sql.internal.StaticSQLConf.WAREHOUSE_PATH import org.apache.spark.sql.sources.{BaseRelation, TableScan} import org.apache.spark.sql.types.{BooleanType, StringType} import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.apache.spark.storage.StorageLevel import org.apache.spark.unsafe.types.UTF8String -class DataSourceV2Strategy(session: SparkSession) extends Strategy - with PredicateHelper with SQLConfHelper { +class DataSourceV2Strategy(session: SparkSession) extends Strategy with PredicateHelper { import DataSourceV2Implicits._ import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ @@ -313,8 +313,9 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy AlterNamespaceSetPropertiesExec(catalog.asNamespaceCatalog, ns, properties) :: Nil case SetNamespaceLocation(ResolvedNamespace(catalog, ns), location) => + val warehousePath = session.sharedState.conf.get(WAREHOUSE_PATH) val nsPath = CatalogUtils.makeQualifiedNamespacePath( - CatalogUtils.stringToURI(location), conf.warehousePath, session.sharedState.hadoopConf) + CatalogUtils.stringToURI(location), warehousePath, session.sharedState.hadoopConf) AlterNamespaceSetPropertiesExec( catalog.asNamespaceCatalog, ns,