From b7c09946487156e86b53912fb0360cfb295b6106 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Tue, 9 Aug 2016 16:05:10 -0700 Subject: [PATCH 1/3] address comments --- .../spark/sql/execution/command/ddl.scala | 18 ++++++++------ .../sql/execution/datasources/rules.scala | 24 ++++++------------- .../sql/execution/command/DDLSuite.scala | 20 ++++++++++------ 3 files changed, 31 insertions(+), 31 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala index f0e49e65c459d..407bb073b059e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala @@ -526,24 +526,28 @@ object DDLUtils { require(isDatasourceTable(metadata)) val msgSchemaCorrupted = "Could not read schema from the metastore because it is corrupted." val props = metadata.properties - props.get(DATASOURCE_SCHEMA).map { schema => + val schema = props.get(DATASOURCE_SCHEMA) + if (schema.isDefined) { // Originally, we used spark.sql.sources.schema to store the schema of a data source table. // After SPARK-6024, we removed this flag. // Although we are not using spark.sql.sources.schema any more, we need to still support. - DataType.fromJson(schema).asInstanceOf[StructType] - } getOrElse { - props.get(DATASOURCE_SCHEMA_NUMPARTS).map { numParts => - val parts = (0 until numParts.toInt).map { index => + DataType.fromJson(schema.get).asInstanceOf[StructType] + } else { + val numSchemaParts = props.get(DATASOURCE_SCHEMA_NUMPARTS) + if (numSchemaParts.isDefined) { + val parts = (0 until numSchemaParts.get.toInt).map { index => val part = metadata.properties.get(s"$DATASOURCE_SCHEMA_PART_PREFIX$index").orNull if (part == null) { throw new AnalysisException(msgSchemaCorrupted + - s" (missing part $index of the schema, $numParts parts are expected).") + s" (missing part $index of the schema, ${numSchemaParts.get} parts are expected).") } part } // Stick all parts back to a single schema string. DataType.fromJson(parts.mkString).asInstanceOf[StructType] - } getOrElse(throw new AnalysisException(msgSchemaCorrupted)) + } else { + throw new AnalysisException(msgSchemaCorrupted) + } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala index d5b92323d4418..2c5043e74b830 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala @@ -72,29 +72,19 @@ case class PreprocessDDL(conf: SQLConf) extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan transform { // When we CREATE TABLE without specifying the table schema, we should fail the query if - // bucketing information is specified, as we can't infer bucketing from data files currently, - // and we should ignore the partition columns if it's specified, as we will infer it later, at - // runtime. + // bucketing information is specified, as we can't infer bucketing from data files currently. + // Since the runtime inferred partition columns could be different from what user specified, + // we fail the query if the partitioning information is specified. case c @ CreateTable(tableDesc, _, None) if tableDesc.schema.isEmpty => if (tableDesc.bucketSpec.isDefined) { failAnalysis("Cannot specify bucketing information if the table schema is not specified " + "when creating and will be inferred at runtime") } - - val partitionColumnNames = tableDesc.partitionColumnNames - if (partitionColumnNames.nonEmpty) { - // The table does not have a specified schema, which means that the schema will be inferred - // at runtime. So, we are not expecting partition columns and we will discover partitions - // at runtime. However, if there are specified partition columns, we simply ignore them and - // provide a warning message. - logWarning( - s"Specified partition columns (${partitionColumnNames.mkString(",")}) will be " + - s"ignored. The schema and partition columns of table ${tableDesc.identifier} will " + - "be inferred.") - c.copy(tableDesc = tableDesc.copy(partitionColumnNames = Nil)) - } else { - c + if (tableDesc.partitionColumnNames.nonEmpty) { + failAnalysis("Cannot specify partition information if the table schema is not specified " + + "when creating and will be inferred at runtime") } + c // Here we normalize partition, bucket and sort column names, w.r.t. the case sensitivity // config, and do various checks: diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index ca9b210125b58..2cabd4de17f08 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -263,7 +263,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { val partitionClause = userSpecifiedPartitionCols.map(p => s"PARTITIONED BY ($p)").getOrElse("") val schemaClause = userSpecifiedSchema.map(s => s"($s)").getOrElse("") - sql( + val sqlCreateTable = s""" |CREATE TABLE $tabName $schemaClause |USING parquet @@ -271,13 +271,19 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { | path '$path' |) |$partitionClause - """.stripMargin) - val tableMetadata = spark.sessionState.catalog.getTableMetadata(TableIdentifier(tabName)) + """.stripMargin + if (userSpecifiedSchema.isEmpty && userSpecifiedPartitionCols.nonEmpty) { + val e = intercept[AnalysisException](sql(sqlCreateTable)).getMessage + assert(e.contains("Cannot specify partition columns")) + } else { + sql(sqlCreateTable) + val tableMetadata = spark.sessionState.catalog.getTableMetadata(TableIdentifier(tabName)) - assert(expectedSchema == - DDLUtils.getSchemaFromTableProperties(tableMetadata)) - assert(expectedPartitionCols == - DDLUtils.getPartitionColumnsFromTableProperties(tableMetadata)) + assert(expectedSchema == + DDLUtils.getSchemaFromTableProperties(tableMetadata)) + assert(expectedPartitionCols == + DDLUtils.getPartitionColumnsFromTableProperties(tableMetadata)) + } } } From 35cbf9d93cd70dc74a52078fc7cd5661767589b7 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Tue, 9 Aug 2016 19:12:06 -0700 Subject: [PATCH 2/3] fixed the test case failure. --- .../scala/org/apache/spark/sql/execution/command/DDLSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index 2cabd4de17f08..b0779e042da1a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -274,7 +274,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { """.stripMargin if (userSpecifiedSchema.isEmpty && userSpecifiedPartitionCols.nonEmpty) { val e = intercept[AnalysisException](sql(sqlCreateTable)).getMessage - assert(e.contains("Cannot specify partition columns")) + assert(e.contains("Cannot specify partition information")) } else { sql(sqlCreateTable) val tableMetadata = spark.sessionState.catalog.getTableMetadata(TableIdentifier(tabName)) From d3a79c847b24b5eb3dce0818099d99dd25869b87 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Mon, 22 Aug 2016 16:27:42 -0700 Subject: [PATCH 3/3] address comments. --- .../org/apache/spark/sql/execution/datasources/rules.scala | 5 +++-- .../org/apache/spark/sql/execution/command/DDLSuite.scala | 3 ++- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala index d11ee102f675b..f14c63c19f905 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala @@ -81,8 +81,9 @@ case class PreprocessDDL(conf: SQLConf) extends Rule[LogicalPlan] { "when creating and will be inferred at runtime") } if (tableDesc.partitionColumnNames.nonEmpty) { - failAnalysis("Cannot specify partition information if the table schema is not specified " + - "when creating and will be inferred at runtime") + failAnalysis("It is not allowed to specify partition columns when the table schema is " + + "not defined. When the table schema is not provided, schema and partition columns " + + "will be inferred.") } c diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index 035f5786d614b..b343454b12d86 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -276,7 +276,8 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { """.stripMargin if (userSpecifiedSchema.isEmpty && userSpecifiedPartitionCols.nonEmpty) { val e = intercept[AnalysisException](sql(sqlCreateTable)).getMessage - assert(e.contains("Cannot specify partition information")) + assert(e.contains( + "not allowed to specify partition columns when the table schema is not defined")) } else { sql(sqlCreateTable) val tableMetadata = spark.sessionState.catalog.getTableMetadata(TableIdentifier(tabName))