Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,12 @@ case class CreateTableAsSelectExec(
}

Utils.tryWithSafeFinallyAndFailureCallbacks({
val schema = query.schema.asNullable
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need to update the schema nullability of the corresponding logical plans, CreateTableAsSelect and ReplaceTable, in the analyzer phase? Any reason to directly update the nullability of physical plans?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's too much work if we need to transform the logical plan and add an extra Project to change the nullability.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be incorrect to change the logical plan. The behavior of CTAS should be that tables are created with nullable types. The query used by CTAS should not be changed.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I see. Thanks. Looks ok to me.

catalog.createTable(
ident, query.schema, partitioning.toArray, properties.asJava) match {
ident, schema, partitioning.toArray, properties.asJava) match {
case table: SupportsWrite =>
val writeBuilder = table.newWriteBuilder(writeOptions)
.withInputDataSchema(query.schema)
.withInputDataSchema(schema)
.withQueryId(UUID.randomUUID().toString)

writeBuilder match {
Expand Down Expand Up @@ -132,7 +133,7 @@ case class AtomicCreateTableAsSelectExec(
throw new TableAlreadyExistsException(ident)
}
val stagedTable = catalog.stageCreate(
ident, query.schema, partitioning.toArray, properties.asJava)
ident, query.schema.asNullable, partitioning.toArray, properties.asJava)
writeToStagedTable(stagedTable, writeOptions, ident)
}
}
Expand Down Expand Up @@ -173,13 +174,14 @@ case class ReplaceTableAsSelectExec(
} else if (!orCreate) {
throw new CannotReplaceMissingTableException(ident)
}
val schema = query.schema.asNullable
val createdTable = catalog.createTable(
ident, query.schema, partitioning.toArray, properties.asJava)
ident, schema, partitioning.toArray, properties.asJava)
Utils.tryWithSafeFinallyAndFailureCallbacks({
createdTable match {
case table: SupportsWrite =>
val writeBuilder = table.newWriteBuilder(writeOptions)
.withInputDataSchema(query.schema)
.withInputDataSchema(schema)
.withQueryId(UUID.randomUUID().toString)

writeBuilder match {
Expand Down Expand Up @@ -221,13 +223,14 @@ case class AtomicReplaceTableAsSelectExec(
orCreate: Boolean) extends AtomicTableWriteExec {

override protected def doExecute(): RDD[InternalRow] = {
val schema = query.schema.asNullable
val staged = if (orCreate) {
catalog.stageCreateOrReplace(
ident, query.schema, partitioning.toArray, properties.asJava)
ident, schema, partitioning.toArray, properties.asJava)
} else if (catalog.tableExists(ident)) {
try {
catalog.stageReplace(
ident, query.schema, partitioning.toArray, properties.asJava)
ident, schema, partitioning.toArray, properties.asJava)
} catch {
case e: NoSuchTableException =>
throw new CannotReplaceMissingTableException(ident, Some(e))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ class DataSourceV2SQLSuite extends QueryTest with SharedSparkSession with Before
assert(table.partitioning.isEmpty)
assert(table.properties == Map("provider" -> "foo").asJava)
assert(table.schema == new StructType()
.add("id", LongType, nullable = false)
.add("id", LongType)
.add("data", StringType))

val rdd = spark.sparkContext.parallelize(table.asInstanceOf[InMemoryTable].rows)
Expand All @@ -258,8 +258,7 @@ class DataSourceV2SQLSuite extends QueryTest with SharedSparkSession with Before
assert(replacedTable.name == identifier)
assert(replacedTable.partitioning.isEmpty)
assert(replacedTable.properties == Map("provider" -> "foo").asJava)
assert(replacedTable.schema == new StructType()
.add("id", LongType, nullable = false))
assert(replacedTable.schema == new StructType().add("id", LongType))

val rdd = spark.sparkContext.parallelize(replacedTable.asInstanceOf[InMemoryTable].rows)
checkAnswer(
Expand Down Expand Up @@ -391,7 +390,7 @@ class DataSourceV2SQLSuite extends QueryTest with SharedSparkSession with Before
assert(table.partitioning.isEmpty)
assert(table.properties == Map("provider" -> orc2).asJava)
assert(table.schema == new StructType()
.add("id", LongType, nullable = false)
.add("id", LongType)
.add("data", StringType))

val rdd = spark.sparkContext.parallelize(table.asInstanceOf[InMemoryTable].rows)
Expand All @@ -408,7 +407,7 @@ class DataSourceV2SQLSuite extends QueryTest with SharedSparkSession with Before
assert(table.partitioning.isEmpty)
assert(table.properties == Map("provider" -> "foo").asJava)
assert(table.schema == new StructType()
.add("id", LongType, nullable = false)
.add("id", LongType)
.add("data", StringType))

val rdd = spark.sparkContext.parallelize(table.asInstanceOf[InMemoryTable].rows)
Expand All @@ -428,7 +427,7 @@ class DataSourceV2SQLSuite extends QueryTest with SharedSparkSession with Before
assert(table2.partitioning.isEmpty)
assert(table2.properties == Map("provider" -> "foo").asJava)
assert(table2.schema == new StructType()
.add("id", LongType, nullable = false)
.add("id", LongType)
.add("data", StringType))

val rdd2 = spark.sparkContext.parallelize(table.asInstanceOf[InMemoryTable].rows)
Expand All @@ -446,7 +445,7 @@ class DataSourceV2SQLSuite extends QueryTest with SharedSparkSession with Before
assert(table.partitioning.isEmpty)
assert(table.properties == Map("provider" -> "foo").asJava)
assert(table.schema == new StructType()
.add("id", LongType, nullable = false)
.add("id", LongType)
.add("data", StringType))

val rdd = spark.sparkContext.parallelize(table.asInstanceOf[InMemoryTable].rows)
Expand Down Expand Up @@ -477,7 +476,7 @@ class DataSourceV2SQLSuite extends QueryTest with SharedSparkSession with Before
assert(table.partitioning.isEmpty)
assert(table.properties == Map("provider" -> "foo").asJava)
assert(table.schema == new StructType()
.add("id", LongType, nullable = false)
.add("id", LongType)
.add("data", StringType))

val rdd = sparkContext.parallelize(table.asInstanceOf[InMemoryTable].rows)
Expand All @@ -500,6 +499,32 @@ class DataSourceV2SQLSuite extends QueryTest with SharedSparkSession with Before
assert(t.isInstanceOf[UnresolvedTable], "V1 table wasn't returned as an unresolved table")
}

test("CreateTableAsSelect: nullable schema") {
val basicCatalog = catalog("testcat").asTableCatalog
val atomicCatalog = catalog("testcat_atomic").asTableCatalog
val basicIdentifier = "testcat.table_name"
val atomicIdentifier = "testcat_atomic.table_name"

Seq((basicCatalog, basicIdentifier), (atomicCatalog, atomicIdentifier)).foreach {
case (catalog, identifier) =>
spark.sql(s"CREATE TABLE $identifier USING foo AS SELECT 1 i")

val table = catalog.loadTable(Identifier.of(Array(), "table_name"))
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor and non-blocking... "table_name" repeated many times and is it better to make it a test class variable and each test case referencing it?

sorry maybe I'm being too nitpick lol

PR looks good to me :)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's better to me if you don't have to jump around too much when reading the code.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in your PR https://github.com/apache/spark/pull/25507/files DataSourceV2DataFrameSessionCatalogSuite.scala, there is a class variable v2Format being referenced by all test cases

protected val v2Format: String = classOf[InMemoryTableProvider].getName

so I thought it's a style convention...


assert(table.name == identifier)
assert(table.partitioning.isEmpty)
assert(table.properties == Map("provider" -> "foo").asJava)
assert(table.schema == new StructType().add("i", "int"))

val rdd = spark.sparkContext.parallelize(table.asInstanceOf[InMemoryTable].rows)
checkAnswer(spark.internalCreateDataFrame(rdd, table.schema), Row(1))

sql(s"INSERT INTO $identifier SELECT CAST(null AS INT)")
val rdd2 = spark.sparkContext.parallelize(table.asInstanceOf[InMemoryTable].rows)
checkAnswer(spark.internalCreateDataFrame(rdd2, table.schema), Seq(Row(1), Row(null)))
}
}

test("DropTable: basic") {
val tableName = "testcat.ns1.ns2.tbl"
val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
Expand Down