From e3c59a0dd314acf5501ef9e472e01be732152c52 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Sat, 9 Jul 2016 02:54:39 -0700 Subject: [PATCH 1/8] [SPARK-16458][SQL] SessionCatalog should support `listColumns` for temporary tables --- .../sql/catalyst/catalog/SessionCatalog.scala | 6 +++++- .../apache/spark/sql/internal/CatalogImpl.scala | 16 +++++++++++++++- .../apache/spark/sql/internal/CatalogSuite.scala | 5 +++++ 3 files changed, 25 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index d88b5ffc0511c..2854204618d98 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.{CatalystConf, SimpleCatalystConf} import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder -import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo} +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, ExpressionInfo} import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias} import org.apache.spark.sql.catalyst.util.StringUtils @@ -449,6 +449,10 @@ class SessionCatalog( name.database.isEmpty && tempTables.contains(formatTableName(name.table)) } + def listTemporaryTableOutput(name: String): Seq[Attribute] = { + tempTables(name).output + } + /** * List all tables in the specified database, including temporary tables. */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala index 44babcc93a1de..8dcb9538b7143 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala @@ -138,7 +138,21 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog { */ @throws[AnalysisException]("table does not exist") override def listColumns(tableName: String): Dataset[Column] = { - listColumns(currentDatabase, tableName) + if (sessionCatalog.isTemporaryTable(TableIdentifier(tableName))) { + val columns = sessionCatalog.listTemporaryTableOutput(tableName).map { c => + new Column( + name = c.name, + description = c.name, + dataType = c.dataType.catalogString, + nullable = c.nullable, + isPartition = false, + isBucket = false + ) + } + CatalogImpl.makeDataset(columns, sparkSession) + } else { + listColumns(currentDatabase, tableName) + } } /** diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala index d862e4cfa943a..d75df56dd608a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala @@ -234,6 +234,11 @@ class CatalogSuite testListColumns("tab1", dbName = None) } + test("list columns in temporary table") { + createTempTable("temp1") + spark.catalog.listColumns("temp1") + } + test("list columns in database") { createDatabase("db1") createTable("tab1", Some("db1")) From 631ae96dcda2d5fd9aaf9504f286f32fa01db65d Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Sat, 9 Jul 2016 15:38:57 -0700 Subject: [PATCH 2/8] Improve `getTableMetadata` instead. --- .../sql/catalyst/catalog/SessionCatalog.scala | 44 +++++++++++++------ .../spark/sql/internal/CatalogImpl.scala | 16 +------ 2 files changed, 31 insertions(+), 29 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index 2854204618d98..59b284bff2b33 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -22,7 +22,7 @@ import javax.annotation.concurrent.GuardedBy import scala.collection.mutable import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.hadoop.fs.Path import org.apache.spark.internal.Logging import org.apache.spark.sql.AnalysisException @@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.{CatalystConf, SimpleCatalystConf} import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder -import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, ExpressionInfo} +import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo} import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias} import org.apache.spark.sql.catalyst.util.StringUtils @@ -251,11 +251,30 @@ class SessionCatalog( * If the specified table is not found in the database then a [[NoSuchTableException]] is thrown. */ def getTableMetadata(name: TableIdentifier): CatalogTable = { - val db = formatDatabaseName(name.database.getOrElse(getCurrentDatabase)) + var db = formatDatabaseName(name.database.getOrElse(getCurrentDatabase)) val table = formatTableName(name.table) - requireDbExists(db) - requireTableExists(TableIdentifier(table, Some(db))) - externalCatalog.getTable(db, table) + val tid = TableIdentifier(table) + if (db == "" && isTemporaryTable(tid)) { + CatalogTable( + identifier = tid, + tableType = CatalogTableType.VIEW, + storage = CatalogStorageFormat.empty, + schema = tempTables(table).output.map { c => + CatalogColumn( + name = c.name, + dataType = c.dataType.catalogString, + nullable = c.nullable, + comment = Option(c.name) + ) + }, + properties = Map(), + viewText = None) + } else { + db = if (db == "") currentDb else db + requireDbExists(db) + requireTableExists(TableIdentifier(table, Some(db))) + externalCatalog.getTable(db, table) + } } /** @@ -430,12 +449,13 @@ class SessionCatalog( * contain the table. */ def tableExists(name: TableIdentifier): Boolean = synchronized { - val db = formatDatabaseName(name.database.getOrElse(currentDb)) val table = formatTableName(name.table) - if (name.database.isDefined || !tempTables.contains(table)) { - externalCatalog.tableExists(db, table) + if (tempTables.contains(table)) { + true } else { - true // it's a temporary table + var db = formatDatabaseName(name.database.getOrElse(currentDb)) + db = if (db == "") currentDb else db + externalCatalog.tableExists(db, table) } } @@ -449,10 +469,6 @@ class SessionCatalog( name.database.isEmpty && tempTables.contains(formatTableName(name.table)) } - def listTemporaryTableOutput(name: String): Seq[Attribute] = { - tempTables(name).output - } - /** * List all tables in the specified database, including temporary tables. */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala index 8dcb9538b7143..600782c8153ff 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala @@ -138,21 +138,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog { */ @throws[AnalysisException]("table does not exist") override def listColumns(tableName: String): Dataset[Column] = { - if (sessionCatalog.isTemporaryTable(TableIdentifier(tableName))) { - val columns = sessionCatalog.listTemporaryTableOutput(tableName).map { c => - new Column( - name = c.name, - description = c.name, - dataType = c.dataType.catalogString, - nullable = c.nullable, - isPartition = false, - isBucket = false - ) - } - CatalogImpl.makeDataset(columns, sparkSession) - } else { - listColumns(currentDatabase, tableName) - } + listColumns("", tableName) } /** From 613bd15b5ef4ae2ae48792341618fad28699141f Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Sat, 9 Jul 2016 18:16:41 -0700 Subject: [PATCH 3/8] Fix check condition --- .../org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index 59b284bff2b33..cea6d97d40d6d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -450,7 +450,7 @@ class SessionCatalog( */ def tableExists(name: TableIdentifier): Boolean = synchronized { val table = formatTableName(name.table) - if (tempTables.contains(table)) { + if (name.database.getOrElse("").length == 0 && tempTables.contains(table)) { true } else { var db = formatDatabaseName(name.database.getOrElse(currentDb)) From ed5301d741ae5010016cd06f6e0c681320c632dc Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Sun, 10 Jul 2016 18:55:28 -0700 Subject: [PATCH 4/8] Make private listColumns(TableIdentifier). --- .../spark/sql/catalyst/catalog/SessionCatalog.scala | 11 +++++------ .../org/apache/spark/sql/internal/CatalogImpl.scala | 8 ++++++-- 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index cea6d97d40d6d..32e84dfc5c313 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -251,10 +251,10 @@ class SessionCatalog( * If the specified table is not found in the database then a [[NoSuchTableException]] is thrown. */ def getTableMetadata(name: TableIdentifier): CatalogTable = { - var db = formatDatabaseName(name.database.getOrElse(getCurrentDatabase)) + val db = formatDatabaseName(name.database.getOrElse(getCurrentDatabase)) val table = formatTableName(name.table) val tid = TableIdentifier(table) - if (db == "" && isTemporaryTable(tid)) { + if (name.database.isEmpty && isTemporaryTable(tid)) { CatalogTable( identifier = tid, tableType = CatalogTableType.VIEW, @@ -270,7 +270,6 @@ class SessionCatalog( properties = Map(), viewText = None) } else { - db = if (db == "") currentDb else db requireDbExists(db) requireTableExists(TableIdentifier(table, Some(db))) externalCatalog.getTable(db, table) @@ -449,12 +448,12 @@ class SessionCatalog( * contain the table. */ def tableExists(name: TableIdentifier): Boolean = synchronized { + val db = formatDatabaseName(name.database.getOrElse(currentDb)) val table = formatTableName(name.table) - if (name.database.getOrElse("").length == 0 && tempTables.contains(table)) { + if (name.database.isEmpty && tempTables.contains(table)) { + // This is a temporary table true } else { - var db = formatDatabaseName(name.database.getOrElse(currentDb)) - db = if (db == "") currentDb else db externalCatalog.tableExists(db, table) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala index 600782c8153ff..a6ae6fe2aad2e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala @@ -138,7 +138,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog { */ @throws[AnalysisException]("table does not exist") override def listColumns(tableName: String): Dataset[Column] = { - listColumns("", tableName) + listColumns(TableIdentifier(tableName, None)) } /** @@ -147,7 +147,11 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog { @throws[AnalysisException]("database or table does not exist") override def listColumns(dbName: String, tableName: String): Dataset[Column] = { requireTableExists(dbName, tableName) - val tableMetadata = sessionCatalog.getTableMetadata(TableIdentifier(tableName, Some(dbName))) + listColumns(TableIdentifier(tableName, Some(dbName))) + } + + private def listColumns(tableIdentifier: TableIdentifier): Dataset[Column] = { + val tableMetadata = sessionCatalog.getTableMetadata(tableIdentifier) val partitionColumnNames = tableMetadata.partitionColumnNames.toSet val bucketColumnNames = tableMetadata.bucketColumnNames.toSet val columns = tableMetadata.schema.map { c => From ab43d47788d181f82c7db72327e251258d86c476 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Sun, 10 Jul 2016 23:05:42 -0700 Subject: [PATCH 5/8] Add new testcases. --- .../catalog/SessionCatalogSuite.scala | 33 +++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala index 05eb302c3c03a..adce5df81cb7f 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala @@ -432,6 +432,39 @@ class SessionCatalogSuite extends SparkFunSuite { assert(catalog.tableExists(TableIdentifier("tbl3"))) } + test("tableExists on temporary views") { + val catalog = new SessionCatalog(newBasicCatalog()) + val tempTable = Range(1, 10, 2, 10) + assert(!catalog.tableExists(TableIdentifier("view1"))) + assert(!catalog.tableExists(TableIdentifier("view1", Some("default")))) + catalog.createTempView("view1", tempTable, overrideIfExists = false) + assert(catalog.tableExists(TableIdentifier("view1"))) + assert(!catalog.tableExists(TableIdentifier("view1", Some("default")))) + } + + test("getTableMetadata on temporary views") { + val catalog = new SessionCatalog(newBasicCatalog()) + val tempTable = Range(1, 10, 2, 10) + val m = intercept[AnalysisException] { + catalog.getTableMetadata(TableIdentifier("view1")) + }.getMessage + assert(m.contains("Table or view 'view1' not found in database 'default'")) + + val m2 = intercept[AnalysisException] { + catalog.getTableMetadata(TableIdentifier("view1", Some("default"))) + }.getMessage + assert(m2.contains("Table or view 'view1' not found in database 'default'")) + + catalog.createTempView("view1", tempTable, overrideIfExists = false) + assert(catalog.getTableMetadata(TableIdentifier("view1")).identifier.table == "view1") + assert(catalog.getTableMetadata(TableIdentifier("view1")).schema(0).name == "id") + + val m3 = intercept[AnalysisException] { + catalog.getTableMetadata(TableIdentifier("view1", Some("default"))) + }.getMessage + assert(m3.contains("Table or view 'view1' not found in database 'default'")) + } + test("list tables without pattern") { val catalog = new SessionCatalog(newBasicCatalog()) val tempTable = Range(1, 10, 2, 10) From e267713f1baec34d8869a3bcfd11ade66b2037ec Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Mon, 11 Jul 2016 11:23:30 -0700 Subject: [PATCH 6/8] Use isTemporaryTable if possible. --- .../org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index 32e84dfc5c313..9f930847664cf 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -450,8 +450,7 @@ class SessionCatalog( def tableExists(name: TableIdentifier): Boolean = synchronized { val db = formatDatabaseName(name.database.getOrElse(currentDb)) val table = formatTableName(name.table) - if (name.database.isEmpty && tempTables.contains(table)) { - // This is a temporary table + if (isTemporaryTable(name)) { true } else { externalCatalog.tableExists(db, table) From 650ead2c397ddec72199d84bd8f8b2906dc0d3a7 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Mon, 11 Jul 2016 12:10:03 -0700 Subject: [PATCH 7/8] Fix one more. --- .../org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index 9f930847664cf..c0ebb2b1fa1ea 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -254,7 +254,7 @@ class SessionCatalog( val db = formatDatabaseName(name.database.getOrElse(getCurrentDatabase)) val table = formatTableName(name.table) val tid = TableIdentifier(table) - if (name.database.isEmpty && isTemporaryTable(tid)) { + if (isTemporaryTable(name)) { CatalogTable( identifier = tid, tableType = CatalogTableType.VIEW, From d1fa9ecb9959a968de9a1381ab669f5b4f177857 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Mon, 11 Jul 2016 12:26:59 -0700 Subject: [PATCH 8/8] Update docs. --- .../src/main/scala/org/apache/spark/sql/catalog/Catalog.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala index 91ed9b3258a12..1aed245fdd332 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala @@ -85,7 +85,8 @@ abstract class Catalog { def listFunctions(dbName: String): Dataset[Function] /** - * Returns a list of columns for the given table in the current database. + * Returns a list of columns for the given table in the current database or + * the given temporary table. * * @since 2.0.0 */