From 067d12b4427023c909d75bc2f84195449bf9bb74 Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Fri, 1 Apr 2016 12:02:59 -0700 Subject: [PATCH 1/4] SPARK-14459: Detect relation partitioning and adjust the logical plan to match. This detects a relation's partitioning and adds checks to the analyzer. If an InsertIntoTable node has no partitioning, it is replaced by the relation's partition scheme and input columns are correctly adjusted, placing the partition columns at the end in partition order. If an InsertIntoTable node has partitioning, it is checked against the table's reported partitions. These changes required adding a PartitionedRelation trait to the catalog interface because Hive's MetastoreRelation doesn't extend CatalogRelation. This commit also includes a fix to InsertIntoTable's resolved logic, which now detects that all expected columns are present, including dynamic partition columns. Previously, the number of expected columns was not checked and resolved was true if there were missing columns. --- .../sql/catalyst/analysis/Analyzer.scala | 40 +++++++++- .../plans/logical/basicLogicalOperators.scala | 10 ++- .../hive/execution/InsertIntoHiveTable.scala | 10 ++- .../sql/hive/InsertIntoHiveTableSuite.scala | 76 +++++++++++++++++++ 4 files changed, 131 insertions(+), 5 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index f6a65f7e6c09d..90b437903a60f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -22,7 +22,7 @@ import scala.collection.mutable.ArrayBuffer import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.{CatalystConf, ScalaReflection, SimpleCatalystConf} -import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog} +import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, InMemoryCatalog, SessionCatalog} import org.apache.spark.sql.catalyst.encoders.OuterScopes import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate._ @@ -417,8 +417,42 @@ class Analyzer( } def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { - case i @ InsertIntoTable(u: UnresolvedRelation, _, _, _, _) => - i.copy(table = EliminateSubqueryAliases(lookupTableFromCatalog(u))) + case i @ InsertIntoTable(u: UnresolvedRelation, parts, child, _, _) if child.resolved => + val table = lookupTableFromCatalog(u) + // adding the table's partitions or validate the query's partition info + table match { + case relation: CatalogRelation if relation.catalogTable.partitionColumns.nonEmpty => + val tablePartitionNames = relation.catalogTable.partitionColumns.map(_.name) + if (parts.keys.nonEmpty) { + // the query's partitioning must match the table's partitioning + // this is set for queries like: insert into ... partition (one = "a", two = ) + if (tablePartitionNames.size != parts.keySet.size) { + throw new AnalysisException( + s"""Requested partitioning does not match the ${u.tableIdentifier} table: + |Requested partitions: ${parts.keys.mkString(",")} + |Table partitions: ${tablePartitionNames.mkString(",")}""".stripMargin) + } + // assumes partition columns are correctly placed at the end of the child's output + i.copy(table = EliminateSubqueryAliases(table)) + } else { + // Set up the table's partition scheme with all dynamic partitions by moving partition + // columns to the end of the column list, in partition order. + val (inputPartCols, columns) = child.output.partition { attr => + tablePartitionNames.contains(attr.name) + } + // All partition columns are dynamic because this InsertIntoTable had no partitioning + val partColumns = tablePartitionNames.map { name => + inputPartCols.find(_.name == name).getOrElse( + throw new AnalysisException(s"Cannot find partition column $name")) + } + i.copy( + table = EliminateSubqueryAliases(table), + partition = tablePartitionNames.map(_ -> None).toMap, + child = Project(columns ++ partColumns, child)) + } + case _ => + i.copy(table = EliminateSubqueryAliases(table)) + } case u: UnresolvedRelation => val table = u.tableIdentifier if (table.database.isDefined && conf.runSQLonFile && diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index b358e210daf75..c4f63008986cb 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -347,8 +347,16 @@ case class InsertIntoTable( override def children: Seq[LogicalPlan] = child :: Nil override def output: Seq[Attribute] = Seq.empty + private[spark] lazy val expectedColumns = { + val numDynamicPartitions = partition.values.count(_.isEmpty) + val (partitionColumns, dataColumns) = table.output + .partition(a => partition.keySet.contains(a.name)) + dataColumns ++ partitionColumns.takeRight(numDynamicPartitions) + } + assert(overwrite || !ifNotExists) - override lazy val resolved: Boolean = childrenResolved && child.output.zip(table.output).forall { + override lazy val resolved: Boolean = childrenResolved && + child.output.size == expectedColumns.size && child.output.zip(expectedColumns).forall { case (childAttr, tableAttr) => DataType.equalsIgnoreCompatibleNullability(childAttr.dataType, tableAttr.dataType) } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index 73ccec2ee0ba5..3805674d39589 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -168,7 +168,15 @@ case class InsertIntoHiveTable( // All partition column names in the format of "//..." val partitionColumns = fileSinkConf.getTableInfo.getProperties.getProperty("partition_columns") - val partitionColumnNames = Option(partitionColumns).map(_.split("/")).orNull + val partitionColumnNames = Option(partitionColumns).map(_.split("/")).getOrElse(Array.empty) + + // By this time, the partition map must match the table's partition columns + if (partitionColumnNames.toSet != partition.keySet) { + throw new SparkException( + s"""Requested partitioning does not match the ${table.tableName} table: + |Requested partitions: ${partition.keys.mkString(",")} + |Table partitions: ${table.partitionKeys.map(_.name).mkString(",")}""".stripMargin) + } // Validate partition spec if there exist any dynamic partitions if (numDynamicPartitions > 0) { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala index 46de4921f6503..2424488360ff5 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala @@ -22,7 +22,9 @@ import java.io.File import org.apache.hadoop.hive.conf.HiveConf import org.scalatest.BeforeAndAfter +import org.apache.spark.SparkException import org.apache.spark.sql.{QueryTest, _} +import org.apache.spark.sql.catalyst.plans.logical.InsertIntoTable import org.apache.spark.sql.execution.QueryExecutionException import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.types._ @@ -212,4 +214,78 @@ class InsertIntoHiveTableSuite extends QueryTest with TestHiveSingleton with Bef sql("DROP TABLE hiveTableWithStructValue") } + + test("Reject partitioning that does not match table") { + sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict") + sql("CREATE TABLE partitioned (id bigint, data string) PARTITIONED BY (part string)") + val data = (1 to 10).map(i => (i, s"data-$i", if ((i % 2) == 0) "even" else "odd")) + .toDF("id", "data", "part") + + intercept[AnalysisException] { + // cannot partition by 2 fields when there is only one in the table definition + data.write.partitionBy("part", "data").insertInto("partitioned") + } + } + + test("Test partition mode = strict") { + sqlContext.conf.unsetConf("hive.exec.dynamic.partition.mode") + sql("CREATE TABLE partitioned (id bigint, data string) PARTITIONED BY (part string)") + val data = (1 to 10).map(i => (i, s"data-$i", if ((i % 2) == 0) "even" else "odd")) + .toDF("id", "data", "part") + + intercept[SparkException] { + data.write.insertInto("partitioned") + } + } + + test("Detect table partitioning") { + sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict") + + sql("CREATE TABLE source (id bigint, data string, part string)") + val data = (1 to 10).map(i => (i, s"data-$i", if ((i % 2) == 0) "even" else "odd")).toDF() + + data.write.insertInto("source") + checkAnswer(sql("SELECT * FROM source"), data.collect().toSeq) + + sql("CREATE TABLE partitioned (id bigint, data string) PARTITIONED BY (part string)") + // this will pick up the output partitioning from the table definition + sqlContext.table("source").write.insertInto("partitioned") + + checkAnswer(sql("SELECT * FROM partitioned"), data.collect().toSeq) + + sqlContext.conf.unsetConf("hive.exec.dynamic.partition.mode") + } + + test("Detect table partitioning with correct partition order") { + sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict") + + sql("CREATE TABLE source (id bigint, part2 string, part1 string, data string)") + val data = (1 to 10).map(i => (i, if ((i % 2) == 0) "even" else "odd", "p", s"data-$i")) + .toDF("id", "part2", "part1", "data") + + data.write.insertInto("source") + checkAnswer(sql("SELECT * FROM source"), data.collect().toSeq) + + // the original data with part1 and part2 at the end + val expected = data.select("id", "data", "part1", "part2") + + sql( + """CREATE TABLE partitioned (id bigint, data string) + |PARTITIONED BY (part1 string, part2 string)""".stripMargin) + sqlContext.table("source").write.insertInto("partitioned") + + checkAnswer(sql("SELECT * FROM partitioned"), expected.collect().toSeq) + + sqlContext.conf.unsetConf("hive.exec.dynamic.partition.mode") + } + + test("InsertIntoTable#resolved should include dynamic partitions") { + sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict") + sql("CREATE TABLE partitioned (id bigint, data string) PARTITIONED BY (part string)") + val data = (1 to 10).map(i => (i.toLong, s"data-$i")).toDF("id", "data") + + val logical = InsertIntoTable(sqlContext.table("partitioned").logicalPlan, + Map("part" -> None), data.logicalPlan, overwrite = false, ifNotExists = false) + assert(!logical.resolved, "Should not resolve: missing partition data") + } } From eb0b3b59f06c60064c4b068d101ae113ad41d6ce Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Fri, 8 Apr 2016 15:23:34 -0700 Subject: [PATCH 2/4] SPARK-14459: Update partition spec validation test. This test expected to fail the strict partition check, but with support for table partitioning in the analyzer the problem is caught sooner and has a better error message. The message now complains that the partitioning doesn't match rather than strict mode, which wouldn't help. --- .../org/apache/spark/sql/hive/execution/HiveQuerySuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala index 3bf0e84267419..bbb775ef770af 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala @@ -978,7 +978,7 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter { sql("SET hive.exec.dynamic.partition.mode=strict") // Should throw when using strict dynamic partition mode without any static partition - intercept[SparkException] { + intercept[AnalysisException] { sql( """INSERT INTO TABLE dp_test PARTITION(dp) |SELECT key, value, key % 5 FROM src From 0ba41c75831ea4a859e5c668221d724f0846f2e1 Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Mon, 11 Apr 2016 17:57:20 -0700 Subject: [PATCH 3/4] SPARK-14459: Fix OneRowTable error message. OneRowTable doesn't expose its output columns because it is a singleton. The more strict checks in InsertIntoTable's resolution is causing this to fail. This commit fixes the problem by catching the case where a table doesn't define its outputs and considers the table resolved. --- .../plans/logical/basicLogicalOperators.scala | 21 ++++++++++++------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index c4f63008986cb..7fd2015f849b6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -348,17 +348,22 @@ case class InsertIntoTable( override def output: Seq[Attribute] = Seq.empty private[spark] lazy val expectedColumns = { - val numDynamicPartitions = partition.values.count(_.isEmpty) - val (partitionColumns, dataColumns) = table.output - .partition(a => partition.keySet.contains(a.name)) - dataColumns ++ partitionColumns.takeRight(numDynamicPartitions) + if (table.output.isEmpty) { + None + } else { + val numDynamicPartitions = partition.values.count(_.isEmpty) + val (partitionColumns, dataColumns) = table.output + .partition(a => partition.keySet.contains(a.name)) + Some(dataColumns ++ partitionColumns.takeRight(numDynamicPartitions)) + } } assert(overwrite || !ifNotExists) - override lazy val resolved: Boolean = childrenResolved && - child.output.size == expectedColumns.size && child.output.zip(expectedColumns).forall { - case (childAttr, tableAttr) => - DataType.equalsIgnoreCompatibleNullability(childAttr.dataType, tableAttr.dataType) + override lazy val resolved: Boolean = childrenResolved && expectedColumns.forall { expected => + child.output.size == expected.size && child.output.zip(expected).forall { + case (childAttr, tableAttr) => + DataType.equalsIgnoreCompatibleNullability(childAttr.dataType, tableAttr.dataType) + } } } From 2130db80e6717060aad7f37f445cad6ec861252d Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Fri, 22 Apr 2016 10:14:48 -0700 Subject: [PATCH 4/4] SPARK-14459: Changes for review feedback. --- .../sql/catalyst/analysis/Analyzer.scala | 3 +- .../sql/hive/InsertIntoHiveTableSuite.scala | 103 +++++++++--------- 2 files changed, 53 insertions(+), 53 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 90b437903a60f..6a51196921b39 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -426,13 +426,14 @@ class Analyzer( if (parts.keys.nonEmpty) { // the query's partitioning must match the table's partitioning // this is set for queries like: insert into ... partition (one = "a", two = ) + // TODO: add better checking to pre-inserts to avoid needing this here if (tablePartitionNames.size != parts.keySet.size) { throw new AnalysisException( s"""Requested partitioning does not match the ${u.tableIdentifier} table: |Requested partitions: ${parts.keys.mkString(",")} |Table partitions: ${tablePartitionNames.mkString(",")}""".stripMargin) } - // assumes partition columns are correctly placed at the end of the child's output + // Assume partition columns are correctly placed at the end of the child's output i.copy(table = EliminateSubqueryAliases(table)) } else { // Set up the table's partition scheme with all dynamic partitions by moving partition diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala index 2424488360ff5..d2f70b14accf9 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala @@ -25,8 +25,8 @@ import org.scalatest.BeforeAndAfter import org.apache.spark.SparkException import org.apache.spark.sql.{QueryTest, _} import org.apache.spark.sql.catalyst.plans.logical.InsertIntoTable -import org.apache.spark.sql.execution.QueryExecutionException import org.apache.spark.sql.hive.test.TestHiveSingleton +import org.apache.spark.sql.test.SQLTestUtils import org.apache.spark.sql.types._ import org.apache.spark.util.Utils @@ -34,11 +34,11 @@ case class TestData(key: Int, value: String) case class ThreeCloumntable(key: Int, value: String, key1: String) -class InsertIntoHiveTableSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter { +class InsertIntoHiveTableSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter + with SQLTestUtils { import hiveContext.implicits._ - import hiveContext.sql - val testData = hiveContext.sparkContext.parallelize( + override lazy val testData = hiveContext.sparkContext.parallelize( (1 to 100).map(i => TestData(i, i.toString))).toDF() before { @@ -216,76 +216,75 @@ class InsertIntoHiveTableSuite extends QueryTest with TestHiveSingleton with Bef } test("Reject partitioning that does not match table") { - sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict") - sql("CREATE TABLE partitioned (id bigint, data string) PARTITIONED BY (part string)") - val data = (1 to 10).map(i => (i, s"data-$i", if ((i % 2) == 0) "even" else "odd")) - .toDF("id", "data", "part") - - intercept[AnalysisException] { - // cannot partition by 2 fields when there is only one in the table definition - data.write.partitionBy("part", "data").insertInto("partitioned") + withSQLConf(("hive.exec.dynamic.partition.mode", "nonstrict")) { + sql("CREATE TABLE partitioned (id bigint, data string) PARTITIONED BY (part string)") + val data = (1 to 10).map(i => (i, s"data-$i", if ((i % 2) == 0) "even" else "odd")) + .toDF("id", "data", "part") + + intercept[AnalysisException] { + // cannot partition by 2 fields when there is only one in the table definition + data.write.partitionBy("part", "data").insertInto("partitioned") + } } } test("Test partition mode = strict") { - sqlContext.conf.unsetConf("hive.exec.dynamic.partition.mode") - sql("CREATE TABLE partitioned (id bigint, data string) PARTITIONED BY (part string)") - val data = (1 to 10).map(i => (i, s"data-$i", if ((i % 2) == 0) "even" else "odd")) - .toDF("id", "data", "part") + withSQLConf(("hive.exec.dynamic.partition.mode", "strict")) { + sql("CREATE TABLE partitioned (id bigint, data string) PARTITIONED BY (part string)") + val data = (1 to 10).map(i => (i, s"data-$i", if ((i % 2) == 0) "even" else "odd")) + .toDF("id", "data", "part") - intercept[SparkException] { - data.write.insertInto("partitioned") + intercept[SparkException] { + data.write.insertInto("partitioned") + } } } test("Detect table partitioning") { - sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict") - - sql("CREATE TABLE source (id bigint, data string, part string)") - val data = (1 to 10).map(i => (i, s"data-$i", if ((i % 2) == 0) "even" else "odd")).toDF() - - data.write.insertInto("source") - checkAnswer(sql("SELECT * FROM source"), data.collect().toSeq) + withSQLConf(("hive.exec.dynamic.partition.mode", "nonstrict")) { + sql("CREATE TABLE source (id bigint, data string, part string)") + val data = (1 to 10).map(i => (i, s"data-$i", if ((i % 2) == 0) "even" else "odd")).toDF() - sql("CREATE TABLE partitioned (id bigint, data string) PARTITIONED BY (part string)") - // this will pick up the output partitioning from the table definition - sqlContext.table("source").write.insertInto("partitioned") + data.write.insertInto("source") + checkAnswer(sql("SELECT * FROM source"), data.collect().toSeq) - checkAnswer(sql("SELECT * FROM partitioned"), data.collect().toSeq) + sql("CREATE TABLE partitioned (id bigint, data string) PARTITIONED BY (part string)") + // this will pick up the output partitioning from the table definition + sqlContext.table("source").write.insertInto("partitioned") - sqlContext.conf.unsetConf("hive.exec.dynamic.partition.mode") + checkAnswer(sql("SELECT * FROM partitioned"), data.collect().toSeq) + } } test("Detect table partitioning with correct partition order") { - sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict") - - sql("CREATE TABLE source (id bigint, part2 string, part1 string, data string)") - val data = (1 to 10).map(i => (i, if ((i % 2) == 0) "even" else "odd", "p", s"data-$i")) - .toDF("id", "part2", "part1", "data") - - data.write.insertInto("source") - checkAnswer(sql("SELECT * FROM source"), data.collect().toSeq) + withSQLConf(("hive.exec.dynamic.partition.mode", "nonstrict")) { + sql("CREATE TABLE source (id bigint, part2 string, part1 string, data string)") + val data = (1 to 10).map(i => (i, if ((i % 2) == 0) "even" else "odd", "p", s"data-$i")) + .toDF("id", "part2", "part1", "data") - // the original data with part1 and part2 at the end - val expected = data.select("id", "data", "part1", "part2") + data.write.insertInto("source") + checkAnswer(sql("SELECT * FROM source"), data.collect().toSeq) - sql( - """CREATE TABLE partitioned (id bigint, data string) - |PARTITIONED BY (part1 string, part2 string)""".stripMargin) - sqlContext.table("source").write.insertInto("partitioned") + // the original data with part1 and part2 at the end + val expected = data.select("id", "data", "part1", "part2") - checkAnswer(sql("SELECT * FROM partitioned"), expected.collect().toSeq) + sql( + """CREATE TABLE partitioned (id bigint, data string) + |PARTITIONED BY (part1 string, part2 string)""".stripMargin) + sqlContext.table("source").write.insertInto("partitioned") - sqlContext.conf.unsetConf("hive.exec.dynamic.partition.mode") + checkAnswer(sql("SELECT * FROM partitioned"), expected.collect().toSeq) + } } test("InsertIntoTable#resolved should include dynamic partitions") { - sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict") - sql("CREATE TABLE partitioned (id bigint, data string) PARTITIONED BY (part string)") - val data = (1 to 10).map(i => (i.toLong, s"data-$i")).toDF("id", "data") + withSQLConf(("hive.exec.dynamic.partition.mode", "nonstrict")) { + sql("CREATE TABLE partitioned (id bigint, data string) PARTITIONED BY (part string)") + val data = (1 to 10).map(i => (i.toLong, s"data-$i")).toDF("id", "data") - val logical = InsertIntoTable(sqlContext.table("partitioned").logicalPlan, - Map("part" -> None), data.logicalPlan, overwrite = false, ifNotExists = false) - assert(!logical.resolved, "Should not resolve: missing partition data") + val logical = InsertIntoTable(sqlContext.table("partitioned").logicalPlan, + Map("part" -> None), data.logicalPlan, overwrite = false, ifNotExists = false) + assert(!logical.resolved, "Should not resolve: missing partition data") + } } }